The async
package allows you to write code that executes incrementally.
When an ordinary R function is called, control does not return until that function either produces a value, or finishes with an error. A async({})
block. will run until it reaches an await(pr)
and pauses, returning control until the value of promise pr
is available. So a coroutine’s evaluation can be temporally interleaved with other operations.
However, this does not require any changes to R’s execution model; coroutines are implemented in terms of base R, within its single-threaded execution model. In order to pause and resume, a coroutine has to explicitly keep track of its internal state. When it reaches yield
it saves its place, then resumes on the next nextOr
.
In other words a generator is a kind of state machine sitting behind the iteror
interface. An async
also operates as a kind of state machine, one that interacts with thepromise
interface; its state advances according to its program and how awaited promises and other calls resolve.
The particular state transitions, and the internal state that is tracked, are constructed from the coroutine expression. In other words you could say that the async
package implements a mini-language for describing state machines.
The illusion that a coroutine works like a parallel stream of R code is due to this mini-language being constructed to function analogously to base R. However, the language is slightly different from base R, so this document tries to collect and explain those differences, and tries to paint a picture of how these objects operate.
The first thing to know about the coroutine mini-language is that a yield()
or await()
can only appear under the arguments of “pausable” functions. The async
package has built in pausable implementations of R’s most commonly used control flow and error handling functions. The function async::pausables()
will return a list of all pausable functions it can see. As of this writing, the list is:
## [1] "&&" "(" "<-" "<<-" "await" "awaitNext"
## [7] "break" "for" "goto" "if" "next" "nextOr"
## [13] "on.exit" "repeat" "return" "switch" "try" "tryCatch"
## [19] "while" "yield" "yieldFrom" "{" "||"
While any R functions can be used in a coroutine expression, yield
or await
can only appear within the arguments of the above functions. This restriction applies recursively; the following is illegal because list
is not pausable.
## Error in trans_call(expr, path): A pause or break appears in an argument to `list`, which is not pausable. Consider using split_pipes=TRUE
The error message refers to option split_pipes
, which offers a workaround. If split_pipes=TRUE
, an await in the leftmost argument of a function can be rewritten using a temp variable. async({await(x) + 5})
will be rewritten using the “base R pipe” i.e. async({await(x) ->.; . + 5})
Constructing a gen
or async
is somewhat like calling a function, in that it creates a new environment, executes a given block of code with respect to that environment, and supports actions like break
, return
, stop
, and on,exit
. that apply to a scope. Coroutine constructors by themselves do not take arguments; to create a generator with respect to some argument, you simply write a function that takes and argument and returns a generator:
# given a sequence of sequences, concatenate them like c() into one sequence
chain <- function(sequences) {
force(sequences)
gen({
for (seq in sequences) {
yieldFrom(seq)
}
})
}
Note that it is a good practice in R to force()
the function arguments, as in any case when a function returns an inner construct that captures its scope. Because it’s easy to forget to force arguments, the async
package offers a “generator function” syntax. So the above can be rewritten as:
All that is going on here is when gen
sees function
as the head of its expression, it just adds the force
and moves the call to “gen” inside the function, reproducing the earlier version..
Coroutines do not have a stack or a notion of procedure calls; function definition is left to R, and the coroutine parser will ignore anything written in a function definition. If you write an inner function in a generator, that inner function will not be able to yield
.
names <- c("Ana", "Bob", "Charlie")
g <- gen({
greet <- function(name) yield(paste0("Good morning, ", name))
for (i in names) {
greet(name)
}
})
as.list(g)
## Error in yield(paste0("Good morning, ", name)): yield() called outside a generator
However, you can use yieldFrom()
to return any number of values from another iterator.
names <- c("Ana", "Bob", "Charlie")
greet <- gen(function(name) {
yield(paste0("Good morning, ", name))
})
run(type="", {
for (n in names) {
yieldFrom(greet(n))
}
})
## [1] "Good morning, Ana" "Good morning, Bob" "Good morning, Charlie"
for
loops in a coroutine can run over iterators as well as plain vectors. The in
argument to a for
loop will be converted to the iteror
interface using iteror(x)
.
When writing an async
or stream
routine, for
loops can be used to consume values from channels/streams, without explicitly calling await()
.
repeat
, while
and for
otherwise work like in base R, including support for next
and break
operators.
You can also write looping code by using switch()
with goto()
, described below.
if
, ||
&&
Coroutine if
statements work similarly to base R.
Short circuiting logical operators ||
and &&
will try first their left branch, which must return a scalar logical. If not satisfied with the left branch, they will evaluate the right argument and return it directly. That is, in a coroutine yield(TRUE) && "string"
may return “string”, while in base R this raises an error.
I write “may” above, because if you write ||
in a coroutine expression you might be using either base R ||
or the pausable version of ||
. In general you will only be using pausable ||
if there is a pause somewhere on either side of it. So if you write (x == 5) || FALSE
you will be using base R ||
, since ==
does not pause. Whereas if you write (await(success) == 5) || yield("assertion")
you will be using pausable ||
, because there is an await
on one side of it.
switch
and goto
switch()
statements are more strict than in base R; the input to a switch
must match a given branch or it will raise an error. That is, in base R, switch(-1, "not run")
or switch("bar", foo="not run")
returns invisible NULL
, while in a coroutine this is an error.
Additionally, switch
statements support a delimited goto. Within a given switch
statement, goto("other_branch")
stops executing the present branch and jumps to the given sibling branch. Calling goto()
without arguments re-reads the switch input again. For example, you might try to read an input file that might be in a few different formats.
file_dataset <- async({
filename <- await( download_file(url) )
switch(getExtension(filename),
"txt"=readTxt(filename),
"csv"=tryCatch(readCSV(filename), error=goto("txt")),
"json"={
if (!await(validateSchema(filename))) goto("txt")
tryCatch(read_json(filename), error=goto("txt"))
},
"zip"= {
unzipped <- unzip_async(filename) |> await()
filename <- unzipped
on.exit(unlink(unzipped))
goto(getExtension(unzipped))
}
)
})
Here, if there is an error in reading a .csv
or .json
file we re-try ingesting it as a text file; on encountering a zip
file we unzip it and try again with the new filename.
Note that if a goto
appears inside of a try(..., finally={...})
call, which is itself inside a branch, the finally
clause will be executed before jumping to the new branch.
All of this sequencing we are discussing through this document is baked into a sort of graph structure when a generator is constructed. See if you can follow the logic of the preceding async
through its graph:
Is it a bit spaghetti? Here we have nodes for each discrete step in the computation. Nodes in reverse type are user-level R calls, there are a few grouped nodes to handle an await
or tryCatch
.
Each node in the graph is literally a function and each line represents where it calls the next function. You can step through these if you set debugAsync(as, debugInternal=TRUE)
.
Saving state, then, is just remembering the last call. (Dotted lines, here, represent where it saves state rather
Starting and finishing may mean different things for different coroutines.
run
is “eager;” executes an entire coroutine expression immediately without pausing.gen
does not compute anything when it is first constructed, it only begins executing when there is a request for data via nextOr
.async
routines are “eager;” they begin executing immediately on creation, before the constructor returns. An async
runs until it gets the first await
and then pauses. allowing the constructor to return. Upon receiving, the async will continue running.stream
objects can run in either lazy or eager mode. If lazy=TRUE
, they will not compute anything until something calls the channel’s nextThen
method, and will pause after yield
if there are no more listeners. If lazy=FALSE
, a stream starts executing when constructed, will not pause after yield
, and may run ahead and queue up outgoing values until it reaches an await
.await(p)
in an async
or stream, pauses execution until the given promise resolves.yield(val)
in a generator pauses execution, returns the given value as the result of nextOr()
.yield(val)
in a stream, resolves the next listener with the value given. It may or may not pause depending on whether lazy=TRUE
and whether there are any more pending requests/yieldFrom(i)
takes an iterator or a stream as argument, and yields successive values until it is exhausted. yieldFrom(iter)
is basically equivalent to for (i in iter) yield(i)
.A coroutine might finish normally by reaching a call to return(...)
, or by simply reaching the end of its expression.
If a coroutine reaches a return
call, all enclosing tryCatch(finally=)
and on.exit
handlers will be executed before returning the value.
nextOr
() forces its or
argument. The value returned by the generator expression is discarded.async
finishes normally , its promise interface resolves with the return value.A coroutine might finish abnormally if the user-level R code throws an error, or if there is an internal error in the coroutine implementation.
gen
or run
routine finishes abnormally, all on.exit
clauses are triggered, while the inciting error is allowed to propagate normally without being caught.async
routine throws an error, it will be caught and the error will be forwarded to reject a promise.on.exit
handlers are run in a special phase after main execution finishes (either normally or abnormally). After on.exit
handlers have all finished, the coroutine will continue to propagate its error or its return value.
If you were to do something as strange as return()
or yield()
from an on.exit handler, any pending error will be cancelled. (This is also how base R’s on.exit
behaves.)
Errors during execution can come from user-level R code, or, possibly, from within the coroutine implementation itself. The coroutine implementation of tryCatch
tries to be able to catch either type. When a coroutine enters a tryCatch
, it briefly saves state so that it can wind up an R-level tryCatch
and continuing underneath it. This way it can catch exceptions coming from both the coroutine implementation and user-level R calls.
A slight difference from base R tryCatch
is you can supply a non- function value to error
if you don’t care about parsing the error message; it will just return that value.
For that matter, you can provide an error value or handler directly to await
, and this might be more efficient as it doesn’t have to wind up another tryCatch on the coroutine end of things.
You can also use break
or next
or goto
in the error
value to take those actions on error.
A coroutine may have a list of exit handlers. The expr
argument to each on.exit
is pulled out at parse time to make each handler; so that their effective scope is outside of any enclosing loops or tryCatch calls.
At run time, calling on.exit
registers the given handler to be executed later.. When the coroutine finishes, either by normal return or by error, the coroutine saves the return value and starts executing each handler that was registered. After the last on.exit
handler has been executed, the saved value is returned.
A typical use of on.exit
might be to close a file or other resource after you are done reading; here’s a simple generator that reads a file one line at a time:
fileLines <- gen(function(...) {
f <- file(..., open="rt")
on.exit(close(f))
while (length(line <- readLines(f, 1)) > 0) yield(line)
})
length(as.list(fileLines("language.Rmd")))
## [1] 306
It is possible (but rather strange) to call stop()
or return()
from inside an on.exit
handler. In that case the new value and disposition overrides the old one.
## Error in eval(val, targetEnv): an error
## [1] "this instead"
Here he have an odd situation where an error is printed out to the console, but the value was returned from run
anyway. (Base R also does this.)
tryCatch(expr={...}, finally={...})
will execute expr
, and after expr
finishes, whether it finishes normally or not, tryCatch
will then execute finally
.
The finally
clause will also be executed if you exit from expr
without finishing it, for instance by break
or next
in a loop, or by goto
in a switch statement.
Assuming finally
finishes normally, tryCatch
will either return the saved return value or throw the saved error from expr
. If the finally
clause throws an error then the original error will be lost.
Writing code in a coroutine’s on.exit
clause or in a tryCatch(finally=)
call does not give any guarantee that the cleanup code will ever run, because coroutines only execute “on demand.” If you make a generator designed to return ten items, then call nextOr
ten times, the generator will never run its exit handler. The generator will just be paused waiting to reach end of iteration; you need to call nextOr
an eleventh time to allow it to reach the end, run ts exit handlers and signal end of iteration.
finished <- FALSE
g <- gen({
on.exit(finished <<- TRUE)
for (i in 1:10) yield(i)
})
for (j in 1:10) nextOr(g, break)
finished
## [1] FALSE
## [1] "stop"
## [1] TRUE
Similarly, an async
with an on.exit
or finally
handler will not exevute those handlers if it is paused on an await
that never resolves. With those caveats in mind, coroutines do implement both on.exit
and finally
constructs and they are useful for sequencing cleanup actions.
tryCatch
and iteration performanceWhen a coroutine enters a tryCatch
it has to momentarily save state so that it can wind up an R-level tryCatch
on the stack and then resume. If there is a a yield
or await
in the tryCatch expression, the coroutine has to exit from its tryCatch and re-establish it when execution resumes.
This means that yielding from inside a tryCatch can be more costly, performance-wise, than a regular yield. This is part of what motivated the development of the [iterors package][iterors::iteror-package]; desiring a protocol for iteration that did not involve using exceptions for normal flow control. Just eliminating tryCatch
caused a 2x speedup for many generators, and the iterors
package contains many other optimizations besides.
generators
and run
try to operate without using any tryCatch
handlers. This means that if a generator encounters a problem, the error will bubble up through R’s normal error handling mechanism, and it should be relatively easy to use options(error=recover)
to debug the error.
async
and stream
are required to have a global tryCatch
because any errors they encounter should be forwarded to listeners. This however makes it a bit harder to debug them.
The method debugAsync()
will toggle single step debugging on a coroutine.
If you call debugAsync(co, R=TRUE)
the coroutine co
will start a browser before executing each user-level R expression. You can press “n” to step through each expression.
If you call debugAsync(co, internal=TRUE)
the browser will be opened at the R implementation level. This will present you with a twisty little path of tailcalls; Step through each call with n
but make sure to s
to step into a tailcall to follow execution.
The async
package imports nseval which can be very handy at the debugger console. I often use get_function()
, get_call()
, and caller()
to get a handle on where exactly the debugger is paused at. arg_env
arg_expr
, is_forced
and friends really help when dealing with R’s lazy evaluation,