Caiwen的博客

2025春操作系统训练营游记(一二阶段)

2025-05-02 09:09:00

2025-??-??

从一个公众号上得知了这个训练营并报名

本来就对rust和os感兴趣,中间断断续续在图书馆抱着 programming rust 看,以及看 csapp 和 x86汇编-从实模式到保护模式

03-31 ~ 04-05

突然发现训练营已经开始了。根据说明,搞好了 rustlings 。第一次接触这种形式的练习感觉很新奇

rustlings 前面大部分内容之前已经了解过了,于是很快就做完了。后面的进阶的知识比较陌生,于是又在图书馆专门看完了 programming rust 的并发和宏等章节。在清明节的第二天晚上赶完了 rustlings,结果提交后发现还差 12 分,发现 quiz 和 10 个算法相关的练习还没做,于是又通宵做完,可算是晋级了

04-06

初步搭建好了实验环境。发现高数作业还有8页没写完,赶紧去补了()

04-07

看了一下指导文档,发现很多地方讲的不是很详细,还以为需要很多前置知识。就在准备继续去看 csapp 补前置知识的时候发现原来还有个详细版本的 rCore 文档,训练营的指导文档只是精简版本

看完了第一章,初步对内核的基本结构和 riscv 有了点了解,感觉意犹未尽...

riscv寄存器

x0/zero 寄存器恒恒为零

x1/ra 寄存器存放当前函数调用结束之后的返回到的地址,由被调用者保存

x2/sp 寄存器存放栈指针,由被调用者保存。开栈时,sp 减小,[新sp, 旧sp) 部分即为新开的栈空间

x3/gp 寄存器在程序运行期间不会发生变化,调用过程中无需保存

x4/tp 寄存器同上

x5~x7 和 x28~x31(又被合称为 t0~t6) 寄存器调用者保存

x8~x9 和 x18~x27(又被合称为 s0~s11) 寄存器被调用者保存

其中,s0/fp 寄存器既可以作为一个临时寄存器,也可以作为栈帧指针(即指向栈的起始位置)的寄存器。是被调用者保存寄存器。

x10~x17(又被合称为 a0~a7) 用来传递参数,调用者保存。其中 a0 和 a1 还用来保存返回值

04-09(特权级切换)

看完了第二章,跟着完整的敲了一遍代码

这一章为了实现在操作系统上运行应用程序,引入了特权级机制。特权级机制是软硬件配合得以实现的

在用户模式下,一些可能破坏计算机系统的指令会被限制执行。需要程序调用系统内核提供的接口来间接执行这些指令。而调用这些内核提供的接口时,CPU会从用户模式切换到监督模式,内核操作完毕之后CPU又会用监督模式切换到用户模式。模式之间的切换称为陷入。

ecall 指令可以从用户模式切换到监督模式

具体来说,内核在启动时会在 stvec 寄存器中设置处理函数的地址。当用户模式的程序调用 ecall 时,CPU 将跳到 stvec 寄存器中设置的地址处(当然不一定是 ecall 指令才会触发,其他异常也会触发),并设置如下寄存器的值

  • sstatus :发生 trap 之前 CPU 处于哪个特权级
  • sepc:记录 trap 发生之前执行的最后一条指令的地址
  • scause:描述 trap 发生的原因
  • stval:其他附加信息

而处理函数要执行的话就需要使用栈,如果应用程序和内核的处理函数共用一个栈,那么会带来安全隐患。因此需要单独开辟一个内核栈和一个用户栈

同时,由于 ecall 也相当于是一个函数调用,所以需要在调用时保存当前上下文。由于应用程序和内核是两个程序,所以保存上下文的时候需要把所有通用寄存器都保存一遍

为了方便上下文保存和两个栈之间的切换,还有一个 sscratch 寄存器可以让我们用来保存用户栈/内核栈的指针

stvecsstatussepcscausestvalsscratch 寄存器被称为 CSR(控制状态寄存器),在进行读写的时候需要使用 csrrwcsrrcsrw 指令进行操作

于是在上下文都保存好之后,处理函数的就会根据 ecall 提供的参数,进行不同的操作。所有操作结束之后,再将上下文还原,根据 sepc 的值,使用 sret 指令跳回到用户模式

对于初始时从监督模式跳到用户模式,只需要构造一个 trap 上下文(主要是设置 sepc)并使用 sret 指令即可跳入用户模式

rust指针操作相关笔记

ptr 为一个指针类型

ptr.read_volatile(); 读取指针指向数据

ptr.write_volatile(); 写指针

core::slice::from_raw_parts(ptr, cnt)ptrptr + cnt 转为一个不可变引用切片

core::slice::from_raw_parts_mut(ptr, cnt)ptrptr + cnt 转为一个可变引用切片

slice.copy_from_slice(slice2)slice2 数据复制到 slice

内存布局

  • .bss 保存程序中未初始化的全局数据,应当在程序加载时由程序或者加载者将其清零
  • .rodata 保存只读的全局数据,如一些常亮、字符串
  • .data 保存可以被修改的全局数据
  • .text 保存代码

由于 rust 可能会对内存布局进行一些优化,所以当 rust 需要与 汇编/其他编程语言 进行交互时,需要设置内存布局是兼容 C 语言的。如在需要与汇编进行交互的结构体上添加 #[repr(C)]

04-10~04-11(分时多任务)

花了两天的时间完成了第三章并完成了相应的实验

第三章实现了将程序全部加载到内存中并对程序的运行进行分时的调度

首先是多道程序的放置与加载。由于目前还没有上虚拟内存,所以需要用一个 python 脚本设置好每个应用程序链接到的地址。于是在内核初始化时就可以一口气把所有的程序都载入内存了

我们把所有内存都载入内存是想实现程序的任务的调度和切换

本章首先实现了一个协作式调度,即程序运行时可以调用一个 yield 的系统调用,主动让出 CPU 的所有权,然后内核会暂停当前程序,转而去执行其他的程序。yield 被调用之后,内核像上一章一样保存 trap 上下文并调用 trap_handler,在 trap_handler 中标记当前任务状态为暂停然后切换到其他任务

考虑如何切换任务,一个任务被切换出去后相当于这个任务停在了 trap_handler,而被切换回来相当于继续完成 trap_handler 。那么总的来说内核在任务切换前后仍然是继续完成 trap_handler,但是上下文却发生了不同。我们引入 __switch 函数来实现上下文的切换。既然上下文不同了,那么相当于每个任务都应该有一个独立的内核栈。任务会发生切换,因此也应该有个独立的用户栈。__switch 在切换任务时应该做到切换到目标任务的内核栈(顺着目标任务的内核栈继续往下执行即可切换到目标任务的用户栈,所以用户栈不用考虑),并且寄存器恢复到原来目标任务被中断时的状态。__switch 函数没有什么特殊的,我们把它看作一个函数调用,于是编译器会自动为我们生成调用者保存寄存器的保存与恢复的代码,但 __switch 是汇编实现的,需要我们手动处理被调用者保存寄存器的保存和恢复。我们把这些被调用者保存寄存器的信息称为 task 上下文,保存切换出去的任务的 task 上下文到内核,恢复切换回来的 task 上下文(由于 sp 也是被调用者保存寄存器,因此也顺带完成了切换内核栈的过程)

但考虑到很多程序可能并没有主动让出的意识,于是本章又实现了时间片轮转调度

在 CPU 中 mtime 寄存器保存了 CPU 自上电以来经过了多少时钟周期。同时还有一个 mtimecmp 寄存器。当 mtime 的值超过了 mtimecmp 的值就会触发时钟中断

