From: Jack Kamm <jackkamm@gmail.com>
To: emacs-orgmode@gnu.org
Subject: Asynchronous session evaluation
Date: Sat, 01 Jun 2019 17:54:41 -0700	[thread overview]
Message-ID: <87muj04xim.fsf@jaheira.i-did-not-set--mail-host-address--so-tickle-me> (raw)

For some time I've been wishing for asynchronous Babel session
evaluation. So I've created an experimental branch implementing this. I
have an initial version working for R, so I thought it'd be a good time
to seek feedback and gauge interest.

To test the attached patch, add ":async yes" to an R session block
with a long computation (or "Sys.sleep") in it. Upon evaluation,
your Emacs won't freeze to wait for the result -- instead, a
placeholder will be inserted, and replaced with the true result when
it's ready.

I'll note how this is different from some related projects. ob-async
implements asynchronous evaluation for Babel, but it doesn't work with
sessions. emacs-jupyter, ein, and ob-ipython all implement asynchronous
session evaluation, but only for Jupyter kernels. Jupyter is great for
some cases, but sometimes I'd prefer not to use it. For example, the
native R console has great Emacs support via ESS, whereas the Jupyter R
console doesn't work with ESS and is not widely used in the R community.

Note that if you use ob-async, make sure to add "R" to
`ob-async-no-async-languages-alist' before testing this.

The new functionality is mainly implemented in
`org-babel-comint-async-filter', which I've defined in ob-comint.el,
and added as a hook to `comint-output-filter-functions'.  Whenever new
output is added to the comint buffer, the filter scans for an
indicator token (this is inspired by
`org-babel-comint-with-output'). Upon encountering the token, the
filter uses a regular expression to extract a UUID or temp-file
associated with the result, then searches for the appropriate location
to add the result to.

I've tried to make behavior as similar as possible to existing
ob-comint behavior, so that some of the existing code for interacting
with ob-comint can be refactored and reused. Still, it will be a large
task to add this feature for all languages. So far, I've only done R,
but my thought is to implement a few more languages before nailing
down the functionality. But, I hope something like this could be
merged in supporting just a subset of languages initially, then
gradually increasing the number of supported languages over time.

From f1c198a85666507164e9a97a7e0758f1d5dcf126 Mon Sep 17 00:00:00 2001
From: Jack Kamm <jackkamm@gmail.com>
Date: Sat, 1 Jun 2019 12:26:13 -0700
Subject: [PATCH] Add asynchronous Babel comint & R eval

 lisp/ob-R.el              | 174 +++++++++++++++++++++++++++++---------
 lisp/ob-comint.el         | 152 +++++++++++++++++++++++++++++++--
 testing/lisp/test-ob-R.el |  24 ++++++
 3 files changed, 305 insertions(+), 45 deletions(-)

diff --git a/lisp/ob-R.el b/lisp/ob-R.el
index 9e738a8a5..74336d083 100644
--- a/lisp/ob-R.el
+++ b/lisp/ob-R.el
@@ -161,8 +161,8 @@ This function is called by `org-babel-execute-src-block'."
 		     (cdr (assq :session params)) params))
 	   (graphics-file (and (member "graphics" (assq :result-params params))
 			       (org-babel-graphical-output-file params)))
-	   (colnames-p (unless graphics-file (cdr (assq :colnames params))))
 	   (rownames-p (unless graphics-file (cdr (assq :rownames params))))
+	   (async (cdr (assq :async params)))
 	    (let ((inside
 		   (list (org-babel-expand-body:R body params graphics-file))))
@@ -178,12 +178,11 @@ This function is called by `org-babel-execute-src-block'."
 	     session full-body result-type result-params
