How async/await works in Lwan 2024-02-29

Some people over the years told me they were interested in knowing how asynchronous I/O works in Lwan. I've already written a bit how asynchronous I/O works with HTTP clients, but this was a long time ago and a refresher might be useful, as new features have been added. Of note, connection handlers can now sleep, or await for another file descriptor, allowing proxies and other async patterns to be implemented in Lwan.

The heart and soul of Lwan are event loops managed by epoll (or kqueue on BSD systems, through a naïve implementation of epoll on top of it). By default, there's one event loop per CPU core, each running in its own thread.

Coroutines are used as green threads to reduce context switching overhead; the lifetime of a coroutine is tied to a connection, so if a connection is closed, for whatever reason, the coroutine and all its state is destroyed. Some other state might be tied to the coroutine, including callbacks to free memory, decrement the reference count of refcounted objects, and other things, which also makes it possible to have tight resource control without an actual garbage collector.

Event LoopHTTP Client 1HTTP Client 2ResumeYield (want-read)ResumeResumeYield (abort)Client #2deferred callbackscalled

Two coroutines yielding to the event loop, updated from an old blog post. Not to scale.

Coroutines can both yield a value to the event loop, and obtain a value from the event loop. This 64-bit value is used extensively by the event loop to determine how to suspend or destroy a coroutine, or, more commonly, how to change the epoll event interest mask.

There are 10 possible values that a connection coroutine can yield to the event loop:

Each file descriptor in the Lwan process has an associated struct lwan_connection, despite that not necessarily being related to a connection, specifically. A pointer to this struct is stored in the epoll event data, so we can quickly figure out, through a flags member in this struct, what this file descriptor is responsible for.

A file descriptor and its struct lwan_connection match in most cases (e.g. HTTP clients and the listening sockets), but not in the case of a file descriptor that's being awaited on: since this struct contains a pointer to the coroutine that handles a client connection, and we need that to resume the coroutine once the events being awaited are deemed ready by epoll, these will differ.

The ASYNC_AWAIT values are at the end of all expected values so it's a bit cheaper to check, every time, a coroutine yields back to the event loop, if it's requesting an operation on another file descriptor:

static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
                                      struct lwan_connection *conn,
                                      int epoll_fd)
{
    int64_t from_coro = coro_resume(conn->coro);
    enum lwan_connection_coro_yield yield_result = from_coro & 0xffffffff;

    /* Quick check for async/await: this overrides the yield_result
     * so that the coroutine can be either aborted (with CONN_CORO_ABORT),
     * or just suspended with CORO_SUSPEND.  */
    if (UNLIKELY(yield_result >= CONN_CORO_ASYNC)) {
        yield_result =
            resume_async(tq, yield_result, from_coro, conn, epoll_fd);
    }

    if (UNLIKELY(yield_result == CONN_CORO_ABORT)) {
        timeout_queue_expire(tq, conn);
    } else {
        update_epoll_flags(tq, conn, epoll_fd, yield_result);
        timeout_queue_move_to_last(tq, conn);
    }
}

The values that are then passed to update_epoll_flags() will ultimately be used in a pair of look-up tables that will ultimately determine if the epoll interest mask should be changed for the HTTP client file descriptor:

static void update_epoll_flags(const struct timeout_queue *tq,
                               struct lwan_connection *conn,
                               int epoll_fd,
                               enum lwan_connection_coro_yield yield_result)
{
    static const enum lwan_connection_flags or_mask[CONN_CORO_MAX] = {
        [CONN_CORO_YIELD] = 0,

        [CONN_CORO_WANT_READ_WRITE] = CONN_EVENTS_READ_WRITE,
        [CONN_CORO_WANT_READ] = CONN_EVENTS_READ,
        [CONN_CORO_WANT_WRITE] = CONN_EVENTS_WRITE,

        /* While the coro is suspended, we're not interested in either EPOLLIN
         * or EPOLLOUT events.  We still want to track this fd in epoll, though,
         * so unset both so that only EPOLLRDHUP (plus the implicitly-set ones)
         * are set. */
        [CONN_CORO_SUSPEND] = CONN_SUSPENDED,

        /* Ideally, when suspending a coroutine, the current flags&CONN_EVENTS_MASK
         * would have to be stored and restored -- however, resuming as if the
         * client coroutine is interested in a write event always guarantees that
         * they'll be resumed as they're TCP sockets.  There's a good chance that
         * trying to read from a socket after resuming a coroutine will succeed,
         * but if it doesn't because read() returns -EAGAIN, the I/O wrappers will
         * yield with CONN_CORO_WANT_READ anyway.  */
        [CONN_CORO_RESUME] = CONN_EVENTS_WRITE,
    };
    static const enum lwan_connection_flags and_mask[CONN_CORO_MAX] = {
        [CONN_CORO_YIELD] = ~0,

        [CONN_CORO_WANT_READ_WRITE] = ~0,
        [CONN_CORO_WANT_READ] = ~CONN_EVENTS_WRITE,
        [CONN_CORO_WANT_WRITE] = ~CONN_EVENTS_READ,

        [CONN_CORO_SUSPEND] = ~CONN_EVENTS_READ_WRITE,
        [CONN_CORO_RESUME] = ~CONN_SUSPENDED,
    };
    enum lwan_connection_flags prev_flags = conn->flags;

    conn->flags |= or_mask[yield_result];
    conn->flags &= and_mask[yield_result];

    if (LWAN_EVENTS(conn->flags) == LWAN_EVENTS(prev_flags))
        return;

    struct epoll_event event = {.events = conn_flags_to_epoll_events(conn->flags),
                                .data.ptr = conn};
    int fd = lwan_connection_get_fd(tq->lwan, conn);

    if (UNLIKELY(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0))
        lwan_status_perror("epoll_ctl");
}

Since resume_async(), as called by resume_coro() can only return CONN_CORO_ABORT or CONN_CORO_SUSPEND, the effect here is that, if the coroutine isn't aborted, this will cause CONN_EVENTS_READ_WRITE to be unset in the connection flags, ultimately resetting EPOLLRD | EPOLLWR from the epoll interest mask, leaving only the standard items (i.e. EPOLLHUP | EPOLLERR | EPOLLRDHUP) so the event loop can be notified of the client connection being closed by the peer.

resume_async() will also modify the epoll interest flags for the file descriptor being awaited on, and set the connection flag CONN_ASYNC_AWAIT so that the correct operation is passed to epoll_ctl() in possible subsequent awaits. (This flag is unset inside a coroutine deferred callback.)

static enum lwan_connection_coro_yield
resume_async(const struct timeout_queue *tq,
             enum lwan_connection_coro_yield yield_result,
             int64_t from_coro,
             struct lwan_connection *conn,
             int epoll_fd)
{
    static const enum lwan_connection_flags to_connection_flags[] = {
        [CONN_CORO_ASYNC_AWAIT_READ] = CONN_EVENTS_READ,
        [CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_EVENTS_WRITE,
        [CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_EVENTS_READ_WRITE,
    };
    int await_fd = (int)((uint64_t)from_coro >> 32);
    enum lwan_connection_flags flags;
    int op;

    flags = to_connection_flags[yield_result];

    struct lwan_connection *await_fd_conn = &tq->lwan->conns[await_fd];
    if (LIKELY(await_fd_conn->flags & CONN_ASYNC_AWAIT)) {
        if (LIKELY((await_fd_conn->flags & CONN_EVENTS_MASK) == flags))
            return CONN_CORO_SUSPEND;

        op = EPOLL_CTL_MOD;
    } else {
        op = EPOLL_CTL_ADD;
        flags |= CONN_ASYNC_AWAIT;
        coro_defer(conn->coro, clear_async_await_flag, await_fd_conn);
    }

    struct epoll_event event = {.events = conn_flags_to_epoll_events(flags),
                                .data.ptr = conn};
    if (LIKELY(!epoll_ctl(epoll_fd, op, await_fd, &event))) {
        await_fd_conn->flags &= ~CONN_EVENTS_MASK;
        await_fd_conn->flags |= flags;
        return CONN_CORO_SUSPEND;
    }

    return CONN_CORO_ABORT;
}

To have an idea of the kinds of flags in a struct lwan_connection that affect this whole mechanism, here's the relevant excerpt:

/* The connection flags used by the event loop for both HTTP clients and
 * awaited file descriptors.  */

enum lwan_connection_flags {
    /* Upper 16-bit of CONN_EVENTS_* store the epoll event interest
     * mask for those events.  The epoll event mask is shifted into
     * these flags to avoid having a table lookup in a hot path.  */
    CONN_EVENTS_READ = ((EPOLLIN | EPOLLRDHUP) << CONN_EPOLL_EVENT_SHIFT) | 1 << 0,
    CONN_EVENTS_WRITE = ((EPOLLOUT | EPOLLRDHUP) << CONN_EPOLL_EVENT_SHIFT) | 1 << 1,
    CONN_EVENTS_READ_WRITE = CONN_EVENTS_READ | CONN_EVENTS_WRITE,
    CONN_EVENTS_MASK = 1 << 0 | 1 << 1,

    (...)

    /* These are used for a few different things:
     * - Avoid re-deferring callbacks to remove request from the timeout wheel
     *   after it has slept previously and is requesting to sleep again. (The
     *   timeout defer is disarmed right after resuming, and is only there
     * because connections may be closed when they're suspended.)
     * - Distinguish file descriptor in event loop between the connection and
     *   an awaited file descriptor.  (This is set in the connection that's
     *   awaiting since the pointer to the connection is used as user_data in both
     *   cases. This is required to be able to resume the connection coroutine
     *   after the await is completed, and to bubble up errors in awaited file
     *   descriptors to request handlers rather than abruptly closing the
     *   connection.) */
    CONN_SUSPENDED_MASK = 1 << 5,
    CONN_SUSPENDED = (EPOLLRDHUP << CONN_EPOLL_EVENT_SHIFT) | CONN_SUSPENDED_MASK,
    CONN_HAS_REMOVE_SLEEP_DEFER = 1 << 6,
    CONN_AWAITED_FD = CONN_SUSPENDED_MASK | CONN_HAS_REMOVE_SLEEP_DEFER,

    (...)

    /* Set only on file descriptors being watched by async/await to determine
     * which epoll operation to use when suspending/resuming (ADD/MOD). Reset
     * whenever associated client connection is closed. */
    CONN_ASYNC_AWAIT = 1 << 8,

    (...)
};

To all of this, we then add a few helper functions:

static inline int64_t
make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
{
    return (int64_t)(((uint64_t)fd << 32 | event));
}

static inline void async_await_fd(struct coro *coro,
                                  int fd,
                                  enum lwan_connection_coro_yield events)
{
    assert(events >= CONN_CORO_ASYNC_AWAIT_READ &&
           events <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);

    return (void)coro_yield(coro, make_async_yield_value(fd, events));
}

void lwan_request_await_read(struct lwan_request *r, int fd)
{
    return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ);
}

void lwan_request_await_write(struct lwan_request *r, int fd) { /* ... */ }
void lwan_request_await_read_write(struct lwan_request *r, int fd) { /* ... */ }

Which then allows for the implementation of more helper functions, mimicking the APIs of read(2), write(2), recv(2), and writev(2), and we then have a basic but functional async/await implementation. The rest can be built on top of this as necessary.

FastCGI implementation

Even with missing features you'd usually find in other far more sophisticated environments, this whole mechanism was sufficient to implement FastCGI using APIs that are synchronous on the surface, but rely on all this machinery behind the scenes. An excerpt of the FastCGI handler is shown below; if you've looked at implementations in other servers, where callbacks are used to process I/O events, you'll appreciate how straightforward this implementation looks like:

static enum lwan_http_status
fastcgi_handle_request(struct lwan_request *request,
                       struct lwan_response *response,
                       void *instance)
{
    /* ... */

    fcgi_fd =
        socket(pd->addr_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (fcgi_fd < 0)
        return HTTP_INTERNAL_ERROR;

    /* Ensure that the connection to the FastCGI socket is closed if the
     * client HTTP socket is closed somehow. */
    coro_defer(request->conn->coro, close_fd, (void *)(intptr_t)fcgi_fd);

    /* Since the socket is created with SOCK_NONBLOCK, the connect(2) call might
     * return before this is completed.  The try_connect() function will await
     * on the fcgi_fd to be writable before proceeding, at which point we
     * check for a successful connection using getsockopt(SO_ERROR).  */
    if (!try_connect(request, fcgi_fd, (struct sockaddr *)&pd->sock_addr,
                     pd->addr_size)) {
        return HTTP_UNAVAILABLE;
    }

    /* ... */

    while (true) {
        struct record record;
        ssize_t r;

        r = lwan_request_async_read(request, fcgi_fd, &record, sizeof(record));
        if (r < 0)
            return HTTP_UNAVAILABLE;
        if (r != (ssize_t)sizeof(record))
            return HTTP_INTERNAL_ERROR;

        switch (record.type) {
        case FASTCGI_TYPE_STDOUT:
            /* ... */
        case FASTCGI_TYPE_END_REQUEST:
            /* ... */
        case FASTCGI_TYPE_STDERR:
            /* ... */
        default:
            if (!discard_unknown_record(request, &record, fcgi_fd))
                return HTTP_INTERNAL_ERROR;
            break;
        }
    }

    __builtin_unreachable();
}

Overall, I'm quite happy with the result of implementing this feature in Lwan – and hopefully this blog post helped you understand how it was put together. As always, there are many details that were omitted and that can be found in the source code.

Shortcomings and future work

There are some caveats with the current implementation, though. As you can probably tell, this machinery permits a coroutine to await on one task at a time – which is perfectly fine for many applications, but not for everything. For instance, included with Lwan is a chat application demonstrating the use of its websockets API. This application effectively has to wait on two things:

As a workaround for the lack of something like .NET's Task.WhenAll(), this sample application has a busy polling loop that tries reading from the websocket, and having nothing there, proceeds to look at the pub-sub subscription. To avoid epoll churn, however, the connection is suspended for some increasing amount of time, introducing undesirable lags.

/* Excerpt from the "websocket" sample from the Lwan repo: this is a
 * simple chat room that listens on a websocket (for messages from the
 * client to be published to the chat room) and, if nothing was available,
 * polls the subscription for messages from other users, forwarding them
 * to the websocket connection.  To avoid a busy polling loop, the handler
 * sleeps for longer and longer periods of time (up to 8s) before looking
 * at the websocket again.  */
while (true) {
    switch (lwan_response_websocket_read(request)) {
    case ENOTCONN:   /* read() called before connection is websocket */
    case ECONNRESET: /* Client closed the connection */
        goto out;

    case EAGAIN: /* Nothing is available from other clients */
        while ((msg = lwan_pubsub_consume(sub))) {
            const struct lwan_value *value = lwan_pubsub_msg_value(msg);

            lwan_strbuf_set(response->buffer, value->value, value->len);

            /* Mark as done before writing: websocket_write() can abort the
             * coroutine and we want to drop the reference before this
             * happens. */
            lwan_pubsub_msg_done(msg);

            lwan_response_websocket_write_text(request);
            sleep_time = 500;
        }

        lwan_request_sleep(request, sleep_time);

        /* We're receiving a lot of messages, wait up to 1s (500ms in the loop
         * above, and 500ms in the increment below). Otherwise, wait 500ms every
         * time we return from lwan_request_sleep() until we reach 8s.  This way,
         * if a chat is pretty busy, we'll have a lag of at least 1s -- which is
         * probably fine; if it's not busy, we can sleep a bit more and conserve
         * some resources. */
        if (sleep_time <= 8000)
            sleep_time += 500;
        break;

    case 0: /* We got something! Copy it to echo it back */
        lwan_pubsub_publishf(chat, "User%d: %.*s\n", user_id,
                             (int)lwan_strbuf_get_length(response->buffer),
                             lwan_strbuf_get_buffer(response->buffer));
        break;
    }
}

A notification mechanism based on file descriptors (possibly eventfd on Linux, with a fallback to a plain pipe on other platforms) may be available in the future for things that don't necessarily work directly with file descriptors, such as the pub-sub mechanism present in Lwan. When that is available, the chat application will not have to artificially lag to avoid busy poll loops, for instance; this will of course open the possibility for a few other things.

Thanks to

Thanks to Ariadne Conill and Thaís Hamasaki for reviewing this post.

🖂 Send me an email about this blog post
If you liked this post, consider getting me a coffee!