于是我们有了这样的实现:初始时设置 mtimecmp 为 10ms 之后的时钟周期(可以通过 CPU 的时钟频率计算得到),于是 10ms 后时钟周期会被触发,内核强制暂停当前任务并切换到下一个任务,然后再把 mtimecmp 设置为下一个 10ms 之后的时钟周期

中断屏蔽

  • 如果中断特权级低于 CPU 当前特权级,则该终断会被屏蔽,不会被处理
  • 否则,继续判断相应的 CSR 来判断该中断是否会被屏蔽

在第二点中,控制中断是否被屏蔽的 CSR 有 sstatus 和 sie

如果 sstatus 的 sie 字段设为 0 则屏蔽所有 S 特权级的中断,反之则继续看 sie

sie 有三个字段 ssie/stie/seie 分别控制 S 特权级的软件中断,时钟中断和外部中断

指导书最后一段话不太清楚是不是可以理解为:在 U 态时触发的 S 特权中断,判断中断是否被屏蔽时跳过 sstatus 的 sie 字段的校验

嵌套中断

中断发生时,sstatus 的 sie 字段会先被保存到 spie 字段,然后 sie 字段清零,后续所有 S 特权级的中断都会被屏蔽

当 sret 指令返回到被中断打断的地方之后,硬件会把 sie 字段恢复为 spie 字段的值

因此默认情况下不会发生同等级的嵌套中断,但可能还会嵌套更高级别的中断

本章有一个大坑

我照着指导书敲了一遍代码之后发现神秘bug,有的用户程序在调用 println 后触发了 IllegalInstruction 的 trap,有时候甚至触发 InstructionFault 。开始我在代码中插入一些 println 语句来看大概是哪个地方出现了问题,发现是 __switch 函数调用时发生问题。于是考虑 gdb 调试,单步调试发现 __switch 函数已经成功切换上下文并将控制权交给了用户程序,上述异常是用户程序触发的,但应用程序中只有 println。由于 rust 编译后的代码加了很多内容(毕竟 println 宏并非单纯的系统调用,还有 fmt 相关的东西),所以不太能调试。

后续问 ai 得知触发异常的 trap 之后 sepc 会储存发生异常的指令的地址。然后我在内核代码里输出了一下,更灵异了:异常指令的地址是一块未被使用的内存。此时我考虑可能是程序链接出现了什么问题,于是各种排查...

最终发现,指导书上的多道程序在链接的时候是用 python 脚本修改 linker.ld 的 BASE_ADDRESS 来让程序加载到不同的位置,而仓库代码却是在编译时传递 -Clink-args=-Ttext=地址 的方法来指定链接位置。前者有概率会出现奇怪 bug,会让我们的用户程序未链接到我们期望的位置,这就导致程序在执行的过程中跳到了某个未被使用的内存,从而触发上述异常。

gdb 的一些使用笔记

break [symbol]/[*address] 在某个符号或者地址处打断点

clear [symbol]/[*address] 清除某个符号或者地址处的断点

info breakpoints 可以查看当前所有断点

delete 3 删除某个编号的断点

display/i $pc 可以在每条指令执行结束后显示一下指定的寄存器/地址的内容,可以用来持续监控

p/x $寄存器名 可以以十六进制显示某个寄存器的值

disas 反编译当前函数

disas [symbol] 反编译指定函数

disas /m 开始地址,结束地址 反编译指定地址范围的指令

其中的参数可选

  • /m 显示源代码和汇编代码混合视图
  • /r 显示原始指令码
  • /s 显示汇编代码大小

反汇编工具

rust-objdump -S [elf] 可以反汇编某个 elf 文件,但是只反汇编 .text 段

rust-objdump -S -D [elf] 反汇编所有的段

04-13~04-16(虚拟内存)

内容非常多的一章!

本章首先先实现了如何让内核得以实现动态内存分配,这样内核就可以使用堆了,与此同时一些数据结构也能够被使用。实现的过程大概是采用了一个第三方的 crate 实现堆内存分配算法,然后堆是在 bss 段中提前开好的。

然后是本章重头戏:虚拟内存。首先是建立页表,具体来说:本章的代码中,FrameAllocator 来自动分配一个物理页。为了让物理页能够被 Rust 自动管理,我们创建一个结构体 FrameTracker ,类似智能指针那样包裹物理页编号,并为其实现 Drop trait,在 FrameTracker 生命周期结束时自动释放物理页。

我们定义了 PageTable 这个结构体作为页表。PageTable 上有一个 Vec<FrameTracker> ,这样做是为了把物理页的生命周期绑定到页表上。注意这里的物理页是用来存放页表节点的。

我们定义了 MapArea 表示一块连续的虚拟内存空间,其有一个 BTreeMap 来建立虚拟页编号和物理页之间的映射,这样做是为了把物理页的生命周期绑定到这个 MapArea 上。这里的物理页是用来存放数据的。

我们定义了 MemorySet 这个结构体来描述内核/应用程序的内存空间。其有一个 page_table 成员为这个内存空间的一级页表,以此来把页表的生命周期绑定到 MemorySet 上。同时还有一个 Vec 来存放 MapArea,来把 MapArea 的生命周期绑定到 MemorySet 上。

我们可以看到物理页作为一个资源,其生命周期有如下两个绑定路径

  • FrameTracker -> PageTable -> MemorySet
  • FrameTracker -> MapArea -> MemorySet

在读这章的时候我一直有疑问,页表已经实实在在存在内存上了,我们为什么还要创建这些结构体。况且这些结构体还附带了 Vec 以及 BTreeMap,这些数据存放在内核的堆栈上。我们相当于把页表又在内核的内存空间中又做了一份。

实际上这么做是为了建立资源之间生命周期的绑定,以借助 Rust 的 RAII 机制帮助我们管理资源

接下来考虑为内核建立内存空间,我们设计的布局如下:

低 256GB ,这个会在内核初始化的时候就被建立

前面四个段不用多说。最后一个段是物理内存中剩余的空间,这些空间可用来让 FrameAllocator 分配物理页

注意这里建立的内存空间的时候需要是对等映射,即在内核访问的虚拟内存就等同于地址一样的物理内存。这样可以确保内核在启用虚拟内存后仍然能继续正常工作

然后是高 256GB ,这个会在内核加载应用程序时建立

最高地址处的跳板稍后会说。后面紧跟着的是应用程序的内核栈,栈之间有一个 Guard Page,这个虚拟页并没有映射,为了让栈溢出的时候能够触发异常。

然后考虑建立应用的地址空间。这一章我们就不把编译出来的用户程序的 elf 头去掉了,因为 elf 头中有应用程序各个段的信息,有助于帮助我们建立内存空间

低 256GB,直接根据 elf 的信息建立即可。高 256GB,存放跳板和 Trap 上下文

然后我们的应用与内核切换的部分,即 trap 部分也需要修改

当应用发生 trap 时,CPU 跳到 __alltraps 处,此时我们还处在应用的地址空间,需要保存 trap 上下文(这也是为什么把 trap 上下文放到应用的虚拟空间中)。不同的是,在 sscratch 中我们存储的是 trap 上下文而不是内核栈(内核栈在内核的内存空间中,也存不了),然后把 sscratch 与 sp 交换,根据交换后的 sp 写入 trap 上下文。由于我们需要知道后续跳入内核之后要跳到哪个地址(trap_handler 在内核中的虚拟地址),kernel 内存空间的 token(用来切换内存空间),内核栈的虚拟地址,所以我们把这三者也用 trap 上下文维护。准备完毕之后先让 sp 指针指向内核栈,然后切换到内核空间,然后使用 jr 指令跳转到 trap_handler