-	     (or (equal "yes" colnames-p)
-		 (org-babel-pick-name
-		  (cdr (assq :colname-names params)) colnames-p))
+	     (org-babel-R-get-colnames-p params)
 	     (or (equal "yes" rownames-p)
-		  (cdr (assq :rowname-names params)) rownames-p)))))
+		  (cdr (assq :rowname-names params)) rownames-p))
+	     (equal "yes" async))))
       (if graphics-file nil result))))
 (defun org-babel-prep-session:R (session params)
@@ -369,11 +368,15 @@ Has four %s escapes to be filled in:
 4. The name of the file to write to")
 (defun org-babel-R-evaluate
-  (session body result-type result-params column-names-p row-names-p)
+    (session body result-type result-params
+	     column-names-p row-names-p async-p)
   "Evaluate R code in BODY."
   (if session
-      (org-babel-R-evaluate-session
-       session body result-type result-params column-names-p row-names-p)
+      (if async-p
+	  (org-babel-R-evaluate-session-async
+	   session body result-type column-names-p row-names-p)
+	(org-babel-R-evaluate-session
+	 session body result-type result-params column-names-p row-names-p))
      body result-type result-params column-names-p row-names-p)))
@@ -395,11 +398,7 @@ last statement in BODY, as elisp."
 			       (format "{function ()\n{\n%s\n}}()" body)
 			       (org-babel-process-file-name tmp-file 'noquote)))
-	(org-babel-result-cond result-params
-	  (with-temp-buffer
-	    (insert-file-contents tmp-file)
-	    (org-babel-chomp (buffer-string) "\n"))
-	  (org-babel-import-elisp-from-file tmp-file '(16)))
+	(org-babel-R-value-from-tmp-file result-params tmp-file)
     (output (org-babel-eval org-babel-R-command body))))
@@ -422,38 +421,22 @@ last statement in BODY, as elisp."
      (let ((tmp-file (org-babel-temp-file "R-")))
 	session tmp-file
-	(format org-babel-R-write-object-command
-		(if row-names-p "TRUE" "FALSE")
-		(if column-names-p
-		    (if row-names-p "NA" "TRUE")
-		  "FALSE")
-		".Last.value" (org-babel-process-file-name tmp-file 'noquote)))
+	(org-babel-R-write-last-value-command row-names-p
+					      column-names-p
+					      tmp-file))
-	(org-babel-result-cond result-params
-	  (with-temp-buffer
-	    (insert-file-contents tmp-file)
-	    (org-babel-chomp (buffer-string) "\n"))
-	  (org-babel-import-elisp-from-file tmp-file '(16)))
+	(org-babel-R-value-from-tmp-file result-params tmp-file)
-       (delq nil
-	     (mapcar
-	      (lambda (line) (when (> (length line) 0) line))
-	      (mapcar
-	       (lambda (line) ;; cleanup extra prompts left in output
-		 (if (string-match
-		      "^\\([>+.]\\([ ][>.+]\\)*[ ]\\)"
-		      (car (split-string line "\n")))
-		     (substring line (match-end 1))
-		   line))
-	       (org-babel-comint-with-output (session org-babel-R-eoe-output)
-		 (insert (mapconcat 'org-babel-chomp
-				    (list body org-babel-R-eoe-indicator)
-				    "\n"))
-		 (inferior-ess-send-input)))))) "\n"))))
+       (org-babel-R-clean-session-output
+	(org-babel-comint-with-output (session org-babel-R-eoe-output)
+	  (insert (mapconcat 'org-babel-chomp
+			     (list body org-babel-R-eoe-indicator)
+			     "\n"))
+	  (inferior-ess-send-input)))) "\n"))))
 (defun org-babel-R-process-value-result (result column-names-p)
   "R-specific processing of return value.
@@ -462,6 +445,119 @@ Insert hline if column names in output have been requested."
       (cons (car result) (cons 'hline (cdr result)))
+(defun org-babel-R-value-from-tmp-file (result-params tmp-file)
+  "Read the R result stored in TMP-FILE according to RESULT-PARAMS.
+RESULT-PARAMS is handed to `org-babel-result-cond', which picks
+one of two forms: the first returns the text of TMP-FILE passed
+through `org-babel-chomp' with \"\\n\", the second imports the
+file with `org-babel-import-elisp-from-file'."
+  (org-babel-result-cond result-params
+    ;; First form: the raw file text, chomped.
+    (org-babel-chomp
+     (with-temp-buffer
+       (insert-file-contents tmp-file)
+       (buffer-string))
+     "\n")
+    ;; Second form: let Babel parse the file into an elisp value.
+    (org-babel-import-elisp-from-file tmp-file '(16))))
+(defun org-babel-R-clean-session-output (output)
+  "Strip leftover prompts from OUTPUT and drop empty entries.
+OUTPUT is a list of strings.  When the first line of an entry
+begins with a run of prompt characters (`>', `+' or `.',
+separated by single spaces and followed by a space), that prefix
+is removed from the entry.  Entries that are empty after this
+step are discarded.  Return the remaining strings, in order, as
+a new list."
+  (let ((prompt-re "^\\([>+.]\\([ ][>.+]\\)*[ ]\\)")
+        kept)
+    (dolist (chunk output)
+      ;; Only the first line of each chunk is tested for a prompt,
+      ;; but the prefix is cut from the whole chunk.
+      (let* ((first-line (car (split-string chunk "\n")))
+             (stripped (if (string-match prompt-re first-line)
+                           (substring chunk (match-end 1))
+                         chunk)))
+        (unless (zerop (length stripped))
+          (push stripped kept))))
+    (nreverse kept)))
+(defun org-babel-R-write-last-value-command (row-names-p column-names-p tmp-file)
+  "Build the R command that writes `.Last.value' to TMP-FILE.
+The command is produced by filling `org-babel-R-write-object-command'.
+ROW-NAMES-P and COLUMN-NAMES-P select the row-name and column-name
+arguments substituted into that template."
+  (let ((row-arg (if row-names-p "TRUE" "FALSE"))
+        (col-arg (cond ((not column-names-p) "FALSE")
+                       (row-names-p "NA")
+                       (t "TRUE")))
+        (target (org-babel-process-file-name tmp-file 'noquote)))
+    (format org-babel-R-write-object-command
+            row-arg col-arg ".Last.value" target)))
+(defun org-babel-R-get-colnames-p (params)
+  "Decide from PARAMS whether an R Babel block should use column names.
+PARAMS is the alist of header arguments.  The :colnames value is
+ignored when the block produces a graphics file.  Return t when
+:colnames is \"yes\"; otherwise return whatever
+`org-babel-pick-name' yields for the :colname-names entry."
+  (let* ((result-params (assq :result-params params))
+         (graphics-p (and (member "graphics" result-params)
+                          (org-babel-graphical-output-file params)))
+         (colnames (and (not graphics-p)
+                        (cdr (assq :colnames params)))))
+    (if (equal "yes" colnames)
+        t
+      (org-babel-pick-name (cdr (assq :colname-names params))
+                           colnames))))
+;; Async evaluation
+(defconst org-babel-R-async-indicator "'org_babel_R_async_%s_%s'"
+  "Format template for the R string that marks async results.
+The first %s is the marker kind (\"start\", \"end\" or \"file\");
+the second %s is the UUID of the result or the name of the temp
+file holding it.")
+(defconst org-babel-R-async-indicator-output
+  ;; The kind is matched explicitly rather than with a greedy \".+\":
+  ;; otherwise an underscore in a temp-file path would be absorbed
+  ;; into the first group and the kind would never equal \"file\".
+  "^\\[1\\] \"org_babel_R_async_\\(start\\|end\\|file\\)_\\(.+\\)\"$"
+  "Regexp matching `org-babel-R-async-indicator' as echoed by R.
+Group 1 is the marker kind (\"start\", \"end\" or \"file\"); group 2
+is the UUID or temp-file name.")
+(defun org-babel-R-evaluate-session-async
+    (session body result-type column-names-p row-names-p)
+  "Asynchronously evaluate BODY in SESSION.
+RESULT-TYPE is the symbol `value' or `output'.  COLUMN-NAMES-P and
+ROW-NAMES-P are only used for `value' results, where they are
+passed to `org-babel-R-write-last-value-command'.
+
+Returns a placeholder string for insertion, to later be replaced
+by `org-babel-comint-async-filter'.  For `value' results the
+placeholder is the name of the temp file the result is written
+to; for `output' results it is a freshly generated UUID."
+  ;; Make sure the session buffer knows about the current Org buffer
+  ;; and about the R-specific indicator and callbacks.
+  (org-babel-comint-async-register session (current-buffer)
+				   org-babel-R-async-indicator-output
+				   'org-babel-R-async-output-callback
+				   'org-babel-R-async-value-callback)
+  (cl-case result-type
+    (value
+     (let ((tmp-file (org-babel-temp-file "R-")))
+       ;; First evaluation: send BODY itself to the session.
+       (with-temp-buffer
+       (insert
+	(org-babel-chomp body))
+       (let ((ess-local-process-name
+	      (process-name (get-buffer-process session)))
+	     (ess-eval-visibly-p nil))
+	 (ess-eval-buffer nil)))
+       ;; Second evaluation: write the last value to TMP-FILE, then
+       ;; emit the "file" indicator naming TMP-FILE.
+       (with-temp-buffer
+	 (insert
+	  (mapconcat 'org-babel-chomp
+		     (list (org-babel-R-write-last-value-command row-names-p
+								 column-names-p
+								 tmp-file)
+			   (format org-babel-R-async-indicator
+				   "file" tmp-file))
+		     "\n"))
+	 (let ((ess-local-process-name
+		(process-name (get-buffer-process session)))
+	       (ess-eval-visibly-p nil))
+	   (ess-eval-buffer nil)))
+       tmp-file))
+    (output
+     (let ((uuid (md5 (number-to-string (random 100000000)))))
+       ;; BODY is sent between a "start" and an "end" indicator that
+       ;; both carry UUID.
+       (org-babel-comint-delete-dangling-and-eval
+	   session
+	 (insert (mapconcat 'org-babel-chomp
+			    (list (format org-babel-R-async-indicator
+					  "start" uuid)
+				  body
+				  (format org-babel-R-async-indicator
+					  "end" uuid))
+			    "\n"))
+	 (inferior-ess-send-input))
+       uuid))))
+(defun org-babel-R-async-output-callback (output)
+  "Clean async OUTPUT chunks and join them into one string.
+OUTPUT is a list of strings.  It is cleaned with
+`org-babel-R-clean-session-output', a leading newline is removed
+from each remaining chunk, the first and last chunks are dropped,
+and the rest are chomped and joined with newlines.
+
+Meant to be installed as `org-babel-comint-async-chunk-callback'
+in R comint buffers used for asynchronous Babel evaluation."
+  (let* ((cleaned (org-babel-R-clean-session-output output))
+         (trimmed (mapcar (lambda (chunk)
+                            (string-remove-prefix "\n" chunk))
+                          cleaned))
+         ;; Discard the first and the last chunk.
+         (inner (cdr (butlast trimmed))))
+    (mapconcat 'org-babel-chomp inner "\n")))
+(defun org-babel-R-async-value-callback (params tmp-file)
+  "Turn the async value result saved in TMP-FILE into an elisp result.
+PARAMS is the alist of header arguments of the source block.
+
+Meant to be installed as `org-babel-comint-async-file-callback'
+in R comint buffers used for asynchronous Babel evaluation."
+  (let ((raw-value (org-babel-R-value-from-tmp-file
+                    (assq :result-params params) tmp-file))
+        ;; TODO this is not exactly the same as colnames-p above...
+        (with-colnames (org-babel-R-get-colnames-p params)))
+    (org-babel-R-process-value-result raw-value with-colnames)))
 (provide 'ob-R)
diff --git a/lisp/ob-comint.el b/lisp/ob-comint.el
index aa0d341da..2131f54dd 100644
--- a/lisp/ob-comint.el
+++ b/lisp/ob-comint.el
@@ -94,12 +94,7 @@ or user `keyboard-quit' during execution of body."
 			       (regexp-quote ,eoe-indicator) nil t)
 			       comint-prompt-regexp nil t)))))
-	   (accept-process-output (get-buffer-process (current-buffer)))
-	   ;; thought the following this would allow async
-	   ;; background running, but I was wrong...
-	   ;; (run-with-timer .5 .5 'accept-process-output
-	   ;; 		 (get-buffer-process (current-buffer)))
-	   )
+	   (accept-process-output (get-buffer-process (current-buffer))))
 	 ;; replace cut dangling text
 	 (goto-char (process-mark (get-buffer-process (current-buffer))))
 	 (insert dangling-text)
@@ -149,6 +144,151 @@ FILE exists at end of evaluation."
    (if (= (aref string (1- (length string))) ?\n) string (concat string "\n")))
   (while (not (file-exists-p file)) (sit-for (or period 0.25))))
+;; Async evaluation
+(defvar-local org-babel-comint-async-indicator nil
+  "Regular expression that `org-babel-comint-async-filter' scans for.
+It should have 2 parenthesized expressions,
+e.g. \"org_babel_async_\\(start\\|end\\|file\\)_\\(.*\\)\". The
+first parenthesized expression determines whether the token is
+delimiting a result block, or whether the result is in a file. If
+delimiting a block, the second expression gives a UUID for the
+location to insert the result. Otherwise, the result is in a tmp
+file, and the second expression gives the file name.")
+(defvar-local org-babel-comint-async-buffers nil
+  "List of org-mode buffers to check for Babel async output results.")
+(defvar-local org-babel-comint-async-file-callback nil
+  "Callback to clean and insert Babel async results from a temp file.
+The callback function takes two arguments: the alist of params of the Babel
+source block, and the name of the temp file.")
+(defvar-local org-babel-comint-async-chunk-callback nil
+  "Callback to clean Babel async output results before insertion.
+The input is a list of strings, obtained by splitting the raw
+output on `comint-prompt-regexp', as in
+`org-babel-comint-with-output'. The output should be a string.")
+(defvar-local org-babel-comint-async-dangling nil
+  "Dangling piece of the last process output.
+It is kept in case `org-babel-comint-async-indicator' is spread
+across multiple comint outputs due to buffering.")
+(defun org-babel-comint-async-filter (string)
+  "Capture Babel async output from comint buffer back to org-mode buffers.
+This function is added as a hook to `comint-output-filter-functions'.
+STRING contains the output originally inserted into the comint buffer.
+
+STRING is prepended with `org-babel-comint-async-dangling' and
+scanned for `org-babel-comint-async-indicator'.  A \"file\" match
+triggers `org-babel-comint-async-file-callback' and inserts its
+result in the first registered Org buffer that contains the temp
+file name.  An \"end\" match makes the function extract the text
+between the matching \"start\" and \"end\" indicators from the
+comint buffer, clean it with
+`org-babel-comint-async-chunk-callback', and insert it in the
+first registered Org buffer that contains the UUID."
+  ;; Remove outdated org-mode buffers
+  (setq org-babel-comint-async-buffers
+	(cl-loop for buf in org-babel-comint-async-buffers
+	      if (buffer-live-p buf)
+	      collect buf))
+  ;; The buffer-local settings are copied into lexical variables
+  ;; before entering the temp buffer below.
+  (let* ((indicator org-babel-comint-async-indicator)
+	 (org-buffers org-babel-comint-async-buffers)
+	 (file-callback org-babel-comint-async-file-callback)
+	 (combined-string (concat org-babel-comint-async-dangling string))
+	 (new-dangling combined-string)
+	 ;; list of UUID's matched by `org-babel-comint-async-indicator'
+	 uuid-list)
+    (with-temp-buffer
+      (insert combined-string)
+      (goto-char (point-min))
+      (while (re-search-forward indicator nil t)
+	;; update dangling
+	(setq new-dangling (buffer-substring (point) (point-max)))
+	(cond ((equal (match-string 1) "end")
+	       ;; save UUID for insertion later
+	       (push (match-string 2) uuid-list))
+	      ((equal (match-string 1) "file")
+	       ;; insert results from tmp-file
+	       (let ((tmp-file (match-string 2)))
+		 ;; Stop at the first Org buffer in which the temp-file
+		 ;; name is found.
+		 (cl-loop for buf in org-buffers
+		       until
+		       (with-current-buffer buf
+			 (save-excursion
+			   (goto-char (point-min))
+			   (when (search-forward tmp-file nil t)
+			     (org-babel-previous-src-block)
+			     (org-babel-remove-result)
+			     (org-babel-insert-result
+			      (funcall file-callback
+				       (nth
+					2 (org-babel-get-src-block-info))
+				       tmp-file))
+			     t))))))))
+      ;; Truncate dangling to only the most recent output
+      (when (> (length new-dangling) (length string))
+	(setq new-dangling string)))
+    (setq-local org-babel-comint-async-dangling new-dangling)
+    (when uuid-list
+      ;; Search for results in the comint buffer
+      (save-excursion
+	(goto-char (point-max))
+	(while uuid-list
+	  ;; NOTE(review): this search has no NOERROR argument, so it
+	  ;; signals an error if no indicator is found before point.
+	  (re-search-backward indicator)
+	  (when (equal (match-string 1) "end")
+	    (let* ((uuid (match-string-no-properties 2))
+		   (res-str-raw
+		    (buffer-substring
+		     ;; move point to end of indicator
+		     (re-search-forward indicator)
+		     ;; find the matching start indicator
+		     (cl-loop for pos = (re-search-backward indicator)
+			   until (and (equal (match-string 1) "start")
+				      (equal (match-string 2) uuid))
+			   finally return pos)))
+		   ;; Apply callback to clean up the result
+		   (res-str (funcall org-babel-comint-async-chunk-callback
+				     (split-string
+				      res-str-raw
+				      comint-prompt-regexp))))
+	      ;; Search for uuid in associated org-buffers to insert results
+	      (cl-loop for buf in org-buffers
+		    until (with-current-buffer buf
+			    (save-excursion
+			      (goto-char (point-min))
+			      (when (search-forward uuid nil t)
+				(org-babel-previous-src-block)
+				(org-babel-remove-result)
+				(org-babel-insert-result res-str)
+				t))))
+	      ;; Remove uuid from the list to search for
+	      (setq uuid-list (delete uuid uuid-list)))))))))
+(defun org-babel-comint-async-register
+    (session-buffer org-buffer indicator-regexp
+		    chunk-callback file-callback)
+  "Set local org-babel-comint-async variables in SESSION-BUFFER.
+ORG-BUFFER is added to `org-babel-comint-async-buffers' if not
+present.  `org-babel-comint-async-indicator',
+`org-babel-comint-async-chunk-callback', and
+`org-babel-comint-async-file-callback' are set to
+INDICATOR-REGEXP, CHUNK-CALLBACK, and FILE-CALLBACK respectively.
+`org-babel-comint-async-filter' is also added to the buffer-local
+value of `comint-output-filter-functions'."
+  (org-babel-comint-in-buffer session-buffer
+    (setq org-babel-comint-async-indicator indicator-regexp
+	  org-babel-comint-async-chunk-callback chunk-callback
+	  org-babel-comint-async-file-callback file-callback)
+    (unless (memq org-buffer org-babel-comint-async-buffers)
+      (setq org-babel-comint-async-buffers
+	    (cons org-buffer org-babel-comint-async-buffers)))
+    ;; `add-hook' does not add a function twice, so registering the
+    ;; same session repeatedly is harmless.
+    (add-hook 'comint-output-filter-functions
+	      'org-babel-comint-async-filter nil t)))
+(defmacro org-babel-comint-delete-dangling-and-eval
+    (session-buffer &rest body)
+  "Remove dangling text in SESSION-BUFFER and evaluate BODY.
+The text between the process mark and the end of SESSION-BUFFER
+is deleted, then BODY is evaluated with SESSION-BUFFER current
+and point at the process mark.
+
+This is analogous to `org-babel-comint-with-output', but meant
+for asynchronous output, and much shorter because inserting the
+result is delegated to `org-babel-comint-async-filter'."
+  (declare (indent 1))
+  `(org-babel-comint-in-buffer ,session-buffer
+     (goto-char (process-mark (get-buffer-process (current-buffer))))
+     (delete-region (point) (point-max))
+     ,@body))
+;; The spec must name the macro defined just above; otherwise Edebug
+;; cannot instrument its body forms.
+(def-edebug-spec org-babel-comint-delete-dangling-and-eval (sexp body))
 (provide 'ob-comint)
diff --git a/testing/lisp/test-ob-R.el b/testing/lisp/test-ob-R.el
index 7ce340ba4..185b211ab 100644
--- a/testing/lisp/test-ob-R.el
+++ b/testing/lisp/test-ob-R.el
@@ -31,6 +31,30 @@
      "#+begin_src R :session R\n  paste(\"Yep!\")\n#+end_src\n"
      (should (string= "Yep!" (org-babel-execute-src-block))))))
+(ert-deftest test-ob-R/simple-session-async-value ()
+  "Async value evaluation in an R session.
+Executing the block must not return the final value directly;
+after a short wait the result stored in the buffer must be the
+expected value."
+  (let (ess-ask-for-ess-directory ess-history-file)
+    (org-test-with-temp-text
+     "#+begin_src R :session R :async yes\n  Sys.sleep(.1)\n  paste(\"Yep!\")\n#+end_src\n"
+     (should (let ((expected "Yep!"))
+	       ;; The immediate return value is expected to differ
+	       ;; from the final result.
+	       (and (not (string= expected (org-babel-execute-src-block)))
+		    (string= expected
+			     (progn
+			       ;; Wait for the R process before reading
+			       ;; the result back.
+			       (sleep-for 0 200)
+			       (goto-char (org-babel-where-is-src-block-result))
+			       (org-babel-read-result)))))))))
+(ert-deftest test-ob-R/simple-session-async-output ()
+  "Async output evaluation in an R session.
+Executing the block must not return the final output directly;
+after a short wait the result stored in the buffer must be the
+expected output."
+  (let (ess-ask-for-ess-directory ess-history-file)
+    (org-test-with-temp-text
+     "#+begin_src R :session R :results output :async yes\n  Sys.sleep(.1)\n  1:5\n#+end_src\n"
+     (should (let ((expected "[1] 1 2 3 4 5"))
+	       ;; The immediate return value is expected to differ
+	       ;; from the final result.
+	       (and (not (string= expected (org-babel-execute-src-block)))
+		    (string= expected
+			     (progn
+			       ;; Wait for the R process before reading
+			       ;; the result back.
+			       (sleep-for 0 200)
+			       (goto-char (org-babel-where-is-src-block-result))
+			       (org-babel-read-result)))))))))
 (ert-deftest test-ob-R/colnames-yes-header-argument ()
   (org-test-with-temp-text "#+name: eg
 | col |

