From 864a2377b4eea58df6b0ccd07c4bcba080ecc724 Mon Sep 17 00:00:00 2001 From: Jack Kamm Date: Sun, 28 Feb 2021 13:17:33 -0800 Subject: [PATCH] ob-comint.el, ob-python.el: Async session evaluation Adds functionality to ob-comint.el to implement async session eval on a per-language basis. Adds a reference implementation for ob-python. * lisp/ob-comint.el (org-babel-comint-with-output): Remove comment. (org-babel-comint-async-indicator, org-babel-comint-async-buffers, org-babel-comint-async-file-callback, org-babel-comint-async-chunk-callback, org-babel-comint-async-dangling): Add buffer-local variables used for async comint evaluation. (org-babel-comint-use-async): Add function to determine whether block should be evaluated asynchronously. (org-babel-comint-async-filter): Add filter function to attach to comint-output-filter-functions for babel async eval. (org-babel-comint-async-register): Add function to setup buffer variables and hooks for session eval. (org-babel-comint-async-delete-dangling-and-eval): Add helper function for async session eval. * lisp/ob-python.el (org-babel-execute:python): Check for async header argument. (org-babel-python-evaluate): Check whether to use async evaluation. (org-babel-python-async-indicator): Add constant for indicating the start/end of async evaluations. (org-babel-python-async-evaluate-session): Add function for Python async eval. * testing/lisp/test-ob-python.el (test-ob-python/async-simple-session-output): Unit test for Python async session eval. (test-ob-python/async-named-output): Unit test that Python async eval can replace named output. (test-ob-python/async-output-drawer): Unit test that Python async eval works with drawer results. --- etc/ORG-NEWS | 15 +++ lisp/ob-comint.el | 173 +++++++++++++++++++++++++++++++-- lisp/ob-python.el | 56 ++++++++++- testing/lisp/test-ob-python.el | 61 ++++++++++++ 4 files changed, 295 insertions(+), 10 deletions(-) diff --git a/etc/ORG-NEWS b/etc/ORG-NEWS index f95a568a6..eff75605c 100644 --- a/etc/ORG-NEWS +++ b/etc/ORG-NEWS @@ -169,6 +169,21 @@ tags including from both buffer local and user defined persistent global list (~org-tag-alist~ and ~org-tag-persistent-alist~). Now option ~org-complete-tags-always-offer-all-agenda-tags~ is honored. +*** Async session evaluation + +The =:async= header argument can be used for asynchronous evaluation +in session blocks for certain languages. + +Currently, async evaluation is supported in Python. There is also +functionality to implement async evaluation in other languages that +use comint, but this needs to be done on a per-language basis. + +By default, async evaluation is disabled unless the =:async= header +argument is present. You can also set =:async no= to force it off +(for example if you've set =:async= in a property drawer). + +Async evaluation is disabled during export. + ** Miscellaneous *** =org-goto-first-child= now works before first heading diff --git a/lisp/ob-comint.el b/lisp/ob-comint.el index b14849df6..d81ff3edd 100644 --- a/lisp/ob-comint.el +++ b/lisp/ob-comint.el @@ -93,12 +93,7 @@ (defmacro org-babel-comint-with-output (meta &rest body) (regexp-quote ,eoe-indicator) nil t) (re-search-forward comint-prompt-regexp nil t))))) - (accept-process-output (get-buffer-process (current-buffer))) - ;; thought the following this would allow async - ;; background running, but I was wrong... - ;; (run-with-timer .5 .5 'accept-process-output - ;; (get-buffer-process (current-buffer))) - ) + (accept-process-output (get-buffer-process (current-buffer)))) ;; replace cut dangling text (goto-char (process-mark (get-buffer-process (current-buffer)))) (insert dangling-text) @@ -147,6 +142,172 @@ (defun org-babel-comint-eval-invisibly-and-wait-for-file (if (= (aref string (1- (length string))) ?\n) string (concat string "\n"))) (while (not (file-exists-p file)) (sit-for (or period 0.25)))) + +;; Async evaluation + +(defvar-local org-babel-comint-async-indicator nil + "Regular expression that `org-babel-comint-async-filter' scans for. +It should have 2 parenthesized expressions, +e.g. \"org_babel_async_\\(start\\|end\\|file\\)_\\(.*\\)\". The +first parenthesized expression determines whether the token is +delimiting a result block, or whether the result is in a file. If +delimiting a block, the second expression gives a UUID for the +location to insert the result. Otherwise, the result is in a tmp +file, and the second expression gives the file name.") + +(defvar-local org-babel-comint-async-buffers nil + "List of org-mode buffers to check for Babel async output results.") + +(defvar-local org-babel-comint-async-file-callback nil + "Callback to clean and insert Babel async results from a temp file. +The callback function takes two arguments: the alist of params of the Babel +source block, and the name of the temp file.") + +(defvar-local org-babel-comint-async-chunk-callback nil + "Callback function to clean Babel async output results before insertion. +Its single argument is a string consisting of output from the +comint process. It should return a string that will be be passed +to `org-babel-insert-result'.") + +(defvar-local org-babel-comint-async-dangling nil + "Dangling piece of the last process output, in case +`org-babel-comint-async-indicator' is spread across multiple +comint outputs due to buffering.") + +(defun org-babel-comint-use-async (params) + "Determine whether to use session async evaluation. +PARAMS are the header arguments as passed to +`org-babel-execute:lang'." + (let ((async (assq :async params)) + (session (assq :session params))) + (and async + (not org-babel-exp-reference-buffer) + (not (equal (cdr async) "no")) + (not (equal (cdr session) "none"))))) + +(defun org-babel-comint-async-filter (string) + "Captures Babel async output from comint buffer back to org-mode buffers. +This function is added as a hook to `comint-output-filter-functions'. +STRING contains the output originally inserted into the comint buffer." + ;; Remove outdated org-mode buffers + (setq org-babel-comint-async-buffers + (cl-loop for buf in org-babel-comint-async-buffers + if (buffer-live-p buf) + collect buf)) + (let* ((indicator org-babel-comint-async-indicator) + (org-buffers org-babel-comint-async-buffers) + (file-callback org-babel-comint-async-file-callback) + (combined-string (concat org-babel-comint-async-dangling string)) + (new-dangling combined-string) + ;; list of UUID's matched by `org-babel-comint-async-indicator' + uuid-list) + (with-temp-buffer + (insert combined-string) + (goto-char (point-min)) + (while (re-search-forward indicator nil t) + ;; update dangling + (setq new-dangling (buffer-substring (point) (point-max))) + (cond ((equal (match-string 1) "end") + ;; save UUID for insertion later + (push (match-string 2) uuid-list)) + ((equal (match-string 1) "file") + ;; insert results from tmp-file + (let ((tmp-file (match-string 2))) + (cl-loop for buf in org-buffers + until + (with-current-buffer buf + (save-excursion + (goto-char (point-min)) + (when (search-forward tmp-file nil t) + (org-babel-previous-src-block) + (let* ((info (org-babel-get-src-block-info)) + (params (nth 2 info)) + (result-params + (cdr (assq :result-params params)))) + (org-babel-insert-result + (funcall file-callback + (nth + 2 (org-babel-get-src-block-info)) + tmp-file) + result-params info)) + t)))))))) + ;; Truncate dangling to only the most recent output + (when (> (length new-dangling) (length string)) + (setq new-dangling string))) + (setq-local org-babel-comint-async-dangling new-dangling) + (when uuid-list + ;; Search for results in the comint buffer + (save-excursion + (goto-char (point-max)) + (while uuid-list + (re-search-backward indicator) + (when (equal (match-string 1) "end") + (let* ((uuid (match-string-no-properties 2)) + (res-str-raw + (buffer-substring + ;; move point to beginning of indicator + (- (match-beginning 0) 1) + ;; find the matching start indicator + (cl-loop + do (re-search-backward indicator) + until (and (equal (match-string 1) "start") + (equal (match-string 2) uuid)) + finally return (+ 1 (match-end 0))))) + ;; Apply callback to clean up the result + (res-str (funcall org-babel-comint-async-chunk-callback + res-str-raw))) + ;; Search for uuid in associated org-buffers to insert results + (cl-loop for buf in org-buffers + until (with-current-buffer buf + (save-excursion + (goto-char (point-min)) + (when (search-forward uuid nil t) + (org-babel-previous-src-block) + (let* ((info (org-babel-get-src-block-info)) + (params (nth 2 info)) + (result-params + (cdr (assq :result-params params)))) + (org-babel-insert-result + res-str result-params info)) + t)))) + ;; Remove uuid from the list to search for + (setq uuid-list (delete uuid uuid-list))))))))) + +(defun org-babel-comint-async-register + (session-buffer org-buffer indicator-regexp + chunk-callback file-callback) + "Sets local org-babel-comint-async variables in SESSION-BUFFER. +ORG-BUFFER is added to `org-babel-comint-async-buffers' if not +present. `org-babel-comint-async-indicator', +`org-babel-comint-async-chunk-callback', and +`org-babel-comint-async-file-callback' are set to +INDICATOR-REGEXP, CHUNK-CALLBACK, and FILE-CALLBACK +respectively." + (org-babel-comint-in-buffer session-buffer + (setq org-babel-comint-async-indicator indicator-regexp + org-babel-comint-async-chunk-callback chunk-callback + org-babel-comint-async-file-callback file-callback) + (unless (memq org-buffer org-babel-comint-async-buffers) + (setq org-babel-comint-async-buffers + (cons org-buffer org-babel-comint-async-buffers))) + (add-hook 'comint-output-filter-functions + 'org-babel-comint-async-filter nil t))) + +(defmacro org-babel-comint-async-delete-dangling-and-eval + (session-buffer &rest body) + "Remove dangling text in SESSION-BUFFER and evaluate BODY. +This is analogous to `org-babel-comint-with-output', but meant +for asynchronous output, and much shorter because inserting the +result is delegated to `org-babel-comint-async-filter'." + (declare (indent 1)) + `(org-babel-comint-in-buffer ,session-buffer + (goto-char (process-mark (get-buffer-process (current-buffer)))) + (delete-region (point) (point-max)) + ,@body)) +(def-edebug-spec org-babel-comint-async-with-output (sexp body)) + (provide 'ob-comint) + + ;;; ob-comint.el ends here diff --git a/lisp/ob-python.el b/lisp/ob-python.el index 9f2386392..45c33a453 100644 --- a/lisp/ob-python.el +++ b/lisp/ob-python.el @@ -84,6 +84,7 @@ (defun org-babel-execute:python (body params) (return-val (when (eq result-type 'value) (cdr (assq :return params)))) (preamble (cdr (assq :preamble params))) + (async (org-babel-comint-use-async params)) (full-body (concat (org-babel-expand-body:generic @@ -92,7 +93,8 @@ (defun org-babel-execute:python (body params) (when return-val (format (if session "\n%s" "\nreturn %s") return-val)))) (result (org-babel-python-evaluate - session full-body result-type result-params preamble))) + session full-body result-type + result-params preamble async))) (org-babel-reassemble-table result (org-babel-pick-name (cdr (assq :colname-names params)) @@ -278,11 +280,14 @@ (defun org-babel-python-format-session-value (if (member "pp" result-params) "True" "False"))) (defun org-babel-python-evaluate - (session body &optional result-type result-params preamble) + (session body &optional result-type result-params preamble async) "Evaluate BODY as Python code." (if session - (org-babel-python-evaluate-session - session body result-type result-params) + (if async + (org-babel-python-async-evaluate-session + session body result-type result-params) + (org-babel-python-evaluate-session + session body result-type result-params)) (org-babel-python-evaluate-external-process body result-type result-params preamble))) @@ -391,6 +396,49 @@ (defun org-babel-python-read-string (string) (substring string 1 -1) string)) +;; Async session eval + +(defconst org-babel-python-async-indicator "print ('ob_comint_async_python_%s_%s')") + +(defun org-babel-python-async-value-callback (params tmp-file) + (let ((result-params (cdr (assq :result-params params))) + (results (org-babel-eval-read-file tmp-file))) + (org-babel-result-cond result-params + results + (org-babel-python-table-or-string results)))) + +(defun org-babel-python-async-evaluate-session + (session body &optional result-type result-params) + "Asynchronously evaluate BODY in SESSION. +Returns a placeholder string for insertion, to later be replaced +by `org-babel-comint-async-filter'." + (org-babel-comint-async-register + session (current-buffer) + "ob_comint_async_python_\\(.+\\)_\\(.+\\)" + 'org-babel-chomp 'org-babel-python-async-value-callback) + (let ((python-shell-buffer-name (org-babel-python-without-earmuffs session))) + (pcase result-type + (`output + (let ((uuid (md5 (number-to-string (random 100000000))))) + (with-temp-buffer + (insert (format org-babel-python-async-indicator "start" uuid)) + (insert "\n") + (insert body) + (insert "\n") + (insert (format org-babel-python-async-indicator "end" uuid)) + (python-shell-send-buffer)) + uuid)) + (`value + (let ((tmp-results-file (org-babel-temp-file "python-")) + (tmp-src-file (org-babel-temp-file "python-"))) + (with-temp-file tmp-src-file (insert body)) + (with-temp-buffer + (insert (org-babel-python-format-session-value tmp-src-file tmp-results-file result-params)) + (insert "\n") + (insert (format org-babel-python-async-indicator "file" tmp-results-file)) + (python-shell-send-buffer)) + tmp-results-file))))) + (provide 'ob-python) ;;; ob-python.el ends here diff --git a/testing/lisp/test-ob-python.el b/testing/lisp/test-ob-python.el index a2cc7b79c..0267678cd 100644 --- a/testing/lisp/test-ob-python.el +++ b/testing/lisp/test-ob-python.el @@ -207,6 +207,67 @@ (ert-deftest test-ob-python/session-value-sleep () #+end_src" (org-babel-execute-src-block))))) +(ert-deftest test-ob-python/async-simple-session-output () + (let ((org-babel-temporary-directory "/tmp") + (org-confirm-babel-evaluate nil)) + (org-test-with-temp-text + "#+begin_src python :session :async yes :results output +import time +time.sleep(.1) +print('Yep!') +#+end_src\n" + (should (let ((expected "Yep!")) + (and (not (string= expected (org-babel-execute-src-block))) + (string= expected + (progn + (sleep-for 0 200) + (goto-char (org-babel-where-is-src-block-result)) + (org-babel-read-result))))))))) + +(ert-deftest test-ob-python/async-named-output () + (let (org-confirm-babel-evaluate + (org-babel-temporary-directory "/tmp") + (src-block "#+begin_src python :async :session :results output +print(\"Yep!\") +#+end_src") + (results-before " + +#+NAME: foobar +#+RESULTS: +: Nope!") + (results-after " + +#+NAME: foobar +#+RESULTS: +: Yep! +")) + (org-test-with-temp-text + (concat src-block results-before) + (should (progn (org-babel-execute-src-block) + (sleep-for 0 200) + (string= (concat src-block results-after) + (buffer-string))))))) + +(ert-deftest test-ob-python/async-output-drawer () + (let (org-confirm-babel-evaluate + (org-babel-temporary-directory "/tmp") + (src-block "#+begin_src python :async :session :results output drawer +print(list(range(3))) +#+end_src") + (result " + +#+RESULTS: +:results: +[0, 1, 2] +:end: +")) + (org-test-with-temp-text + src-block + (should (progn (org-babel-execute-src-block) + (sleep-for 0 200) + (string= (concat src-block result) + (buffer-string))))))) + (provide 'test-ob-python) ;;; test-ob-python.el ends here -- 2.30.1