注意,切换内存空间前后 pc 寄存器指向的指令的地址所属的空间就发生了变化,为了让 __alltraps 在切换内存空间前后指令仍然能顺利执行下去,我们就需要把 __alltraps__restore 的代码映射到所有内存空间的同一地方,也就是我们上文所说的位于最高地址处的跳板段。

但这又引发一个问题,如果我们直接调用 call 指令,那么编译器会基于当前代码链接的情况进行相对的跳转。而我们现在是单独把这两个函数抽出来映射到了某个地方,所以不能用 call 指令了,只能手动跳转

同时我们看到,上述 __alltraps__restore 是针对用户态和内核态之间切换这种具体的情况写的。而对于内核态 trap 到内核态这种情况就不适用了。所以 trap_handler 首先修改 stvec 寄存器,使得内核发生 trap 后 CPU 跳到另一个处理逻辑。trap_handler 在最后执行一个 trap_return ,来把 stvec 寄存器修改回来,同时使用 jr 指令跳到 __restore

当我们从页表中准备好地址空间的映射之后,就可以启用虚拟内存了,具体来说

RISCV 64 架构上有一个 satp 寄存器来配置虚拟内存,其存放的内容如下:

  • MODE 控制 CPU 使用哪种页表实现

  • PPN 控制当前虚拟地址空间的一级页表存在哪个物理页上

我们写 satp 这个寄存器即可

启用 SV39 分页机制后虚拟地址和物理地址的结构

页表项结构

  • V:为 1 时表示页表项是合法的
  • R/W/X:分别控制页表项对应的内存是否允许读/写/执行
  • U:控制页表项对应的内存是否在 U 模式下可被访问
  • G:暂时不用管
  • A:如果页表项被访问了,CPU 就将其置为 1
  • D:如果页表项被修改了,CPU 就将其置为 1

上述除了 G 位不能被操作系统修改,其他都可以

页表寻址方式

SV39 中采用三级页表。我们将虚拟地址的 27 位虚拟页号分成三个等长的部分,第 26-18 位为一级页索引,第 17-9 位为二级页索引,第 8-0 位为三级页索引

CPU 在进行地址转换时,首先根据 satp 寄存器的 PPN 字段拿到一级页表的地址。地址再加上一级页索引就得到了对应的页表项。拿到页表项之后 CPU 做如下判断:

  • V 值为 0 的话表示没有下一级节点,也就是要转换的虚拟地址是无效的
  • V 值为 1 后
    • R/W/X 位均为 0 时,表示这是一个中间节点,该表项记录的物理页标号表示的是下一级的页表所在的位置
    • R/W/X 不全为 0 的话,表示找到头了,该表项记录的物理页标号就是要转换为的物理地址

然后就这么一直往下找

值得注意的是,我们是每 9 位划分为一个索引,也就是每一级页表需要存 512 个页表项,每个页表项 8 字节,所以一个页表是占 4KB 的,正好对应于一个物理页。这也就使得根据物理页标号直接就能找到下一级页表所在位置是可行的

找到目标物理页标号后再加上页内偏移量就得到了具体的地址

我们考虑三级页表中的一个页表项表示 4KB 内存。二级页表中的一个页表项代表一个三级页表,包含了 512 个页表项,那么二级页表的一个页表项表示 2MB 内存。同理一级页表中的一个页表项表示 1GB 内存

假设现在一个程序需要使用大小为 TT 的内存,那么内核需要分配 T1GB\left \lceil \frac{T}{1GB} \right \rceil 个二级页表和 T2MB\left \lceil \frac{T}{2MB} \right \rceil 个三级页表。而一个页表需要占用 4KB,因此我们的页表额外占了 4KB×(T2MB+T1GB)4KB \times (\left \lceil \frac{T}{2MB} \right \rceil + \left \lceil \frac{T}{1GB} \right \rceil ) 内存。如果我们把 T1GB\left \lceil \frac{T}{1GB} \right \rceil 这一部分忽略,那么可以认为每分配内存给程序 TT ,页表额外消耗内存 T512\left \lceil \frac{T}{512} \right \rceil

ELF文件格式

ELF 头部

文件头部的前 32 字节,其组成如下:

名称 字节数 说明
e_ident 16 一些标志信息
e_type 2 ELF 文件类型
e_machine 2 指定 ELF 运行的 CPU 架构,具体看 这里
e_version 4 指定 ELF 版本,一般为 1
e_entry 8 elf 代码运行的入口,是一个虚拟地址。操作系统应在加载程序后直接执行该虚拟地址处的代码
e_phoff 8 Program Header Table 在文件中的偏移量
e_shoff 8 Section Header Table 在文件中的偏移量
e_flags 4 处理器特性标签,不太了解
e_ehsize 2 ELF 头的大小,单位为字节
e_phentsize 2 Program Header Table 中每个条目的大小
e_phnum 2 Program Header Table 中条目的数量
e_shentsize 2 Section Header Table 中每个条目的大小
e_shnum 2 Section Header Table 中条目的数量
e_shstrndx 2 每个 Section 都有一个名称。其中有一个 Section (.shstrtab)专门用来存储其他 Section 的名称。该值则表示储存名称的 Section 在 Section Header Table 中的索引

其中要展开说的如下:

  • e_ident,16字节
名称 字节数 说明
ELF 魔数 4 固定为 0x7f 0x45 0x4c 0x46 ,表示这是一个合法的 elf 文件
EI_CLASS 1 表示文件是 32 位还是 64 位,0x01 为 32 位,0x02 为 64 位
EI_DATA 1 指定大小端,0x01 为小端,0x02 为大端
EI_VERSION 1 ELF 规范版本。目前固定为 0x01
EI_OSABI 1 是否启用一些基于操作系统或者 CPU 特性。一般为 0x00
EI_ABIVERSION 1 指定当前 ABI 版本,配合 EI_OSABI 使用。一般也是为 0x00
EI_PAD 6 预留,一般都为 0x00
EI_NIDENT 1 不太清楚,文档上说的是 e_ident[] 数组大小

  • e_type,2 字节,表示 ELF 文件类型,常用的如下:
名称 数值 说明
ET_NONE 0 非文件类型
ET_REL 1 可重定位文件
ET_EXEC 2 可执行文件
ET_DYN 3 共享库
ET_CORE 4 内核转储文件
ET_LOOS 0xfe00 操作系统特定文件
ET_HIOS 0xfeff 操作系统特定文件
ET_LOPROC 0xff00 处理器特定文件
ET_HIPROC 0xffff 处理器特定文件

Section Header Table

由若干个条目组成。一个条目的结构如下:

名称 字节数 说明
sh_name 4 Section 的名称。这里存储的是名称在 .shstrtab 中的下表(把 .shstrtab)看成一个字符数组的话
sh_type 4 Section 的类型
sh_flags 8 Section 的一些配置
sh_addr 8 Section 所在的地址。为虚拟地址
sh_offset 8 Section 在文件中的偏移量
sh_size 8 Section 的大小
sh_link 4 一般为 0。否则表示这个节和某个节有关联,存储的是相关联的节在 Section Header Table 中的索引
sh_info 4 Section 的额外信息
sh_addralign 8 对齐数值。如果为 0 或者 1 则表示不对齐。启用对其后 sh_addr 必须是 sh_addralign 的倍数
sh_entsize 8 如果这个 Section 也是一个 Table (如 .shstrtab)的话,那么这个值就表示 Table 里每个条目的大小

其中需要具体说明的如下:

  • sh_type

常见取值如下

名称 数值 说明
SHT_PROGBITS 1 表示存放代码或者数据,如 .text.data
SHT_STRTAB 3 表示存放字符串表,如 .shstrtab
SHT_NOBITS 8 表示这个 Section 不在 ELF 文件中存放数据,含有的数据都是未初始化的数据。如 .bss
  • sh_flags
