A Programmer-Friendly I/O Abstraction Over io_uring and kqueue
Consider this tale of I/O and performance. We'll start with blocking I/O, explore io_uring and kqueue, and take home an event loop very similar to some software you may find familiar.
This is a twist on King's talk at Software You Can Love Milan '22.
When you want to read from a file you might open() and then call read() as many times as necessary to fill a buffer of bytes from the file. And in the opposite direction, you call write() as many times as needed until everything is written. It's similar for a TCP client with sockets, but instead of open() you first call socket() and then connect() to your server. Fun stuff.
In the real world though you can't always read everything you want immediately from a file descriptor. Nor can you always write everything you want immediately to a file descriptor.
You can switch a file descriptor into non-blocking mode so the call won't block while the data you requested is not available. But system calls are still expensive, incurring context switches and cache misses. In fact, networks and disks have become so fast that these costs can start to approach the cost of doing the I/O itself. While a file descriptor is not ready to read or write, you don't want to waste time continuously retrying read or write system calls.
So you switch to io_uring on Linux or kqueue on FreeBSD/macOS. (I'm skipping the generation of epoll/select users.) These APIs let you submit requests to the kernel to learn about readiness: when a file descriptor is ready to read or write. You can send readiness requests in batches (also referred to as queues). Completion events, one for each submitted request, are available in a separate queue.
Being able to batch I/O like this is especially important for TCP servers that want to multiplex reads and writes for multiple connected clients.
However in io_uring, you can even go one step further. Instead of having to call read() or write() in userland after a readiness event, you can request that the kernel do the read() or write() itself with a buffer you provide. Thus almost all of your I/O is done in the kernel, amortizing the overhead of system calls.
If you haven't seen io_uring or kqueue before, you'd probably like an example! Consider this code: a simple, minimal, not-production-ready TCP echo server.
    const std = @import("std");
    const os = std.os;
    const linux = os.linux;
    const allocator = std.heap.page_allocator;

    const State = enum{ accept, recv, send };
    const Socket = struct {
        handle: os.socket_t,
        buffer: [1024]u8,
        state: State,
    };

    pub fn main() !void {
        const entries = 32;
        const flags = 0;
        var ring = try linux.IO_Uring.init(entries, flags);
        defer ring.deinit();

        var server: Socket = undefined;
        server.handle = try os.socket(os.AF.INET, os.SOCK.STREAM, os.IPPROTO.TCP);
        defer os.closeSocket(server.handle);

        const port = 12345;
        var addr = std.net.Address.initIp4(.{127, 0, 0, 1}, port);
        var addr_len: os.socklen_t = addr.getOsSockLen();

        try os.setsockopt(server.handle, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try os.bind(server.handle, &addr.any, addr_len);
        const backlog = 128;
        try os.listen(server.handle, backlog);

        // Queue the first accept. The user data is the address of the Socket
        // the completion belongs to.
        server.state = .accept;
        _ = try ring.accept(@ptrToInt(&server), server.handle, &addr.any, &addr_len, 0);

        while (true) {
            // Submit everything queued so far and wait for at least one completion.
            _ = try ring.submit_and_wait(1);

            while (ring.cq_ready() > 0) {
                const cqe = try ring.copy_cqe();
                // Recover the Socket from the user data we passed at submission.
                var client = @intToPtr(*Socket, @intCast(usize, cqe.user_data));

                if (cqe.res < 0) std.debug.panic("{}({}): {}", .{
                    client.state,
                    client.handle,
                    @intToEnum(os.E, -cqe.res),
                });

                switch (client.state) {
                    .accept => {
                        // New connection: queue a recv for it and re-arm the accept.
                        client = try allocator.create(Socket);
                        client.handle = @intCast(os.socket_t, cqe.res);
                        client.state = .recv;
                        _ = try ring.recv(@ptrToInt(client), client.handle, .{.buffer = &client.buffer}, 0);
                        _ = try ring.accept(@ptrToInt(&server), server.handle, &addr.any, &addr_len, 0);
                    },
                    .recv => {
                        // Echo back exactly the bytes we received.
                        const read = @intCast(usize, cqe.res);
                        client.state = .send;
                        _ = try ring.send(@ptrToInt(client), client.handle, client.buffer[0..read], 0);
                    },
                    .send => {
                        // One echo per connection, then clean up.
                        os.closeSocket(client.handle);
                        allocator.destroy(client);
                    },
                }
            }
        }
    }
This is a great, minimal example. But notice that this code ties io_uring behavior directly to business logic (in this case, handling echoing data between request and response). It is fine for a small example like this. But in a large application you might want to do I/O throughout the code base, not just in one place. You might not want to keep adding business logic to this single loop.
Instead, you might want to be able to schedule I/O and pass a callback (and sometimes some application context) to be called when the event is complete.
The interface might look like:
    io_dispatch.dispatch({
        // some big struct/union with relevant fields for all event types
    }, my_callback);
This is great! Now your business logic can schedule and handle I/O no matter where in the code base it is.
Under the hood it can decide whether to use io_uring or kqueue depending on what kernel it's running on. The dispatch can also batch these individual calls through io_uring or kqueue to amortize system calls. The application no longer needs to know the details.
Additionally, we can use this wrapper to stop thinking about readiness events and think only about I/O completion. That is, if we dispatch a read event, the io_uring implementation would actually ask the kernel to read data into a buffer. The kqueue implementation, by contrast, would submit a “read” readiness request, do the read back in userland once the file descriptor is ready, and then call our callback.
And finally, now that we've got this central dispatcher, we don't need spaghetti code in a loop switching on every possible submission and completion event.
Every time we call io_uring or kqueue we both submit event requests and poll for completion events. The io_uring and kqueue APIs tie these two actions together in the same system call.
To sync our requests to io_uring or kqueue we'll build a flush function that submits requests and polls for completion events. (In the next section we'll talk about how the user of the central dispatch learns about completion events.)
To make flush more convenient, we'll build a nice wrapper around it so that we can submit as many requests (and process as many completion events) as possible. To avoid accidentally blocking indefinitely we'll also introduce a time limit. We'll call the wrapper run_for_ns.
Finally we'll put the user in charge of setting up a loop to call this run_for_ns function, independent of normal program execution.
This is now your traditional event loop.
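To make the shape of that loop concrete, here is a minimal sketch. It is not TigerBeetle's code: the `IO` struct, its `pending` and `completed` counters, and the way `flush` "completes" one request per call are all made up so the example runs without a kernel queue. Only the names `flush` and `run_for_ns` come from the text above.

```zig
const std = @import("std");

// Hypothetical stand-in for an io_uring/kqueue backend: `pending` counts
// requests the "kernel" has not completed yet.
const IO = struct {
    pending: u32,
    completed: u32 = 0,

    // One round trip: submit queued requests and reap finished ones.
    // In this sketch each call simply completes one pending request.
    fn flush(self: *IO) void {
        if (self.pending > 0) {
            self.pending -= 1;
            self.completed += 1;
        }
    }

    // Keep flushing until the time budget is spent or nothing is left to do.
    // (Assumption: a real backend would typically block in the kernel with a
    // timeout rather than spin like this.)
    fn run_for_ns(self: *IO, nanoseconds: u64) !void {
        var timer = try std.time.Timer.start();
        while (true) {
            self.flush();
            if (self.pending == 0) return;
            if (timer.read() >= nanoseconds) return;
        }
    }
};

pub fn main() !void {
    var io = IO{ .pending = 3 };
    var ticks: u32 = 0;
    // The user-owned event loop: each tick gives I/O a bounded slice of time,
    // leaving room for other work between ticks.
    while (io.completed < 3) {
        try io.run_for_ns(10 * std.time.ns_per_ms);
        ticks += 1;
    }
    std.debug.print("completed {} requests in {} tick(s)\n", .{ io.completed, ticks });
}
```

The time limit is what keeps a single call from monopolizing the thread: the caller decides how often to tick and what else to do in between.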
You may have noticed that in the API above we passed a callback. The idea is that after the requested I/O has completed, our callback should be invoked. But the question remains: how to track this callback between the submission and completion queue?
Thankfully, io_uring and kqueue events have user data fields. The user data field is opaque to the kernel. When a submitted event completes, the kernel sends a completion event back to userland containing the user data value from the submission event.
We can store the callback in the user data field by setting it to the callback's pointer cast to an integer. When the completion for a requested event comes up, we cast the integer in the user data field back to the callback pointer. Then, we invoke the callback.
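The round trip through the user data field can be sketched as follows. This is an illustration under assumptions, not the library's code: the `Completion` struct, `on_done`, `to_user_data`, and `complete` are hypothetical names. It is also a small variation on the text: instead of the bare callback pointer, it stores a pointer to a small struct that holds the callback. It is written in the same older Zig dialect as the post's examples (`@ptrToInt`, `@intToPtr`); newer compilers spell these builtins differently.

```zig
const std = @import("std");

// Hypothetical completion record: the callback plus a slot for its result.
const Completion = struct {
    callback: *const fn (completion: *Completion, result: i32) void,
    last_result: i32 = 0,
};

fn on_done(completion: *Completion, result: i32) void {
    completion.last_result = result;
}

// The value we would place in the submission's user data field.
fn to_user_data(completion: *Completion) u64 {
    return @ptrToInt(completion);
}

// What we do when a completion event hands the same value back to us.
fn complete(user_data: u64, result: i32) void {
    const completion = @intToPtr(*Completion, @intCast(usize, user_data));
    completion.callback(completion, result);
}

pub fn main() void {
    var completion = Completion{ .callback = &on_done };
    const user_data = to_user_data(&completion);
    // Pretend the kernel reported that 42 bytes were transferred.
    complete(user_data, 42);
    std.debug.print("result = {}\n", .{completion.last_result});
}
```

Because the kernel never dereferences the value, the only requirement is that the pointed-to memory stays alive until the completion arrives.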
As described above, the struct for io_dispatch.dispatch could get quite large handling all the different kinds of I/O events and their arguments. We could make our API a little more expressive by creating wrapper functions for each event type.
So if we wanted to schedule a read function we could call:
    io_dispatch.read(fd, &buf, nBytesToRead, callback);
Or to write, similarly:
    io_dispatch.write(fd, buf, nBytesToWrite, callback);
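One way such wrappers could sit on top of a single dispatch function is sketched below. Everything here is hypothetical: the `Operation` union, the `Dispatcher` type, and the `Callback` signature are illustrative, and `dispatch` completes each operation immediately with a made-up result (the buffer length) so the example runs without doing any real I/O.

```zig
const std = @import("std");

const Callback = *const fn (result: usize) void;

// Hypothetical "big union" with one variant per event type.
const Operation = union(enum) {
    read: struct { fd: i32, buffer: []u8 },
    write: struct { fd: i32, buffer: []const u8 },
};

const Dispatcher = struct {
    // A real implementation would queue the operation for io_uring or kqueue.
    // This stand-in "completes" it on the spot and reports the buffer length.
    fn dispatch(self: *Dispatcher, op: Operation, callback: Callback) void {
        _ = self;
        switch (op) {
            .read => |r| callback(r.buffer.len),
            .write => |w| callback(w.buffer.len),
        }
    }

    // Thin wrappers: callers never build the union by hand.
    fn read(self: *Dispatcher, fd: i32, buffer: []u8, callback: Callback) void {
        self.dispatch(.{ .read = .{ .fd = fd, .buffer = buffer } }, callback);
    }

    fn write(self: *Dispatcher, fd: i32, buffer: []const u8, callback: Callback) void {
        self.dispatch(.{ .write = .{ .fd = fd, .buffer = buffer } }, callback);
    }
};

var last_result: usize = 0;

fn on_done(result: usize) void {
    last_result = result;
}

pub fn main() void {
    var io = Dispatcher{};
    io.write(1, "hello", &on_done);
    std.debug.print("callback saw {} bytes\n", .{last_result});
}
```

The wrappers cost nothing at the call site and keep the union an internal detail that can grow as more event types are added.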
One more thing we need to worry about is that the batch we pass to io_uring or kqueue has a fixed size (technically, kqueue allows any batch size but using that might introduce unnecessary allocations). So we'll build our own queue on top of our I/O abstraction to keep track of requests that we could not immediately submit to io_uring or kqueue.
To keep this API simple we could allocate for each entry in the queue. Or we could modify the io_dispatch.X calls slightly to accept a struct that can be used in an intrusive linked list to contain all request context, including the callback. The latter is what we do in TigerBeetle.
Put another way: every time code calls io_dispatch, we'll try to immediately submit the requested event to io_uring or kqueue. But if there's no room, we store the event in an overflow queue.
The overflow queue needs to be processed eventually, so we update our flush function (described above) to pull as many events as possible from our overflow queue before submitting a batch to io_uring or kqueue.
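The overflow path can be sketched with an intrusive list, where the caller-owned request struct carries its own `next` pointer so queueing never allocates. This is a simplified illustration, not TigerBeetle's implementation: `Completion`, `Fifo`, the tiny `capacity` of 2, and the `submitted` counter are all assumptions made for the example, and "submitting" just counts entries.

```zig
const std = @import("std");

// Hypothetical request record. The `next` field makes it a node in an
// intrusive list, so the queue needs no allocation of its own.
const Completion = struct {
    id: u32,
    next: ?*Completion = null,
};

// Minimal intrusive FIFO.
const Fifo = struct {
    head: ?*Completion = null,
    tail: ?*Completion = null,

    fn push(self: *Fifo, c: *Completion) void {
        c.next = null;
        if (self.tail) |tail| {
            tail.next = c;
        } else {
            self.head = c;
        }
        self.tail = c;
    }

    fn pop(self: *Fifo) ?*Completion {
        const c = self.head orelse return null;
        self.head = c.next;
        if (self.head == null) self.tail = null;
        c.next = null;
        return c;
    }
};

const IO = struct {
    const capacity = 2; // stand-in for the fixed kernel batch size

    batch: [capacity]*Completion = undefined,
    batch_len: usize = 0,
    overflow: Fifo = .{},
    submitted: usize = 0,

    // Use the batch if there is room and nothing is already waiting;
    // otherwise park the request in the overflow queue.
    fn enqueue(self: *IO, c: *Completion) void {
        if (self.overflow.head == null and self.batch_len < capacity) {
            self.batch[self.batch_len] = c;
            self.batch_len += 1;
        } else {
            self.overflow.push(c);
        }
    }

    fn flush(self: *IO) void {
        // Top up the batch from the overflow queue first...
        while (self.batch_len < capacity) {
            const c = self.overflow.pop() orelse break;
            self.batch[self.batch_len] = c;
            self.batch_len += 1;
        }
        // ...then pretend the kernel accepted the whole batch.
        self.submitted += self.batch_len;
        self.batch_len = 0;
    }
};

pub fn main() void {
    var requests = [_]Completion{ .{ .id = 0 }, .{ .id = 1 }, .{ .id = 2 }, .{ .id = 3 }, .{ .id = 4 } };
    var io = IO{};
    var i: usize = 0;
    while (i < requests.len) : (i += 1) io.enqueue(&requests[i]);

    var flushes: usize = 0;
    while (io.submitted < requests.len) {
        io.flush();
        flushes += 1;
    }
    std.debug.print("submitted {} requests in {} flushes\n", .{ io.submitted, flushes });
}
```

Checking that the overflow queue is empty before using the batch directly keeps requests in the order they were issued; without that check a new request could jump ahead of parked ones.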
We've now built something similar to libuv, the I/O library that Node.js uses. And if you squint, it is basically TigerBeetle's I/O library! (And interestingly enough, TigerBeetle's I/O code was adopted into Bun! Open-source for the win!)
Let's check out how the Darwin version of TigerBeetle's I/O library (with kqueue) differs from the Linux version. As mentioned, the complete send call in the Darwin implementation waits for file descriptor readiness (through kqueue). Once ready, the actual send call is made back in userland:
    pub fn send(
        self: *IO,
        comptime Context: type,
        context: Context,
        comptime callback: fn (
            context: Context,
            completion: *Completion,
            result: SendError!usize,
        ) void,
        completion: *Completion,
        socket: os.socket_t,
        buffer: []const u8,
    ) void {
        self.submit(
            context,
            callback,
            completion,
            .send,
            .{
                .socket = socket,
                .buf = buffer.ptr,
                .len = @intCast(u32, buffer_limit(buffer.len)),
            },
            struct {
                fn do_operation(op: anytype) SendError!usize {
                    return os.send(op.socket, op.buf[0..op.len], 0);
                }
            },
        );
    }
Compare this to the Linux version (with io_uring) where the kernel handles everything and there is no send system call in userland:
    pub fn send(
        self: *IO,
        comptime Context: type,
        context: Context,
        comptime callback: fn (
            context: Context,
            completion: *Completion,
            result: SendError!usize,
        ) void,
        completion: *Completion,
        socket: os.socket_t,
        buffer: []const u8,
    ) void {
        completion.* = .{
            .io = self,
            .context = context,
            .callback = struct {
                fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
                    callback(
                        @intToPtr(Context, @ptrToInt(ctx)),
                        comp,
                        @intToPtr(*const SendError!usize, @ptrToInt(res)).*,
                    );
                }
            }.wrapper,
            .operation = .{
                .send = .{
                    .socket = socket,
                    .buffer = buffer,
                },
            },
        };

        // Fill out a submission immediately if possible, otherwise adds to overflow buffer
        self.enqueue(completion);
    }
Similarly, take a look at flush on Linux and macOS for event processing. Look at run_for_ns on Linux and macOS for the public API users must call. And finally, look at what puts this all into practice, the loop calling run_for_ns in src/main.zig.
We've come this far and you might be wondering: what about cross-platform support for Windows? The good news is that Windows also has a completion-based system similar to io_uring but without batching, called IOCP. And for bonus points, TigerBeetle provides the same I/O abstraction over it! But it's enough to cover just Linux and macOS in this post. :)
In both this blog post and in TigerBeetle, we implemented a single-threaded event loop. Keeping I/O code single-threaded in userspace is beneficial (whether or not I/O processing is single-threaded in the kernel is not our concern). It's the simplest code and best for workloads that are not embarrassingly parallel. It is also best for determinism, which is integral to the design of TigerBeetle because it enables us to do Deterministic Simulation Testing.
But there are other valid architectures for other workloads.
For workloads that are embarrassingly parallel, like many web servers, you could instead use multiple threads where each thread has its own queue. In optimal conditions, this architecture has the highest I/O throughput possible.
But if each thread has its own queue, individual threads can become starved if an uneven amount of work is scheduled on one thread. In the case of dynamic amounts of work, the better architecture would be to have a single queue but multiple worker threads doing the work made available on the queue.
Hey, maybe we'll split this out so you can use it too. It's written in Zig so we can easily expose a C API. Any language with a C foreign function interface (i.e. every language) should work well with it. Keep an eye on our GitHub. :)
Additional resources: