io_uring 接口
接上回
目前来讲,用户应该使用 liburing,不太需要去直接调用底层的 io_uring API,但是去了解 io_uring 本身提供哪些接口是非常有必要的。
接下来会通过一个类似 cat 的程序,来讲解 io_uring 的一些操作。
readv(2) 系统调用
Linux 提供了许多读写操作的系统调用,比如 read(2), write(2)
io_uring 常用的是 readv(2), writev(2)
以读操作为例,readv 被认为优于 read,因为
- readv 可以一次读取出多个成员,要用 read 的话,往往需要拷贝数据,然后多次调用 read 完成。
- readv 操作是原子的,而实现同样功能的多次 read 则不是
不用 io_uring
首先作为对比版本,先实现一个没有使用 io_uring 的版本,核心代码如下
int read_and_print_file(char *file_name) {
struct iovec *iovecs;
int file_fd = open(file_name, O_RDONLY);
if (file_fd < 0) {
perror("open");
return 1;
}
off_t file_sz = get_file_size(file_fd);
off_t bytes_remaining = file_sz;
int blocks = (int) file_sz / BLOCK_SZ;
if (file_sz % BLOCK_SZ) blocks++;
iovecs = malloc(sizeof(struct iovec) * blocks);
int current_block = 0;
/*
* For the file we're reading, allocate enough blocks to be able to hold
* the file data. Each block is described in an iovec structure, which is
* passed to readv as part of the array of iovecs.
* */
while (bytes_remaining) {
off_t bytes_to_read = bytes_remaining;
if (bytes_to_read > BLOCK_SZ)
bytes_to_read = BLOCK_SZ;
void *buf;
if( posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
perror("posix_memalign");
return 1;
}
iovecs[current_block].iov_base = buf;
iovecs[current_block].iov_len = bytes_to_read;
current_block++;
bytes_remaining -= bytes_to_read;
}
/*
* The readv() call will block until all iovec buffers are filled with
* file data. Once it returns, we should be able to access the file data
* from the iovecs and print them on the console.
* */
int ret = readv(file_fd, iovecs, blocks);
if (ret < 0) {
perror("readv");
return 1;
}
for (int i = 0; i < blocks; i++)
output_to_console(iovecs[i].iov_base, iovecs[i].iov_len);
return 0;
}
核心思路大致是计算所需要的 iovecs 的大小,然后一次性初始化所需的空间,然后进行一次 readv 调用,最后把 iovecs 中的数据输出出来
io_uring 版本
核心思路是,提交多个 SQE,告诉 io_uring 我们想要调用 readv 读文件,然后 kernel 做完之后,我们去读取 CQEs
CQE(completion queue entry)
我们只了解了一些 mental 的概念,现在看实际 CQE 的结构
struct io_uring_cqe {
__u64 user_data; /* sqe->user_data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;
};
user_data 是在 SQE 传入,CQE 传出的字段。假设这样的场景,我们提交了一堆请求到 SQ,完成时,CQ 里面的顺序是不保证和请求顺序一致的,比如一些操作先完成,另一些后完成,这时调用者需要一些标识之类的来识别 CQE 对应的具体是哪个 SQE。
res 字段则是我们在 SQE 中告诉 kernel 我们想要的系统调用的返回值。
PS: 上面提到 CQE 的顺序不保证和 SQE 一致,实际上是可以设置的,见 io_uring.pdf
SQE(submission queue entry)
它的定义更加复杂一些,因为用户是通过它告诉 io_uring 我们想要做什么
struct io_uring_sqe {
__u8 opcode; /* type of operation for this sqe */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events;
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
};
/* data to be passed back at completion time */
__u64 user_data;
union {
struct {
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
__s32 splice_fd_in;
};
__u64 __pad2[3];
};
};
这里以 cat 为例,我们想要对文件进行 readv 系统调用,那么
- opcode 设置为
IORING_OP_READV
代表 readv - fd 设置为我们想操作的文件描述符
- addr 我们需要传入的 iovec
- len 是 iovecs 的长度
初始化
io_uring_setup
需要 2 个参数
- entry 大小,这里的例子使用 1 即可
- param
io_uring_params
,见下面
struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 resv[5];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};
cat 这个例子暂时用不上,因此全部初始化为 0 即可
初始化后会返回一个文件描述符,后续的一些 mmap 操作需要用到
mmap
初始化得到文件描述符后,我们需要 mmap 操作,使 kernel-user 共享 CQ 和 SQ 两个 ring buffer,关键代码如下
int sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
int cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
if (p.features & IORING_FEAT_SINGLE_MMAP) {
if (cring_sz > sring_sz) {
sring_sz = cring_sz;
}
cring_sz = sring_sz;
}
sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
s->ring_fd, IORING_OFF_SQ_RING);
if (sq_ptr == MAP_FAILED) {
perror("mmap");
return 1;
}
if (p.features & IORING_FEAT_SINGLE_MMAP) {
cq_ptr = sq_ptr;
} else {
cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, s->ring_fd, IORING_OFF_CQ_RING);
if (cq_ptr == MAP_FAILED) {
perror("mmap");
return 1;
}
}
/* map in the SQE array */
s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, s->ring_fd,
IORING_OFF_SQES);
if (s->sqes == MAP_FAILED) {
perror("mmap");
return 1;
}
Q: 为什么有三次 mmap,CQ, SQ 两个就够了
A: 简单讲,SQ ring buffer 里面是间接地管理 SQE array,SQ ring buffer 存储的是 index,因此还需要 mmap 下面的 array。 而 CQ 的部分,内核是直接映射的 CQE。因此用户还需要为 SQE 的 array 进行一次映射(While the completion queue ring directly indexes the shared array of CQEs, the submission ring has an indirection array in between. The submission side ring buffer is an index into this array, which in turn contains the index into the SQEs.)
使用 ring buffer
因为 SQ 和 CQ 的 ring buffer 是内核以及用户共享的内存,在多核的 CPU 上,读写操作的顺序就需要额外注意,我们可能需要一些内存屏障保证读写的正确性。
比如内核更新 CQ 后,用户要保证看到最新的修改。反之,用户更新 SQ,消费 CQ 后,要保证内核看到最新的修改。
消费 CQ
对于 CQE 而言,它由 kernel 进行添加,并且更新到 CQ 的 tail,而我们是在用户态读取,这里就需要讨论 memory ordering。
要谨记每一行代码都可能发生 context switch,因此在比较 head 和 tail 前,要用 read_barrier()
,确保我们能读到内核的更新。
而消费完 CQE 后,我们更新了 CQ 的 head,要确保内核清楚我们的更新操作,调用了一次 write_barrier()
关键代码如下
head = *cring->head;
do {
read_barrier(); /* ensure previous writes are visible */
if (head == *cring->tail)
break;
cqe = &cring->cqes[head & *s->cq_ring.ring_mask];
fi = (struct file_info *)cqe->user_data;
if (cqe->res < 0)
fprintf(stderr, "Error: %s\n", strerror(abs(cqe->res)));
int blocks = (int)fi->file_sz / BLOCK_SZ;
if (fi->file_sz % BLOCK_SZ)
blocks++;
for (int i = 0; i < blocks; i++)
output_to_console(fi->iovecs[i].iov_base, fi->iovecs[i].iov_len);
head++;
} while (1);
*cring->head = head;
write_barrier();
提交 SQE
流程大致是,读取当前 tail,获取下一个 tail 的位置,然后在获取 index 前,使用 read_barrier()
保证 tail 和 next_tail 的值是正确写入的。
然后对 sqe 进行了初始化,告诉 kernel 我们需要的操作。
接着更新当前的 tail,使用 write_barrier()
确保 kernel 读到变更。
最后系统调用 io_uring_enter()
把一切交给 kernel
关键代码
next_tail = tail = *sring->tail;
next_tail++;
read_barrier();
index = tail & *s->sq_ring.ring_mask;
struct io_uring_sqe *sqe = &s->sqes[index];
sqe->fd = file_fd;
sqe->flags = 0;
sqe->opcode = IORING_OP_READV;
sqe->addr = (unsigned long)fi->iovecs;
sqe->len = blocks;
sqe->off = 0;
sqe->user_data = (unsigned long long)fi;
sring->array[index] = index;
tail = next_tail;
/* update the tail so the kernel can see it. */
if (*sring->tail != tail) {
*sring->tail = tail;
write_barrier();
}
int ret = io_uring_enter(s->ring_fd, 1, 1, IORING_ENTER_GETEVENTS);
if (ret < 0) {
perror("io_uring_enter");
return 1;
}