名称 说明
SHF_WRITE 0x1 表示该节是可写的
SHF_ALLOC 0x2 表示该节需要分配内存
SHF_EXECINSTR 0x4 表示该节可执行
SHF_MERGE 0x10 表示该节可以被合并
SHF_STRINGS 0x20 表示该节包含字符串
SHF_INTO_LINK 0x40 表示 sh_info 字段包含额外的语义信息
SHF_LINK_ORDER 0x80 表示该节的链接顺序依赖于另一个节
SHF_OS_NONCONFORMING 0x100 表示该节需要操作系统特殊处理
SHF_GROUP 0x200 表示该节属于一个节组
SHF_TLS 0x400 表示该节包含线程本地存储
SHF_MASKOS 0x0ff00000 保留给操作系统特定的语义
SHF_MASKPROC 0xf0000000 保留给处理器架构特定的语义

以上的值进行按位或即可得到 sh_flags

04-16~04-17(进程)

完成了第五章,这一章相对简单,主要集中在软件的部分,没有涉及到硬件的地方

这一章引入了进程这个概念。我个人理解进程大概就是可以让一个可执行文件在系统上同时运行多个副本。

关于进程有如下的几个操作

  • fork 把当前的进程复制一份,并把复制出来的进程作为当前进程的子进程。复制前后的进程是完全一致的,甚至从相同的地方开始执行相同的代码。fork 之后父子进程就都相当于从 fork 函数中返回了,但拿到的返回值是不同的,以区分父子进程
  • exec 将当前进程的内存空间清空,加载某个程序到当前进程,并从头开始执行
  • waitpid 判断指定的子进程是否结束,如果结束了就拿到其返回值

系统在初始化完毕后会加载第一个进程 iniproc。然后 iniproc 加载 shell 进程,shell 进程等待用户的操作,来执行其他进程。加载一个进程时先 fork,把当前进程复制一份,然后再 exec,加载指定程序。因此可以认为所有进程都是 initproc 派生出来的

当一个进程结束时,其虚拟空间会被释放,但是其他的资源(如内核栈,还有页表占用的内存(这个还不回收是因为其生命周期是绑定到 MemorySet 的,MemorySet 后续随 TaskControlBlock 释放))不会被释放。因为程序退出的时候实际上是进行了系统调用,而系统调用执行的过程中正用着内核栈了,直接把内核栈释放掉会出问题。此外返回值信息也记录在 TCB 中,需要被父进程获取,也不能释放。我们把已经退出但还有部分资源没被释放的进程的状态标为 Zombie

如果进程结束时其还存在没有结束的子进程,那么这些子进程会被挂在 initproc 下,作为 initproc 的子进程

父进程可以通过不断地调用 waitpid 来等待子进程结束并取返回值。如果目标进程状态为 Zombie,那么说明已经结束,可以取到返回值给父进程,并将子进程剩余的资源全部释放

进程调度

评价维度:

  1. 周转时间:结束时间 - 到达时间
  2. 响应时间:任务被第一次处理的时间 - 任务到达的时间

批处理系统调度

我们假设任务在执行过程中不会被抢占,且任务的耗费时间是确定的

FIFO/FCFS(先来先服务)策略

即从头开始依次执行进程

如果每个任务耗费的时间是相同的,那么是最优方案。

存在的问题是,如果每个任务耗费时间不同,可能一个耗时比较短的进程的前面有一个耗时很长的进程,这样的话就会拖累平均周转时间

SJF (最短作业优先)策略

即先根据任务需要耗费的时间进行排序,然后再走 FIFO 策略

在任务同时到达的情况下是最优的方案

存在的问题是:需要任务同时到达,这样才可以比较任务耗费的时间来确定优先级。如果不是同时到达的话策略不一定最优

交互式系统调度

这次,任务可能会被抢占。但任务耗费时间仍然是可计算的

STCF(最短完成时间优先)策略

在 SJF 策略之上,我们让任务可以抢占。即如果后面又来了个耗费时间更短的任务,那么就先执行这个任务

存在的问题是:如果两个耗费相同的时间的任务同时到达,尽管前三个策略的平均周转时间一样,但是平均响应时间都很慢,交互性很差

RB(轮转)策略

我们引入时间片,就跟之前我们做的一样

注意时间片不能设置的太小,不然任务调度所花费的时间占比太高

存在的问题是:虽然平均响应时间很快,但是平均周转时间就不太优了

通用计算机系统调度

MLFQ (多级反馈队列)策略

我们建立多个队列,每个队列都有一个优先级(队列的数量是经验值,如 Linux 系统设置 140 个队列)

我们考虑把 I/O 密集型的任务放到高优先级,把 CPU 密集型任务放到低优先级。然后在调度的时候,从最高优先级开始,按轮转调度策略进行调度,直到该优先级所有的任务都处于堵塞状态(等待 I/O 响应)

由于我们不可能提前知道一个任务是 I/O 密集型还是 CPU 密集型,那么就需要动态调整。首先假设新创建的任务是 I/O 密集型的,设为最高优先级,然后设置一个时间配额(一般是时间片的整数倍,经验值),如果该任务在时间配额内睡眠、等待 IO 而主动放弃处理器,那么其大概率还是 IO 密集型任务,保持其优先级。否则,其大概率是 CPU 密集型任务,降低其优先级

但还存在问题,优先级只能降低,会出问题。而且计算机系统中有大量交互进程,每个进程执行时间短,一致占用着最高优先级,这导致低优先级的进程一直无法执行,出现饥饿现象

解决方案是统计一个进程在就绪态的等待时间和在堵塞态的等待时间。然后让等待时间和优先级成反比关系。并且让就绪等待时间的优先级提升度的程度比堵塞等待时间的优先级提升度小,从而仍使得 IO 密集型任务仍由于 CPU 密集型任务

比例份额调度策略——彩票调度策略

首先按进程的优先级,分发相应数量的彩票。然后每次调度时内核生成一个随机数,作为中奖号码,然后看哪个进程中奖了,就让哪个进程执行

例如,计算机系统中有两个进程PA和PB,优先级分别为2和8,这样它们分别拥有2张(编号为0-1)和8张彩票(编号为2-9),按照彩票调度策略,操作系统会分配PA大约20%的处理器时间,而PB会分配到大约80%的处理器时间。

优点是能解决饥饿问题,即使优先级小,但经过长时间也有机会获得执行的机会。而且调度策略的开销比较小

缺点是,优先级不好确定,公平性不好保证。而且短时间内会有不确定性,必须经过很长的时间之后才能让进程之间被执行的时间比例近似于优先级比例

比例份额调度策略——步长调度

例如,计算机系统中有两个进程PA和PB几乎同时到达,优先级分别为2和8,用一个预设的大整数(如1000)去除以优先级,就可获得对应的步长,这样它们的步长分别是500和125在具体执行时,先选择PA执行,它在执行了一个时间片后,其行程为500;在接下来的4个时间片,将选择执行行程少的PB执行,它在连续执行执行4个时间片后,其形成也达到了500;并这样周而复始地执行下去,直到进程执行结束。,按照步长调度调度策略,操作系统会分配PA大约20%的处理器时间,而PB会分配到大约80%的处理器时间

优点是解决了彩票调度的不确定性

缺点是每个新来的进程在开始一段时间都会一直占用着 CPU,有点不公平了。而且优先级仍然不好确定

实时计算机系统调度

实时操作系统的每个任务都有一个截止时间。系统必须在截止时间之前把任务完成。

我们假设任务的执行时间是可以估计的

在实时操作系统中,周转时间和响应时间都不重要,能在截止时间之前完成任务才是最重要的。

我们考虑周期性任务:任务每隔一段时间就要执行一次,且需要在下一个周期来临之前执行完毕

设任务 ii 的周期为 PiP_i,需要 CiC_i 的时间处理这个任务。那么计算机系统能够实时调度这些任务的条件是

CiPi1\sum \frac{C_i}{P_i} \le 1

RMS(速率单调调度)策略

执行周期越短的进程优先级越高

前提是要执行的进程信息在最开始时就被确定,并在后续不接收新的进程,也不调整优先级或者进行 CPU 抢占。在静态实时调度算法中,RMS 是最优的

EDF(最早截止时间优先)策略

每次调度时,对所有任务的截止时间进行排序,选择截止时间最近的任务进行执行

EDF 策略是动态的实时调度算法

04-23~04-25(文件系统)

由于准备期中考试,进度严重停滞。考完之后赶紧来继续完成了第六章

这章制作了一个叫做 easy-fs 的文件系统,感觉比较困难的地方是整个的层次架构,在这里梳理一下。

整个文件系统的结构如图所示

文件系统把磁盘看成一块一块的,并根据块的编号来进行操作。第一个块是超级块,记录了文件系统的特征魔数,索引节点位图的大小(块数量),索引节点区域的大小,数据块位图的大小,数据块区域的大小

创建文件系统的时候,索引节点位图大小需要手动设置,然后索引节点区域大小就能计算出来。剩余区域根据神秘的数学公式也能计算出数据块位图和数据块区域的大小

easy-fs 作为一个 crate 被独立出来。在最底层有一个 BlockDevice 的接口,用于接入内核。BlockDevice 主要需要两个功能:读块和写块

再往上,有一个 BlockCacheManager,相当于一层缓存。注意的是,这个缓存层可以管理多个设备:其内部有一个队列,队列中维护当前被缓存的块,即 BlockCache ,而 BlockCache 内还包含了块设备对象的指针

再往上,是 EasyFileSystem,这一层表示文件系统,包含文件系统信息和块设备对象指针,提供了打开/创建文件系统、获取根目录的索引节点(root_inode),分配/释放索引节点,分配/释放数据节点等功能。其操作具体的块的时候都是调用 BlockCacheManager 来进行操作

再往上,是 Inode ,表示索引节点。这个 Inode 是存储在堆栈上的而不是存储在磁盘上的。其包含了索引节点所在的块的编号和块内偏移量,以及文件系统对象指针,块设备对象指针(EasyFileSystem 和 Inode 包含块设备对象指针是因为他们调用 BlockCacheManager 的时候需要提供块设备对象指针,感觉这里封装的不好)

由于索引节点的最终目的是要指向数据块,所以 DiskInode 存储了该索引节点表示的文件的数据所在的数据块的编号(可以理解为索引节点是一个胖指针)。为了充分利用空间,索引节点做如下设计:

在索引数据时,首先是若干个直接索引,直接索引直接存储数据块的块编号

直接索引不够用的话上一级间接索引,一级间接索引指向了一个数据块,但这个数据块并非存储文件数据,而是继续存储若干个数据块的编号,这些数据块中就存储文件数据了

一级间接索引不够用就上二级间接索引,其指向了一个数据块,这个数据块存储若干个数据块的编号,不过这一次每个编号指向的数据块存储的是一个一级间接索引表结构

整个索引结构有点类似虚拟内存的多级页表

Inode 提供了读写 DiskInode (即索引节点在磁盘上的数据),读写索引节点指向的数据块,清空索引节点指向的数据块,扩缩容索引节点的数据块总量等接口

再往上,操作系统将 Inode 封装成 OSInode

04-25~04-29(进程通信)

完成第七章

感觉这章难度不大了

这章首先给标准输入输出实现了 File trait,将其视为了一个文件,非常简单

然后是管道,管道建立在内核的堆中,并将生命周期绑定在 TCB 的文件描述符表上。管道具体的结构是一个循环队列。当队列满的时候写队列,或者队列为空的时候读队列,都会堵塞进程

然后是传递命令行参数,内核在启动进程前会先把参数信息写入内核栈,具体的结构是这样

然后把 argv_base 和参数个数存到寄存器里作为调用 main 函数的参数

IO 重定向这里感觉很妙,我们引入 sys_dup 这个系统调用,作用是复制一个文件描述符。具体操作时,我们先 fork,然后在子进程里判断是否要重定向 IO。如果要重定向 stdin,那么就先打开一个文件,获得文件描述符 fd,然后把 stdin 的文件描述符释放掉,再把 fd 复制一份。由于我们分配文件描述符的时候一定是分配最小编号,所以把 stdin 释放相当于腾出了个位置,后续 fd 复制出来的文件描述符编号必然是 stdin 的。这样就很巧妙地实现了 IO 重定向

信号这里首先涉及到了若干系统调用

  • sys_kill 向指定进程发送信号
  • sys_sigaction 设置信号处理的 handler。一些信号不能设置自定义的 handler,这些信号只能由内核处理
  • sys_procmask 设置当前进程的信号屏蔽掩码,即设置当前进程要屏蔽哪些信号
  • sys_sigreturn 从信号处理 handler 返回要执行这个

信号来源

  • 使用 kill 来给自身进程或是其他进程发送信号
  • 内核检测到了某些事件来给进程发信号,如子进程状态改变,内核就会给父进程发信号
  • 进程在执行的时候触发了某些异常,trap 到了内核

发送信号的过程其实就是内核在目标进程的 TCB 上打了个标记

当进程从内核态准备返回用户态之前(如进程被调度回来、系统调用返回、trap后返回),内核会检查这个进程是否标记有信号,如果有,那么进入信号处理阶段

对于一些信号,如 SIGKILL/SIGSTOP/SIGCONT/SIGDEF,由内核亲自处理。对于其他信号,内核会首先在进程的 TCB 上查找这个信号是否有对应的 handler,如果有,那么就准备跳入这个 handler

跳入之前内核会备份当前的 trap 上下文,然后修改当前上下文(修改跳入内核态之后开始执行的地址,以及保存参数的寄存器),后续直接跳入用户态就可以了

在信号处理阶段结束,跳入到内核态之前,还有一层判断,即是判断当前进程是否包含某些信号,如 SIGINT/SIGILL... 这些,这些信号标志着当前进程出现了错误,需要立刻被内核杀死

当 sigreturn 调用时,内核会把之前备份的 trap 上下文恢复,再次跳入到用户态后,就跟什么都没发生过一样,继续正常执行

一些信号的含义

  • SIGABRT 非正常的进程退出,可能由调用 abort 函数产生(产生后进程将被内核杀死)
  • SIGCHLD 子进程状态变更(通常是子进程结束)时产生
  • SIGINT 按下 Ctrl+C 组合键产生(感觉像是用户产生的结束进程信号)(产生后进程将被内核杀死)
  • SIGKILL 由内核或是其他进程产生的信号(感觉像是内核或者其他进程产生的结束进程信号)(不能自行处理,由内核处理)(产生后进程将被内核杀死)
  • SIGSEGV 非法内存访问异常,由内核发送给触发异常的进程(产生后进程将被内核杀死)
  • SIGILL 非法指令异常,由内核发送给触发异常的进程(产生后进程将被内核杀死)
  • SIGTSTP 按下 Ctrl+Z 组合键产生,表示暂停进程,这个信号能被进程自行处理决定是否暂停以及如何暂停
  • SIGSTOP 也表示暂停进程,但是这个是由内核强制暂停(不能自行处理,由内核处理)
  • SIGCONT 恢复暂停的进程(不能自行处理,由内核处理)
  • SIGUSR1/2 用户自定义信号

可能出现的锅

