[RFC] Async/Await dialect targeting LLVM coroutines

Proposal

I propose adding an Async/Await dialect to MLIR to support the task asynchronous programming model. This dialect is inspired by the Async/Await programming model in C# and Scala. The goal of the dialect is to express asynchronous/multi-threaded/concurrent programs in a sequential manner, and to use LLVM compiler passes to convert asynchronous functions (functions with async operations in them) into LLVM coroutines.

Example

func @computation_1() { … }
func @computation_2() { … }
func @computation_3() { … }

func @async_compute(%arg0: !async.runtime) -> !async.handle {
  %hdl1 = async.call %arg0 : !async.runtime 
          @computation_1() : () -> !async.handle

  %hdl2 = async.call %arg0 : !async.runtime 
          @computation_2() : () -> !async.handle

  async.await %hdl1 : !async.handle
  async.await %hdl2 : !async.handle

  call @computation_3(): () -> ()

  %hdl3 = async.ret_handle : !async.handle
  return %hdl3 : !async.handle
}

In this example @async_compute starts two asynchronous compute tasks, #1 and #2, and “awaits” their completion. When the first two tasks are complete, it runs a third compute task.

Although this function looks like a very simple, regular sequential function, it is converted into an asynchronous state machine. The function caller immediately gets a handle to an asynchronous computation, which becomes ready when all compute tasks are completed.

In the example above, compute functions #1 and #2 run on separate threads; compute function #3 runs on the thread that completed the last pending task.
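For readers more familiar with C++, here is a rough analogy of this behavior using std::async (illustration only, not the proposed lowering; unlike the coroutine-based conversion, the waits below block a worker thread instead of suspending):

#include <cstdio>
#include <future>

void computation_1() { std::puts("task 1"); }
void computation_2() { std::puts("task 2"); }
void computation_3() { std::puts("task 3"); }

// The caller immediately gets a future that becomes ready once all three
// computations have finished, similar to the !async.handle above.
std::future<void> async_compute() {
  return std::async(std::launch::async, [] {
    auto t1 = std::async(std::launch::async, computation_1);
    auto t2 = std::async(std::launch::async, computation_2);
    t1.wait();  // ~ async.await %hdl1
    t2.wait();  // ~ async.await %hdl2
    computation_3();
  });
}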

Async Dialect

Async Runtime

The async runtime hides all the details of launching concurrent/async tasks, managing worker threads, etc. There could be multiple implementations relying on different concurrency primitives (e.g. fixed-size vs. dynamically sized thread pools).

func @async_func(%runtime : !async.runtime) {
}

Async Handle

An async handle is a handle to an asynchronously running task; it becomes ready when the task completes. Inside an async function it is possible to await asynchronous task completion using the async.await operation.

Async Call

An async call launches a function asynchronously (typically on a separate thread) and returns a handle that will signal its completion.

%handle = async.call %runtime : !async.runtime 
          @callee() -> !async.handle

Async Runtime API

Async operations are lowered to async runtime API calls. All runtime types are passed as opaque pointers, and runtime implementations are free to choose their concurrency primitives.

typedef struct MLIR_AsyncRuntime MLIR_AsyncRuntime;
typedef struct MLIR_AsyncHandle MLIR_AsyncHandle;

using TaskFunction = void (*)();     // asynchronous task function
using CoroHandle = void *;           // coroutine handle
using CoroResume = void (*)(void *); // coroutine resume function

// Get the default runtime instance.
extern "C" MLIR_AsyncRuntime *MLIR_AsyncRT_DefaultRuntime();

// Create an asynchronous task and return a handle.
extern "C"  MLIR_AsyncHandle *MLIR_AsyncRT_Call
            (MLIR_AsyncRuntime *, TaskFunction);

// Async wait that resumes suspended coroutine.
extern "C" void MLIR_AsyncRT_Await
           (MLIR_AsyncHandle *, CoroHandle, CoroResume);

// Wait for the async handle blocking the caller thread.
extern "C" void MLIR_AsyncRT_SyncAwait(MLIR_AsyncHandle *);

// Create handle in not-ready state.
extern "C" MLIR_AsyncHandle * MLIR_AsyncRT_CreateHandle(MLIR_AsyncRuntime *);

// Mark handle ready.
extern "C" void MLIR_AsyncRT_EmplaceHandle(MLIR_AsyncHandle *);

Conversion to LLVM

To convert async functions into asynchronous state machines we will use the LLVM coroutines passes.

This function:

func @async_await(%arg0: !async.runtime) -> !async.handle {
  %handle = async.call %arg0 : !async.runtime 
            @callee() : () -> !async.handle

  async.await %handle : !async.handle

  %ret_handle = async.ret_handle : !async.handle
return %ret_handle : !async.handle
}

Will be converted to roughly this LLVM IR with coro intrinsics:

func @async_await(%arg0: !llvm<"i8*">) -> !llvm<"i8*"> {
  // Initialize coroutine id and frame.
  %id = llvm.call @llvm.coro.id(...)
  %size = llvm.call @llvm.coro.size.i64()
  %alloc = llvm.call @malloc(%size)
  %hdl = llvm.call @llvm.coro.begin(%id, %alloc)

  // Prepare the return handle and call @callee asynchronously
  // (%callee is a pointer to the @callee task function).
  %ret_hdl = llvm.call @MLIR_AsyncRT_CreateHandle(%arg0)
  %call_hdl = llvm.call @MLIR_AsyncRT_Call(%arg0, %callee)

  // Save coroutine state.
  %coro_state = llvm.call @llvm.coro.save(%hdl)

  // Pass the coroutine handle to the async runtime and suspend.
  // When `%call_hdl` becomes ready, the runtime will resume coroutine `%hdl`.
  llvm.call @MLIR_AsyncRT_Await(%call_hdl, %hdl)
  %suspend = llvm.call @llvm.coro.suspend(%coro_state)
  switch %suspend ^suspend, ^resume, ^cleanup

  // Resume coroutine after the async call completed.
  ^resume:
    // Emplace the return handle.
    llvm.call @MLIR_AsyncRT_EmplaceHandle(%ret_hdl)
    br ^cleanup

  // Cleanup coroutine state.
  ^cleanup:  // 2 preds: the switch above and ^resume
    %mem = llvm.call @llvm.coro.free(%hdl)
    llvm.call @free(%mem)
    br ^suspend

  // Return a not-ready handle from the coroutine ramp function.
  ^suspend:
    %end = llvm.call @llvm.coro.end(%hdl)
    return %ret_hdl : !llvm<"i8*">
}

Async vs. cppcoro::task

C++20 added coroutines support, and one example of async tasks implemented on top of coroutines is cppcoro.

The main difference between async.handle and cppcoro::task is that an async handle refers to a running computation and will eventually become ready, while a cppcoro::task is a “handle” (coroutine handle) to a suspended computation (a suspended coroutine) that the caller must explicitly resume to get back the result.
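A small cppcoro example of the lazy behavior (assuming the cppcoro library is available; nothing here is part of the proposal):

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cstdio>

// The body of a cppcoro::task does not start executing until it is awaited.
cppcoro::task<int> lazy_value() {
  std::puts("body runs");
  co_return 42;
}

int main() {
  auto task = lazy_value();  // suspended computation: nothing printed yet
  // sync_wait resumes the coroutine and blocks until it completes.
  int v = cppcoro::sync_wait(std::move(task));
  std::printf("%d\n", v);
}

By contrast, the computation behind an async.handle is already running (or already finished) by the time the caller sees the handle.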

Links

  1. Asynchronous programming with async and await
  2. Task asynchronous programming model
  3. SIP-22 - ASYNC
  4. C++20 Coroutines
  5. LLVM Coroutines

Thanks for the writeup! This is interesting, and a nice re-use of the LLVM coroutine intrinsics.

I’d have some questions about the level of abstraction here: it seems focused on mapping “host” tasks to a “host” runtime more than on modeling concurrency semantics in a program from an IR standpoint.
I haven’t wrapped my head around whether this is just a “leaf” dialect intended to help later phases of codegen, and how it would compose with the work that @herhut (and others) have been conducting on modeling async semantics across host and accelerator (mostly GPU) from an IR standpoint (by this I mean: what kind of semantic analyses and transformations are enabled by the IR).

I think we should have an RFC from @herhut fairly soon to compare all this.

Yes, this RFC is mostly about how to get an async, non-blocking host program via lowering to LLVM and reusing LLVM passes. I missed the “GPUs in MLIR” April 16 talk by @herhut, but from the presentation it seems like we want the same thing, just for different purposes and at different pipeline stages. async.handle is pretty much the chain from the presentation.

Having these:

  1. async.value<some-type>
  2. async.handle (aka async.value<>)
  3. async.run { ... region ... }
  4. custom user ops that return async handles/values.

We could write generic, codegen-friendly async code like this:

%device_mem, %hdl0 = gpu.memcpy_host_to_device(...) : !async.handle
async.await %hdl0
"use_on_the_host"(%device_mem, %other_host_values)

However, from the presentation I didn’t get why there is a chain dependency between two gpu.launch calls; they should already be properly serialized because they are launched on the same stream. I can only see value in having a chain for operations that work across the host <-> device boundary.

It’d be good to see the comparison, but I’m not sure I would start by trying to abstract all such scheduling models together (at least in one step): increasing levels of unification on that axis tend to bake in more and more assumptions that are unlikely to hold universally.

I don’t know what to call it, but having a library of high-level CPU async abstractions seems like a good thing generally, and the topic is pretty well studied. The fact that a default lowering to existing intrinsics exists is a nice indication with respect to this proposal.

It is common in such setups to eventually want to “loan” the main thread / not force a thread switch. Does your design enforce this constraint by construction?

No, async.call is free to execute the "task" in the caller thread, and async.await could in theory do work stealing. This is entirely defined by the async runtime implementation. I checked that it is safe to call @llvm.coro.resume before the corresponding @llvm.coro.suspend call, so it is safe to “inline” async calls; the following coroutine suspension point becomes a no-op. This is required for the async.parallel_for implementation, so that it is not forced to launch async tasks for small sizes.
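A sketch of what that freedom could look like on the runtime side, reusing the hypothetical types from the runtime sketch earlier (MLIR_AsyncRT_CallInline is an illustrative name, not a proposed entry point):

extern "C" MLIR_AsyncHandle *MLIR_AsyncRT_CallInline(MLIR_AsyncRuntime *rt,
                                                     TaskFunction task) {
  MLIR_AsyncHandle *handle = MLIR_AsyncRT_CreateHandle(rt);
  task();                              // execute in the caller thread
  MLIR_AsyncRT_EmplaceHandle(handle);  // handle is ready immediately
  return handle;                       // a later await becomes a no-op
}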

This looks pretty interesting Eugene! Some random questions for you:

  1. It seems like you’re proposing a specific set of runtime entrypoints. Is that intended for testing, or is that the expected use-case? I would imagine that real languages targeting this would want to control a lot of the details here, which is why the LLVM lowering logic provides a lot of flexibility to the language using the compiler support.

  2. Where in the MLIR tree does the AsyncRuntimeAPI live? Do we have precedent for how runtime components are handled?

  3. Do you have an intended use-case for this? Is this intended to support something real (that we can get experience with) or is it mostly to explore the ideas?

-Chris

I’ve uploaded a proof-of-concept to D82704 [WIP] [MLIR] [RFC] Async dialect targeting LLVM coroutines. It implements a simple Async->LLVM conversion and has a few end-to-end tests on top of mlir-cpu-runner.

The end goal is to codegen async/non-blocking/parallel/concurrent host-side code, starting from a high-level dialect, lowering to the LLVM dialect, and jitting at runtime (a non-blocking TFRT kernel, to be more specific).

Disclaimer: this is a very TFRT-codegen-centric goal :slight_smile: other people have different opinions on what this dialect should be about. Given this very narrow goal, maybe it makes sense to keep it outside of the MLIR tree.

The goal is to make @kernel non-blocking, and a “classic” parallel for with a barrier doesn’t work here.

func @kernel(%in : memref<?xf32>) {
  %done = async.parallel_for { ... }
  async.await %done : !async.handle // <--- this can't block
  ... some other operations ...
}

The proposed async dialect is intentionally much more constrained than generic coroutines. It is only about concurrency, while coroutines can also be generators and can support different “types” of concurrency: lazy vs. eager.

The expected use case is that a runtime using this dialect passes its own implementation of the runtime API to the codegen (implementation-defined async handle, async call, async await, …).

I’ve put a “runtime reference implementation” under mlir-cpu-runner.

Cool, I’m a fan of TFRT :-), my primary concern was to make sure that this is grounded in something real, so if you have a use case for it, then building it in the main MLIR repo seems reasonable to me. Out of curiosity, what do you plan to have generate this dialect? Will there be something end-to-end wired up here, including a “standard library” of I/O and other blocking APIs that can be called into? Or do you plan to write (e.g.) BEF kernels by hand using this?

Got it, makes sense. It would probably make sense to eventually parameterize the lowering code, so other clients could change the entry points being used. However, that can be done the day there is a second client :slight_smile:, I don’t see any reason for premature generalization here.

This is really cool Eugene!

-Chris

Fwiw, I’ll want some representation like this when I get npcomp further along: I could see some user-level API/intrinsic that compiles async functions and would generate these forms directly. I hadn’t thought through the details yet. Some such things use API calls that trigger special compilation flows, various annotations, or special syntax (Python “async” functions in the limit).

I hadn’t thought yet about whether I would want something more constrained than coroutines as a starting point. Also, there are some transformation interplays with things like scf.parallel that it might be good to be explicit about (and to get @herhut’s opinion on). It seems like, if targeting a CPU, a reasonable lowering exists from scf.parallel to async.call/etc.? For what it represents, I believe scf.parallel is the higher-level form. From this perspective, this async dialect would exist at the same level as the gpu dialect and be in play for the parts of the program targeting the CPU (or could be generated directly by higher levels that know they are targeting a CPU).

One design nit: the region-based scf ops are a nicer representation than the function-based way you have this. In practice, when analyzing/transforming these things, one needs to work across the caller/callee boundary, and that is cleaner without needing to do interprocedural analysis. I would rename async.call to async.invoke and make it take a region instead of a callee.

One way to get a feel for the ergonomics and layering would be to write a lowering from scf.parallel to what you have (in the same vein as how SCFToStandard generates sequential code). This gets at @mehdi_amini’s question about whether this is a leaf dialect or part of the transformation dialects.

I currently think of a Linalg -> SCF (scf.parallel) -> SCF+Async (async.parallel/async.await) -> LLVM lowering path. However, @mehdi_amini and @herhut have concerns about introducing concurrency that late in the pipeline; maybe concurrency/asynchronicity should be explicit much earlier.

One of the goals is to be able to add a BEF kernel that will JIT-compile an async region:

%arg0 = ... !t.tensor
%arg1 = ... !t.tensor
%done = hex.jit_execute(%arg0, %arg1) -> !hex.chain {
   // MLIR in the Linalg/SCF dialects.
   scf.parallel { ... }
   "single_threaded_op_between_parallel_loops"()
   scf.parallel { ... }
   "few_more_ops_here"()
}

The attached region must be executed without ever blocking the caller thread; in the presence of parallel loops it must be converted to an async state machine first.

Yes, that’s the plan, it was just much easier to start with async.call first for a proof of concept :slight_smile: And it will be useful anyway as an intermediate lowering before going to LLVM.

Lowering scf.parallel with reductions will be a bit tricky, but in general it’s rather straightforward: scf.parallel becomes async.parallel followed by async.await. This is the first transformation I have in mind for this dialect.

I generally agree. It is better not to lose such semantics if they exist at a higher level. That does not need to be mutually exclusive with having default lowerings that recover some parallelism for simple/explicit low-level cases, though. How useful that turns out to be would depend on how applicable the higher-level forms are to all problems.

The “analysis-first” dialect from @herhut has explicit async handle/token arguments to async regions:

%0 = async.region { ... } -> !async.token
%1 = async.region [%0] { ... } -> !async.token

I think we can easily have both: async.await and "explicit tokens".

For high-level analysis → TFRT program compilation we can use token args, because that’s close to what TFRT already expects; for host program codegen we can convert token args to async.await:

%0 = async.region { ... } -> !async.token
async.await %0
%1 = async.region { ... } -> !async.token

I wanted to send the RFC for my variant but have not found the time to fully write it out. I will try to get to it asap. But the above is the gist of it.

I am not sure I follow this example. Are you proposing to use async.await as an alternative lowering instead of TFRT?

I see your proposal generally at the level of the GPU dialect wrt. the stage of lowering. For the gpu dialect, the operations themselves are also asynchronous, on top of the surrounding context that is lowered to TFRT (for example). We might get the same in the CPU context. Consider this example:

%0, %t = async.region {
  %1 = scf.parallel_for ...
  async.yield %1
}

Here, we would have an async region that itself contains some parallelism in the form of an scf.parallel_for. The outer async.region could turn into a BEF-level async kernel. Can your async dialect be used to compile the body? If so, what would that look like?

Yes. The proposed lowering is for the “TFRT kernel layer” that is below the TFRT/BEF runtime.

Yes, that’s pretty much the goal. The “transformation-first” async dialect is used to split the program into TFRT/BEF kernels; the “codegen-first” dialect is used inside the JIT-compiled kernel “body”.

Your example would ultimately be lowered to BEF-compatible MLIR like this:

// "Outer" mlir translated into BEF file. 
%value, %chain = hex.run {
   // "Inner" region compiled into x86 binary at runtime.
   %1 , %2= async.parallel_for {... } : memref<?>, !async.handle
   async.await %2
   kernel.return %1
} : !t.tensor, !hex.chain

Before splitting the program into TFRT/BEF kernels, the attached regions must be in the Linalg/SCF dialects for simplicity; after extracting kernel-level parallelism, the attached regions are lowered to the codegen-first async dialect.

Nice, this fits very well with my RFC (which I finally posted).

The semantics of this “async_compute” aren’t totally clear to me: I don’t quite get why we create a handle “out of thin air” and return it. What does it carry, and how will it become “ready”?

Seems like you edited some of the SSA values, but not all; can you edit the post and fix the typos? (I can’t be sure what you’re returning either.)

I looked at the revision actually, it is more clear there.

Simplified to the minimum, it is:

// ===--------------------------------------------------------------------=== //
// Convert `async_function` to LLVM coroutine and call it asynchronously.
// ===--------------------------------------------------------------------=== //

func @async_function(%arg0: !async.runtime) -> !async.handle {
  call @print_i32(%c789): (i32) -> ()
  %ret_handle = async.ret_handle : !async.handle
  async.emplace_handle %ret_handle : !async.handle
  return %ret_handle : !async.handle
}

func @main2() {
  %runtime = async.default_runtime : !async.runtime
  %handle = call @async_function(%runtime): (!async.runtime) -> !async.handle
  async.sync_await %handle : !async.handle
  return
}

There is something a bit odd here in that the IR is in a strange state before the transformation. It seems to me that this embeds some “breadcrumbs” that are expected to be recovered by a future transformation (the call does not make sense without the transformation, for example).

What about expressing this in the call itself instead of via this fake/empty handle? Something like:

// ===--------------------------------------------------------------------=== //
// Convert `async_function` to LLVM coroutine and call it asynchronously.
// ===--------------------------------------------------------------------=== //

func @async_function(%arg0: !async.runtime) {
  call @print_i32(%c789): (i32) -> ()
  return
}

func @main2() {
  %runtime = async.default_runtime : !async.runtime
  %handle = async.call @async_function(%runtime): (!async.runtime) -> !async.handle
  async.sync_await %handle : !async.handle
  return
}

Handles are created by the runtime; the semantics are that a handle becomes “ready” when the async computation is completed. The details are implementation-defined: the runtime can launch the computation on a separate thread, or it can run it immediately.

My thought was that an “async function” is a “function that returns an async handle”, and that fake handle was a hacky way to signal that intent. But now I’d go for an explicit AsyncFuncOp to make it clear that it is not a regular function and must be called with an async call (similar to Python coroutines).

Returning “something” that “owns” a coroutine handle is the C++ way of writing coroutines; e.g. cppcoro::task is just a thin wrapper around a void* coro_handle:

Example from: Coroutines (C++20) - cppreference.com

task<> tcp_echo_server() {
  for (;;) {
    size_t n = co_await socket.async_read_some(buffer(data));
    co_await async_write(socket, buffer(data, n));
  }
}

Because MLIR functions must have an explicit terminator, an async function has to create a handle “out of thin air”.

I think something like this is easier to work with and better aligned with RFC from Stephan:

func @main() {
  %handle = async.region %runtime {
     call @print_hello_world(): () -> ()
     async.yield
  }
}

Not sure if the runtime arg should be explicit with async regions, or introduced later when we are doing host-side codegen.

Async regions are converted to functions (captured values packed into a struct?):

async.func @async_fn(%arg0: !async.runtime) {
  call @print_hello_world(): () -> ()
  async.return
}

func @main() {
   %handle = async.call @async_fn(%runtime): (!async.runtime) -> !async.handle
}

Right, the rest of the handling around the coroutine is fairly clear to me: I was mostly trying to define away this “fake handle” from the IR.

Creating a separate op may be a good way to get started; otherwise we could maybe try to shove the logic into the terminator itself without a different FuncOp:

func @async_func() -> !async.handle {
  ...
  async.return // no need to create a fake handle
}

(not even sure if we need to have a return value for the func right now)