这章信号一节感觉锅不少,这里列几个作为备忘。作为一个量级不小的项目,确实很难考虑的非常完善

  • sigreturn 的系统调用没有检查 handling_sig 是否已经为 -1,不然用户程序在非信号处理函数中调用这个系统调用会直接使内核 panic
  • 如果信号处理函数没有调用 sigreturn 而是正常返回了呢?似乎这种情况下,用户栈会直接弹到触发信号之前的情形(毕竟我们在调用信号处理函数的时候保留了用户栈),但是调用者保存寄存器没有保存,感觉还是会出问题
  • 如果信号处理函数执行到一般又有信号触发了,且没有被 mask (即嵌套 trap),那么目前的代码还是会跑一边信号处理流程,又把当前的 trap 上下文备份起来,但之前已经有了一个备份了,最开始的 trap 上下文会丢失
  • fork 后,signal_actions 和 signal_mask 会从父进程继承,signal_mask 继承感觉问题不大,但是 signal_action 继承后,后续如果执行 exec,信号处理函数的地址就无效了

04-29~05-02(并发)

完成了第八章

这一章首先使我们的内核支持多线程,将内核任务执行的架构进行了大规模的重构。任务调度的单位从进程变成了线程。一个进程包含若干个线程。进程之间是隔离的,而与进程不同的是,线程是追求共享的。线程之间共享进程的资源。线程支持的具体实现就是把线程需要共享的东西提到 PCB 中,TCB 从描述一个进程变为描述一个线程,而内核在调度的时候还是对于 TCB 进行调度的。每个线程都需要有自己的内核栈,用户栈,trap 上下文,task 上下文。

有了线程之后就会出现一些线程问题,涉及到锁、信号量、条件变量

互斥

  • 单标记尝试
rust
1
2
3
4
5
6
7
8
9
10
static mut OCCUPIED: bool = false; unsafe fn lock() { while vload!(OCCUPIED) {} OCCUPIED = true; } unsafe fn unlock() { OCCUPIED = false; }

其中不断循环判断锁是否释放是一种忙等待,也被称为自旋。对于单核 CPU,在时间片用完切换到下一线程之前,OCCUPIED 是不会发生改变的,因此 CPU 资源造成了浪费。rust 中提供了 spin_loop_hint 函数,在循环体内调用该函数可以通知 CPU 当前线程处于忙等待状态,CPU 可能会进行一些优化(如降频,减少功率)

同时对 OCCUPIED 的赋值操作也并非原子操作,该方案是错误的

  • Peterson 算法
rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// user/src/bin/adder_peterson_spin.rs /// FLAG[i]=true 表示线程 i 想要进入或已经进入临界区 static mut FLAG: [bool; 2] = [false; 2]; /// TURN=i 表示轮到线程 i 进入临界区 static mut TURN: usize = 0; /// id 表示当前的线程 ID ,为 0 或 1 unsafe fn lock(id: usize) { FLAG[id] = true; let j = 1 - id; TURN = j; compiler_fence(Ordering::SeqCst); while vload!(FLAG[j]) && vload!(TURN) == j {} // 上述代码等价于 while FLAG[j] && TURN == j {} } unsafe fn unlock(id: usize) { FLAG[id] = false; }

上述代码可以保证在无硬件支持情况下,全靠软件实现互斥的功能。我们根据锁的评价指标来分析一下:

(1)空闲则入:只有单线程要进入临界区,那么一定能够进去。上述代码中,如果只有一个线程要获取锁,那么 flag[j]false ,忙等待循环结束,线程进入临界区

(2)互斥访问:

首先考虑两个 lock 操作不重叠。假设 TjT_j 已经进入临界区,那么此时 flag[j]=trueturn=j。现在 TiT_i 尝试拿锁,他将 flag[i] 置为 true,并将 turn 设为 j,我们发现此时循环条件满足,TiT_i 进入忙等待,直到 TjT_j 把锁还回去,即设置 flag[j]false

再考虑两个 lock 操作重叠的情况。简单起见我们假设 flag 都是原子操作,不会产生影响。两个线程开始都把 flag 设为 true ,于是忙等待条件就变成了线程 TiT_i 只要有 turnj 就忙等待,TjT_j 同理。我们发现,一个线程仅靠自己是无法让 turn 为自身的,必须有另一个线程的帮忙。而另一个线程帮忙设置完 turn 之后自己仍是忙等待。而对于 turn 的修改,无论是否原子,总会有一个靠后的修改操作。哪个线程最后修改了 turn,那么另一个线程就能进入临界区域

(3)有界等待

TiT_i 让出锁之后,此时 turn 仍为 j。此时 TiT_i 仍尝试拿锁,turn 仍为 j,而退出忙等待的条件为 turn=i。而只有 TjT_j 能做到将 turn 置为 i,因此后续必然是 TiT_i 拿不到锁 TjT_j 也别想拿到锁

综上 Peterson 算法是一个合格的锁实现。但是这种算法只能运行在单核 CPU 上

  • 关中断

可以考虑拿锁之后就把时钟中断关闭,把锁放回去之后就启动时钟中断

这种做法无法适应多处理器架构

  • 使用原子指令的单标记
rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// user/src/bin/adder_atomic.rs static OCCUPIED: AtomicBool = AtomicBool::new(false); fn lock() { while OCCUPIED .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) .is_err() { yield_(); } } fn unlock() { OCCUPIED.store(false, Ordering::Relaxed); }

其中 compare_exchange 的接口定义为:

rust
1
2
3
4
5
6
7
pub fn compare_exchange( &self, current: bool, new: bool, success: Ordering, failure: Ordering, ) -> Result<bool, bool>;

其功能为:如果原子变量当前值与 current 相同,则将其修改为 new,并返回 Ok。否则不进行修改,返回 Err。无论返回 Ok 还是 Err,其包裹的值均为该原子变量在操作之前的值

compare_exchange 是原子操作

在硬件层面,该操作对应的指令为 cas rd, rs1, rs2, rs3 ,其意义为:将一个内存位置存放的值(地址保存在 rs1)与一个期待值(值保存在 rs2)进行比较,如果相同则将内存位置存放的值修改为 rs3。无论是否相同,都将执行该指令之前的这个内存位置存放的值写入寄存器 rd

实际上 riscv 并不原生支持 CAS 指令。我们需要使用 LR 和 SC 指令进行模拟

(1)LR:加载保留指令,用来加载一个地址处的数据到寄存器内,并在 CPU 内部维护一个保留集,将这个地址放到保留集中

(2)SC:条件存储指令,一般和 LR 配套使用,如果目标地址存在于保留集中,那么就把指定数据写入到目标地址,否则不写入。指令执行完毕之后,会在一个寄存器中指出是否成功对目标地址进行了修改

如果在 LR 和 SC 中间,对一个地址中的数据进行了修改,那么就把这个地址从保留集中移除。这样一来,只有 LR 和 SC 中间目标地址的数据没有被修改,SC 才会进行修改

通过这两个指令可以实现 cas

Unknown
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 参数 a0 存放内存中的值的所在地址 # 参数 a1 存放 expected # 参数 a2 存放 new # 返回值 a0 略有不同:这里若比较结果相同则返回 0 ,否则返回 1 # 而不是返回 CAS 之前内存中的值 cas: lr.w t0, (a0) # LR 将值加载到 t0 bne t0, a1, fail # 如果值和 a1 中的 expected 不同,跳转到 fail sc.w t0, a2, (a0) # SC 尝试将值修改为 a2 中的 new bnez t0, cas # 如果 SC 的目标寄存器 t0 不为 0 ,说明 LR/SC 中间值被修改,重试 li a0, 0 # 成功,返回值为 0 ret # 返回 fail: li a0, 1 # 失败,返回值为 1 ret # 返回

大概可以理解为对目标地址的修改必须是连读带写一块完成的,如果写回的时候发现目标地址已经被修改过了,那么就重新来一次

如果存在多个 CPU 同时执行原子指令,那么这些指令会被发送到 CPU 和 RAM 之间的总线上,总线会对这些请求进行排序,且只有排在最前面的那个指令会成功,后面的指令会失败

让权等待

显然直接忙等会让线程吃满当前时间片,这样做是不行的。我们可以在忙等的时候通过 yield 系统调用来主动让权。但是这样做有一些缺点:

首先是这样做很大程度上受到操作系统调度策略的影响,让出去之后什么时候再回来就不好说了

而且让权的过程中会产生上下文的切换,而上下文切换的开销很大,除了要保存和恢复寄存器之外,CPU 各类缓存,TLB 等都需要清空,这会破坏程序的时间和空间局部性,降低效率

所以我们引入了堵塞和唤醒机制。当一个线程处于堵塞状态时,将该线程从调度队列中移除,内核在后续将不会调度该线程。这样就避免反复地调度然后又让出。同时将该线程加入到锁的唤醒队列中。直到锁被释放,锁释放的同时,唤醒队列中的一个线程(注意只能唤醒一个,因为唤醒后的瞬间线程进入临界区)。唤醒即为将这个线程重新加入到调度队列中。

不过还有另一个方案,我们把自旋的过程放到内核里,就可以避免上下文切换的开销。具体来说,加锁的时候,内核会判断是否已经上锁,如果已经上锁了,就接着调度下一个线程,直到锁被释放。

信号量机制

信号量可以认为是锁的推广。信号量是一个值,其有两个操作:

V 操作:将信号量的值加一

P 操作:将信号量的值减一,如果减一之前信号量的值已经为 0 了,那么就堵塞,直到信号量的值为 1 之后再减一

信号量可以解决如下问题:

  • 限制进入临界区的线程数量
  • 条件同步:我们先创建一个值为 0 的信号量。此时线程一先尝试进行 P 操作,堵塞。直到线程二进行了 V 操作,线程一才能继续执行下去。这样就可以做到条件同步
  • 生产者和消费者问题:有多个生产者和一个消费者,有一个容量有限的缓冲区。缓冲区满的时候生产者再生产就会堵塞。缓冲区空时消费者再消费就会堵塞(很像管道)。这容易使用信号量实现。不过注意的是需要两个信号量,一个记录可生产量,一个记录可消费量(因为一个信号量只能做到单边限制,此问题两边都有限制,需要两个信号量)

条件变量机制

感觉这个条件变量不太好理解

条件变量是解决这样的一个问题:我们的线程需要一直堵塞,直到某个条件成立才继续执行。实际上上面的信号量已经做到了这一点,但是信号量的实现有一些不足:首先这个条件可能会比较复杂,涉及多个共享量。某个线程更新了一个共享值之后未必使得条件一定成立。可能有多个线程都在等待这个条件,信号量实现会比较复杂。

于是就产生了条件变量。条件变量主要有两个操作:

等待条件变量:把当前线程堵塞,加入到该条件变量的唤醒队列中

唤醒条件变量:把条件变量唤醒队列中的一个线程进行唤醒

条件变量主要和 mutex 一块使用,大体流程是:为了判断一个条件是否成立,我先拿锁,然后判断,然后释放锁。如果条件不满足,那么我就堵塞(这里是因为条件变量而堵塞)。于此同时,另一个线程拿锁,修改条件,释放锁,然后唤醒一个因条件变量而堵塞的线程。当一个线程被唤醒后,再次重复拿锁,判断...这些过程。之所以唤醒后又重复判断,是因为我们需要考虑多个线程等待条件的时候,有的线程不是条件更改后立刻被唤醒,可能被唤醒之前条件又被先前唤醒的线程更改了。而且唤醒可能不意味着一个复杂条件的成立。

(这里先释放锁再唤醒和先唤醒再释放锁应该没什么区别吧)

不过具体实现中,条件变量在等待的时候需要传入一个 mutex。condvar_wait 执行的时候,先释放锁,然后挂起当前线程,等线程被唤醒之后,再拿锁。这样再次循环判断的时候仍是正常处于临界区。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pub fn signal(&self) { let mut inner = self.inner.exclusive_access(); if let Some(task) = inner.wait_queue.pop_front() { wakeup_task(task); } } pub fn wait(&self, mutex: Arc<dyn Mutex>) { mutex.unlock(); let mut inner = self.inner.exclusive_access(); inner.wait_queue.push_back(current_task().unwrap()); drop(inner); block_current_and_run_next(); mutex.lock(); }

比如:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const CONDVAR_ID: usize = 0; const MUTEX_ID: usize = 0; unsafe fn first() -> ! { sleep(10); println!("First work, Change A --> 1 and wakeup Second"); mutex_lock(MUTEX_ID); A = 1; condvar_signal(CONDVAR_ID); mutex_unlock(MUTEX_ID); exit(0) } unsafe fn second() -> ! { println!("Second want to continue,but need to wait A=1"); mutex_lock(MUTEX_ID); while A == 0 { println!("Second: A is {}", A); condvar_wait(CONDVAR_ID, MUTEX_ID); } println!("A is {}, Second can work now", A); mutex_unlock(MUTEX_ID); exit(0) } #[no_mangle] pub fn main() -> i32 { // create condvar & mutex assert_eq!(condvar_create() as usize, CONDVAR_ID); assert_eq!(mutex_blocking_create() as usize, MUTEX_ID); ... }

first 在条件变量修改完毕之后释放锁,并通知 second 是时候唤醒了

second 中,他先拿锁,然后判断,如果不满足条件,就执行condvar_wait,在 condvar_wait 中,先把锁释放(我都判断完了就没必要拿着锁了),然后堵塞直到被唤醒,当再被唤醒的时候,再拿锁,然后 condvar_wait 结束,再次循环判断(我们发现 condvar_wait 把释放锁和拿锁的过程包装了起来,使得执行前后锁的状态不变)

条件变量还有个问题是同步屏障问题,如

rust
1
2
3
4
5
6
fn thread_fn() { for _ in 0..300 { print!("a"); } for _ in 0..300 { print!("b"); } for _ in 0..300 { print!("c"); } exit(0) }

我们希望在所有线程打印完 a 之后,再一块打印 b。b,c 同理

使用条件变量做法:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
const THREAD_NUM: usize = 3; struct Barrier { mutex_id: usize, condvar_id: usize, count: UnsafeCell<usize>, } impl Barrier { pub fn new() -> Self { Self { mutex_id: mutex_create() as usize, condvar_id: condvar_create() as usize, count: UnsafeCell::new(0), } } pub fn block(&self) { mutex_lock(self.mutex_id); let count = self.count.get(); // SAFETY: Here, the accesses of the count is in the // critical section protected by the mutex. unsafe { *count = *count + 1; } if unsafe { *count } == THREAD_NUM { condvar_signal(self.condvar_id); } else { condvar_wait(self.condvar_id, self.mutex_id); condvar_signal(self.condvar_id); } mutex_unlock(self.mutex_id); } } unsafe impl Sync for Barrier {} lazy_static! { static ref BARRIER_AB: Barrier = Barrier::new(); static ref BARRIER_BC: Barrier = Barrier::new(); } fn thread_fn() { for _ in 0..300 { print!("a"); } BARRIER_AB.block(); for _ in 0..300 { print!("b"); } BARRIER_BC.block(); for _ in 0..300 { print!("c"); } exit(0) }

首先拿锁,然后进行判断和修改。如果条件已经成立,那么唤醒一个线程。如果条件不成立,那么就堵塞,当堵塞结束之后,说明条件成立了(在这个场景下我们在堵塞结束后不需要再次判断条件是否仍然成立了)线程继续执行。不同的是,线程继续执行的同时也唤醒其他线程。这样,最开始的线程由于判断条件成立从而被唤醒,这个线程又去唤醒其他线程,线程一个接着一个地被唤醒,形成了一个唤醒链条,从而所有线程都被唤醒

并发中的问题

  • 读取变量的时候也要加锁
rust
1
2
3
4
5
6
7
8
9
10
11
12
static mut A: usize = 0; //... other code unsafe fn thr1() -> ! { if (A == 0) { println!("thr1: A is Zero --> {}", A); } //... other code } unsafe fn thr2() -> ! { A = A+1; println!("thr2: A is One --> {}", A); }

看样子一个只写,一个只读,应该是没问题?但可能出现,判断 A 为 0 之后,发生中断,切换到 thr2 ,对 A 进行修改,再切回 thr1 的时候输出 thr1: A is Zero --> 1

  • 同步缺陷
rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static mut A: usize = 0; ... unsafe fn thr1() -> ! { ... //在某种情况下会休眠 A = 1; ... } unsafe fn thr2() -> ! { if A==1 { println!("Correct"); }else{ println!("Panic"); } } pub fn main() -> i32 { let mut v = Vec::new(); v.push(thread_create(thr1 as usize, 0)); sleep(10); ... v.push(thread_create(thr2 as usize, 0)); ... }

上述代码能够确保一个线程先执行,一个线程后执行,会不会出问题?实际上虽然线程一率先执行,但有可能在 A = 1; 前被切换出去执行线程二了,此时就会出现问题。因此线程要想有优先顺序必然要使用同步原语

死锁检测

本章的实验给出了一个死锁检测的算法。在指导书中称此算法为银行家算法,不过群里的助教表示银行家算法的原型是不可实现的,实验中给出的算法并非完全的银行家算法,当然我看也有人表示就是一样的。我这里称这个算法为神秘算法吧

这个神秘算法需要维护三个量

  • Availablei=kAvailable_i=k 表示第 ii 类资源(如信号量,锁)的可用数量为 kk (锁的话要么为 0 要么为 1,信号量的话从 0 到 res_count 取值)
  • Allocationt,i=gAllocation_{t,i}=g 表示线程 tt 当前已经拿到了第 ii 类资源,拿到的数量为 gg
  • Needt,i=dNeed_{t,i}=d 表示线程 tt 还需要第 ii 类资源的数量为 dd (刚开始以为这个量需要线程提前告知,或者内核通过什么高科技手段解析线程的代码获取什么的。后来看群里讨论,这个 NeedNeed 只需要在线程正在申请资源的时候加一,申请完毕(包括成功和失败)之后减一即可)

每次要处理线程申请请求的时候根据如下算法判断是否会产生死锁。算法的过程如下:

  • 第一步

WorkWork 为系统可分配给线程的资源数目,初始时 Work=AvailableWork=Available。设 FinishtFinish_t 表示这样分配完毕之后线程 tt 是否可以顺利执行完毕。初始时 Finisht=falseFinish_t=false

  • 第二步

从线程集合中寻找满足 Finisht=falseFinish_t=false(即还没有拿到想要的资源,正在被堵塞),Needt,iWorkiNeed_{t,i} \le Work_i (即这个线程所需要的资源都可以被操作系统分配)的线程 tt。如果找到,说明操作系统可以给这个线程分配资源,从而使得这个线程解除堵塞顺利执行完毕

如果找到则继续,否则跳到第四步

  • 第三步

Finisht=trueFinish_t=true。线程能够运行结束并释放其资源,令 Worki+=Allocationt,iWork_i+=Allocation_{t,i}

然后跳回第二步再次寻找

  • 第四步

如果 FinishFinish 均为 truetrue,那么系统处于安全状态(说明存在一个特定的执行顺序使得所有线程顺利执行完毕。具体方案则是第二步寻找出来的顺序),否则表示系统不安全,出现死锁,请求应该被拒绝。

我们举几个例子判断一下该算法是否可以正常识别出死锁

  • 重复拿锁:拿完锁,还没释放,再次拿锁

一个程序拿完锁之后,此时 AvailableAvailable00,然后又尝试拿锁,AvailableAvailable 无法满足 NeedNeed 的要求,因此请求被拒绝

  • 双线程互锁:线程一先申请锁一,再申请锁二。线程二先申请锁二,再申请锁一。当两个线程重叠执行的时候发生死锁

线程一申请完锁一,线程二申请完锁二之后,Available1=Available=0Available_1=Available=0, 后面线程一再申请锁二的时候 AvailableAvailable 无法满足 NeedNeed,请求被拒绝

之前的疑惑

这章结束之后,前面的一些疑惑就解决了:

在文件系统这一章中开始大面积使用 Mutex 来实现内部可变性,而我之前一直好奇在没有操作系统的支撑下是如何做到互斥的。我们可以看到 Mutex 是在 spin 这个 crate 中的,不难猜出其内部大概率是忙等待,即自旋锁。并且应该是使用了类似 Peterson 的算法实现,或者是使用了原子指令操作

在之前还好奇为什么在内核中实现内部可变性都是用 UPSafeCell 而不是使用 Mutex,反而实现文件系统的时候反过来了。现在明白,在内核中不会发生时钟中断,所以不会存在线程安全问题。不过应用程序进行的所有文件操作都是通过系统调用,又内核代理执行,而在内核态又不会发生时钟中断,所以对文件系统的操作都可以视为原子的,所以理论上文件系统使用 UPSafeCell 应该也没问题。只能推测使用 Mutex 是为了让文件系统能兼容多线程情况

并发相关术语

  • 共享资源(shared resource):不同的线程/进程都能访问的变量或数据结构。
  • 临界区(critical section):访问共享资源的一段代码。
  • 竞态条件(race condition):多个线程/进程都进入临界区时,都试图更新共享的数据结构,导致产生了不期望的结果。
  • 不确定性(indeterminate): 多个线程/进程在执行过程中出现了竞态条件,导致执行结果取决于哪些线程在何时运行, 即执行结果不确定,而开发者期望得到的是确定的结果。
  • 互斥(mutual exclusion):一种操作原语,能保证只有一个线程进入临界区,从而避免出现竞态,并产生确定的执行结果。
  • 原子性(atomic):一系列操作要么全部完成,要么一个都没执行,不会看到中间状态。在数据库领域, 具有原子性的一系列操作称为事务(transaction)。
  • 同步(synchronization):多个并发执行的进程/线程在一些关键点上需要互相等待,这种相互制约的等待称为进程/线程同步。
  • 死锁(dead lock):一个线程/进程集合里面的每个线程/进程都在等待只能由这个集合中的其他一个线程/进程 (包括他自身)才能引发的事件,这种情况就是死锁。
  • 饥饿(hungry):指一个可运行的线程/进程尽管能继续执行,但由于操作系统的调度而被无限期地忽视,导致不能执行的情况。

read/write_volatile

编译器可能会对代码的内存读写操作进行优化(删除或者重排),比如写一个地址然后立刻读一个地址,编译器可能认为读这个地址一定会是前面写入的那个值,于是就把读入操作优化。但有的时候,如 I/O 外设以 MMIO 方式映射设备寄存器时,相同内存位置进行读取和写入的意义可能完全不一样,此时编译器的优化就是错误的

使用 volatile 则可保证读写不会被编译器优化

最后更新于:2025-05-02 09:12:04

Caiwen
本文作者
一只蒟蒻,爱好编程和算法