Caiwen的博客

无锁数据结构

2026-06-25 05:24

无锁指的是不用操作系统提供的互斥锁之类的,而是使用原子变量。实际上在硬件上,原子操作其实也是有锁,比如 X86 架构在进行原子操作的时候会把总线锁住。个人理解无锁指的是,不会出现一个线程拿到锁进入临界区之后,这个线程被调度出去了,此时就会出现,没拿到锁的线程需要等待这个线程释放锁,而这个线程却又因为被调度出去了导致进度一直推进不了。无锁确保进度时刻在推进。

1. 内存安全

无锁数据结构首先需要考虑的是内存安全。无锁数据结构经常会使用指针,而朴素地使用 AtomicPtr 会出现问题:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::sync::atomic::{AtomicPtr, Ordering}; use std::ptr; struct Node<T> { data: T, next: *mut Node<T> } struct Stack<T> { head: AtomicPtr<Node<T>> } impl<T> Stack<T> { fn pop(&self) -> Option<T> { loop { let head = self.head.load(Ordering::Acquire); if head.is_null() { return None; } let next = unsafe { (*head).next }; if self.head.compare_exchange( head, next, Ordering::Release, Ordering::Relaxed ).is_ok() { let data = unsafe { ptr::read(&(*head).data) }; unsafe { drop(Box::from_raw(head)); } return Some(data); } } } }

可能出现这种情况,多个线程都在 let head = self.head.load(Ordering::Acquire); 这一步拿到同样的指针,然后一个线程率先经过 unsafe { drop(Box::from_raw(head)); } 将内存释放,而另一个线程跑到 let next = unsafe { (*head).next };,对已经悬垂的指针解引用。

问题的关键在于拿到指针和对指针解引用之间存在窗口,所以我们需要一种延迟回收内存机制。直接使用引用计数不太行,因为拿到指针和引用计数加一之间还是存在窗口。我们还需要考虑其他的方案。

1.1 Hazard Pointer

参考 jonhoo/haphazard: Hazard pointers in Rust.

Hazard Pointer 的大概思路是这样的。首先我们会有一个 Domain,一般是一个全局的 Domain,然后这个 Domain 里面会包含有若干个 record。当我们对一个 AtomicPtr 进行 load 的时候,会先从 Domain 中分配一个 record 出来,然后把这个 ptr 放在 record 里面,表示这个 ptr 目前正在被使用。

当我们不需要引用这个 ptr 之后会把这个 ptr 从 record 中删掉,然后把 record 交回给 Domain。

当需要释放某个 ptr 的时候,会先判断这个 ptr 是不是存在于某个 record 里,如果是的话就说明还存在引用,于是就不释放。

在 haphazard 中,既有全局的 Domain 又可以我们自行创建 Domain。我们先只考虑全局 Domain。

Domain 的结构是这样的:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
pub struct Domain<F> { hazptrs: HazPtrRecords, untagged: [RetiredList; NUM_SHARDS], family: PhantomData<F>, #[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))] due_time: AtomicU64, nbulk_reclaims: AtomicUsize, count: AtomicIsize, shutdown: bool, } struct HazPtrRecords { head: AtomicPtr<HazPtrRecord>, head_available: AtomicPtr<HazPtrRecord>, count: AtomicIsize, }

Domain 维护了两个链表,都在 HazPtrRecords 中。HazPtrRecords::head 表示所有曾分配过的 record 的全量链表。HazPtrRecords::head_available 表示空闲的 record 的链表。HazPtrRecords::count 表示已经分配出去的 record 的总数。

链表的元素都是 HazPtrRecord

rust
1
2
3
4
5
pub(crate) struct HazPtrRecord { pub(crate) ptr: AtomicPtr<u8>, pub(crate) next: AtomicPtr<HazPtrRecord>, pub(crate) available_next: AtomicPtr<HazPtrRecord>, }

ptr 表示包裹的 core::sync::atomic::AtomicPtrnext 表示当前这个 record 在全量链表中下一个元素,available_next 表示在空闲链表中下一个元素(如果当前元素没有处于空闲链表中,就为空指针)。

泛型 F 表示 Family,haphazard 是这么设计的:Family 相当于一个标识,然后一个 Family 下可以有多个 Domain。Domain 维护了各种信息,所以一个 Domain 创建出来的 hazard pointer 只能在这个 Domain 上面回收。通过类型系统可以限制住,一个 Domain 创建的 hazard pointer 不能被不同的 Family 的 Domain 回收。不过类型系统并不能保证 hazard pointer 不能被同一个 Family 但是不同 Domain 回收。这一点还是需要人工去保证,这也就是后面我们会看到很多函数都是 unsafe 的原因之一。Family 设计的意义可能在于,让全局的 Domain 的很多操作不需要 unsafe。我们后面简单起见都讨论全局 Domain,于是有关泛型 F 就不再解释了。

haphazard 定义了一个自己的 AtomicPtr,来把 core::sync::atomic::AtomicPtr 包裹住。

rust
1
2
3
4
5
#[repr(transparent)] pub struct AtomicPtr<T, F = domain::Global, P = alloc::boxed::Box<T>>( crate::sync::atomic::AtomicPtr<T>, PhantomData<(F, *mut P)>, );

使用泛型 P 记录了 T 是由哪个 Box 得来的。Box 这个类型并不是唯一的,因为可能 Box 后面的内存分配器不同,我们平时都默认 Box 使用全局内存分配器所以可能没有意识到。通过泛型 P 我们可以知道在后面释放 T 的时候要通过什么内存分配器释放。

AtomicPtr 只能通过 Box 转换而来:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
impl<T, F, P> From<P> for AtomicPtr<T, F, P> where P: raw::Pointer<T>, { fn from(p: P) -> Self { Self( crate::sync::atomic::AtomicPtr::new(p.into_raw()), PhantomData, ) } } unsafe impl<T> Pointer<T> for Box<T> { fn into_raw(self) -> *mut T { Box::into_raw(self) } unsafe fn from_raw(ptr: *mut T) -> Self { unsafe { Box::from_raw(ptr) } } }

AtomicPtrload 需要一个 HazardPointerHazardPointer 相当于引用这个 AtomicPtr 的凭证。load 是 unsafe 的,需要确保 HazardPointer 在一个 Domain 创建和释放。由于我们暂时仅考虑全局 Domain,所以这个安全性不用管。

rust
1
2
3
4
5
6
// impl<T, F, P> AtomicPtr<T, F, P> pub unsafe fn load<'hp, 'd>(&'_ self, hp: &'hp mut HazardPointer<'d, F>) -> Option<&'hp T> where T: Sync + 'hp, F: 'static ...

创建 HarzardPointer 时,需要先从 Domain 中获取 HazPtrRecord

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
pub struct HazardPointer<'domain, F = crate::Global> { hazard: &'domain HazPtrRecord, pub(crate) domain: &'domain Domain<F>, } // impl<'domain, F> HazardPointer<'domain, F> pub fn new_in_domain(domain: &'domain Domain<F>) -> Self { Self { hazard: domain.acquire(), domain, } } // impl HazardPointer<'static, crate::Global> pub fn new() -> Self { HazardPointer::new_in_domain(Domain::global()) } // impl<F> Domain<F> pub(crate) fn acquire(&self) -> &HazPtrRecord { self.acquire_many::<1>()[0] } pub(crate) fn acquire_many<const N: usize>(&self) -> [&HazPtrRecord; N] { let (mut head, n) = self.try_acquire_available::<N>(); let mut tail = core::ptr::null(); [(); N].map(|_| { if !head.is_null() { tail = head; // Safety: HazPtrRecords are never deallocated. let rec = unsafe { &*head }; head = rec.available_next.load(Ordering::Relaxed); rec } else { // 空闲链表不够用了,就创建一个 let rec = self.acquire_new(); if !tail.is_null() { unsafe { &*tail } .available_next .store(rec as *const _ as *mut _, Ordering::Relaxed); } tail = rec as *const _; rec } }) } fn try_acquire_available<const N: usize>(&self) -> (*const HazPtrRecord, usize) { loop { let avail = self.hazptrs.head_available.load(Ordering::Acquire); if avail.is_null() { return (avail, 0); } if (avail as usize & LOCK_BIT) == 0 { if self .hazptrs .head_available .compare_exchange_weak( avail, with_lock_bit(avail), Ordering::AcqRel, Ordering::Relaxed, ) .is_ok() { let (rec, n) = unsafe { self.try_acquire_available_locked::<N>(avail) }; return (rec, n); } else { crate::sync::yield_now(); } } } } unsafe fn try_acquire_available_locked<const N: usize>( &self, head: *const HazPtrRecord, ) -> (*const HazPtrRecord, usize) { let mut tail = head; let mut n = 1; let mut next = unsafe { &*tail }.available_next.load(Ordering::Relaxed); while !next.is_null() && n < N { tail = next; next = unsafe { &*tail }.available_next.load(Ordering::Relaxed); n += 1; } self.hazptrs.head_available.store(next, Ordering::Release); unsafe { &*tail } .available_next .store(core::ptr::null_mut(), Ordering::Relaxed); (head, n) }

try_acquire_available 函数,会在 Domain 的 head_available 中取走最多 N 个空闲的 record。

注意到,available_nextHazPtrRecord 指针,而 HazPtrRecord 是 8 字节对齐的,这意味着 available_next 的最低位是 0。于是我们就可以利用这点,将 available_next 最低位视为一个自旋锁标记位,如果为 1 则表明其他线程正在持有自旋锁,我们需要自旋等待。

我们可以看到,hazard pointer 的取 record 部分其实算不上无锁,但是其实大多数情况下都是只取一个 record,try_acquire_available 的自旋锁临界区非常短,极小概率会出现在自旋锁临界区线程被操作系统调度出去的情况。取多个 record 是 haphazard 为了实现在 Domain 中创建多个 HazardPointer 实例的接口。

try_acquire_available 的自旋重试比较有意思。如果是直接读 available_next 发现锁标识位存在那么就直接再判断一次,如果 CAS 失败会直接 yield。这么做的具体原因貌似难说。

空闲 record 链表的 push 和 pop 都是在链表的头部进行的,看起来是一个栈,可以考虑使用 Treiber's Stack 来实现无锁化。Treiber's Stack 的 pop 需要解决内存安全问题,不过我们这里的空闲 record 是不会回收的,只是单纯地把空闲的 record 的引用全部串到一起。但是最重要的是,Treiber's Stack 在解决内存安全问题的同时还把 ABA 问题给解决了,如果我们在这里使用 Treiber's Stack 而不去解决内存安全问题,就会出现 ABA 问题。而我们现在正在实现的 hazard pointer 就是来解决内存安全问题的,于是这就产生了一个依赖循环,而 hazard pointer 显然是不能这样自举的,于是最后只能采用自旋锁了。

如果空闲链表中能够提供的 record 不能满足要求,就需要创建新的 record。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
pub(crate) fn acquire_new(&self) -> &HazPtrRecord { // No free HazPtrRecords -- need to allocate a new one let hazptr = Box::into_raw(Box::new(HazPtrRecord { ptr: AtomicPtr::new(core::ptr::null_mut()), next: AtomicPtr::new(core::ptr::null_mut()), available_next: AtomicPtr::new(core::ptr::null_mut()), })); // And stick it at the head of the linked list let mut head = self.hazptrs.head.load(Ordering::Acquire); loop { // Safety: hazptr was never shared, so &mut is ok. unsafe { &mut *hazptr }.next.with_mut(|p| *p = head); match self.hazptrs.head.compare_exchange_weak( head, hazptr, // NOTE: Folly uses Release, but needs to be both for the load on success. Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // NOTE: Folly uses SeqCst because it's the default, not clear if // necessary. self.hazptrs.count.fetch_add(1, Ordering::SeqCst); // Safety: HazPtrRecords are never de-allocated while the domain lives. break unsafe { &*hazptr }; } Err(head_now) => { // Head has changed, try again with that as our next ptr. head = head_now } } } }

这里是比较经典的单链表头插。不过我认为内存序选择的比较保守。load 可以选择 Relaxed,compare_exchange_weak 成功时可以选择 Release 确保对要插的节点的 next 指针修改可见,失败时可以选择 Relaxed,fetch_add 可以选择 Relaxed。

有了 HazardPointer 之后我们先看 retire 操作。retire 表示我要释放某个指针指向的内存了。至于什么时候释放由 hazard pointer 内部的机制决定。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// impl<T, P> AtomicPtr<T, raw::families::Global, P> pub unsafe fn retire(self) -> usize where T: Send, { unsafe { self.retire_in(Domain::global()) } } // impl<T, F, P> AtomicPtr<T, F, P> pub unsafe fn retire_in(self, domain: &Domain<F>) -> usize where T: Send, { let ptr = self.into_inner(); unsafe { domain.retire_ptr::<T, P>(ptr) } } // impl<F> Domain<F> pub unsafe fn retire_ptr<T, P>(&self, ptr: *mut T) -> usize where T: Send, P: Pointer<T>, { let retired = Box::new(unsafe { Retired::new(self, ptr, |ptr: *mut dyn Reclaim| { let _ = P::from_raw(ptr as *mut T); }) }); self.push_list(retired) } fn push_list(&self, mut retired: Box<Retired>) -> usize { crate::asymmetric_light_barrier(); let retired = Box::into_raw(retired); unsafe { self.untagged[Self::calc_shard(retired)].push(retired, retired) }; self.count.fetch_add(1, Ordering::Release); se } fn check_threshold_and_reclaim(&self) -> usize { #[allow(unused_mut)] let mut rcount = self.check_count_threshold(); if rcount == 0 { rcount = self.check_due_time(); if rcount == 0 { return 0; } } self.nbulk_reclaims.fetch_add(1, Ordering::Acquire); self.do_reclamation(rcount) } fn check_count_threshold(&self) -> isize { let mut rcount = self.count.load(Ordering::Acquire); while rcount > self.threshold() { match self .count .compare_exchange_weak(rcount, 0, Ordering::AcqRel, Ordering::Relaxed) { Ok(_) => { self.due_time .store(Self::now() + SYNC_TIME_PERIOD, Ordering::Release); return rcount; } Err(rcount_now) => rcount = rcount_now, } } 0 } fn threshold(&self) -> isize { RCOUNT_THRESHOLD.max(HCOUNT_MULTIPLIER * self.hazptrs.count.load(Ordering::Acquire)) } fn check_due_time(&self) -> isize { let time = Self::now(); let due = self.due_time.load(Ordering::Acquire); if time < due || self .due_time .compare_exchange( due, time + SYNC_TIME_PERIOD, Ordering::AcqRel, Ordering::Relaxed, ) .is_err() { // Not yet due, or someone else noticed we were due already. return 0; } self.count.swap(0, Ordering::AcqRel) }

ptr 可以被安全释放的时机比较难以确定(注意引用计数的方案在引出内存安全问题的时候已经被否了)。所以我们要把释放 ptr 的请求给挂起来,这对应于上面代码中将要 retire 的指针转为 Retired 然后挂在 Domain 的 untagged 上面。然后还需要在合适的时机触发内存回收。内存回收时需要遍历所有的 record 和 Retired 来判断哪些 Retired 对应的指针没有出现在任何 record 中了,然后就可以安全释放这个指针。

显然一次内存回收的开销是很大的。于是我们有这样一个策略:self.count 记录的就是当前尚未处理完的 Retired 的数量,如果想要进行内存回收,需要满足以下两种条件之一:

  • 距离上一次内存回收的时间比较久

  • 当前挤压的 Retired 数量比较多。

    首先这个数量必须超过 RCOUNT_THRESHOLD,因为内存回收是有固定成本的,数量超过这个才算比较划算,这个下限也使得在 Retired 数量比较少的时候不会频繁触发内存回收。

    其次这个数量还必须超过 HCOUNT_MULTIPLIER * self.hazptrs.count,因为内存回收的时间开销会随着 self.hazptrs.count 的增加而线性增加,该限制可以确保均摊下来,单个指针内存回收的开销是 O(1)O(1) 的。同时,假如说我们有 xxRetiredyy 个已经分配出去的 record,最坏情况下,xx 里面有 yy 个指针都在 record 里面挂着,导致实际只能回收 xyx - y 个指针。为了确保实际回收指针数量也是要跟 self.hazptrs.count 成正比,以将均摊复杂度变为 O(1)O(1),这里乘了个 HCOUNT_MULTIPLIER,且 HCOUNT_MULTIPLIER 要大于 11,一般为 22

上述代码有关计数相关的原子操作我认为都是可以 Relaxed 的,haphazard 这个库的内存序比较保守。

每次执行 retire 操作,除了会把 Retired 挂在 Domain 上,还会判断是否满足内存回收条件,满足的话就执行内存回收。

内存回收的代码:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
fn do_reclamation(&self, mut rcount: isize) -> usize { let mut total_reclaimed = 0; loop { let mut done = true; let mut stolen_heads = [core::ptr::null_mut(); NUM_SHARDS]; let mut empty = true; for (stolen_head, untagged) in stolen_heads.iter_mut().zip(&self.untagged) { *stolen_head = untagged.pop_all(); if !stolen_head.is_null() { empty = false; } } if !empty { crate::asymmetric_heavy_barrier(crate::HeavyBarrierKind::Expedited); let mut guarded_ptrs = BTreeSet::new(); let mut node = self.hazptrs.head.load(Ordering::Acquire); while !node.is_null() { // Safety: HazPtrRecords are never de-allocated while the domain lives. let n = unsafe { &*node }; guarded_ptrs.insert(n.ptr.load(Ordering::Acquire)); node = n.next.load(Ordering::Relaxed); } let (nreclaimed, is_done) = self.match_reclaim_untagged(stolen_heads, &guarded_ptrs); done = is_done; rcount -= nreclaimed as isize; total_reclaimed += nreclaimed; } if rcount != 0 { self.count.fetch_add(rcount, Ordering::Release); } rcount = self.check_count_threshold(); if rcount == 0 && done { break; } } self.nbulk_reclaims.fetch_sub(1, Ordering::Acquire); total_reclaimed } fn match_reclaim_untagged( &self, stolen_heads: [*mut Retired; NUM_SHARDS], guarded_ptrs: &BTreeSet<*mut u8>, ) -> (usize, bool) { let mut unreclaimed = core::ptr::null_mut(); let mut unreclaimed_tail = unreclaimed; let mut nreclaimed = 0; for mut node in stolen_heads { let mut reclaimable = core::ptr::null_mut(); while !node.is_null() { let n = unsafe { &*node }; let next = n.next.load(Ordering::Relaxed); if !guarded_ptrs.contains(&(n.ptr as *mut u8)) { // No longer guarded -- safe to reclaim. n.next.store(reclaimable, Ordering::Relaxed); reclaimable = node; nreclaimed += 1; } else { // Not safe to reclaim -- still guarded. n.next.store(unreclaimed, Ordering::Relaxed); unreclaimed = node; if unreclaimed_tail.is_null() { unreclaimed_tail = unreclaimed; } } node = next; } unsafe { self.reclaim_unprotected(reclaimable) }; } let done = self.untagged.iter().all(|u| u.is_empty()); unsafe { self.untagged[0].push(unreclaimed, unreclaimed_tail) }; (nreclaimed, done) }

do_reclamation 传递的 rcount 表示计划处理的 Retired 个数。该函数首先会收集各个分片链表上的 Retired,然后再遍历所有的 record,将 record 中出现的指针全部放入 BTreeSet 中。

再调用 match_reclaim_untagged,该函数会检查收集到的 Retired 的指针是否出现在 BTreeSet 中,是的话就说明这个指针还存在引用,不回收,将 Retired 放到 unreclaimed 链表中,不是的话就将其放在 reclaimable 中,最后调用 reclaim_unprotected 进行回收。函数的最后会把 unreclaimed 再放回 untagged[0] 上面。match_reclaim_untagged 会返回实际回收了多少个指针,同时还返回一个 done 标记,表示在收集完 Retiredmatch_reclaim_untagged 结束中间是否又来了新的 retired 任务。

可能会进行好几轮的清理。每轮清理结束之后,都会先 rcount -= nreclaimed,再 self.count.fetch_add(rcount, Ordering::Release); ,这确保,如果我实际上处理的 Retiredrcount 多,就会再从 count 中把多出来的扣掉,如果比 rcount 少,就会把多余没处理的加回 count 中。

然后还会再通过 check_count_threshold 来更新 rcount,如果 rcount 不为 0 ,说明又超过阈值了,需要再来一轮。如果 done 不为 true,说明中间又来了一个 retire 操作,需要再来一轮。

我个人认为 done 标记是没必要的。如果中间持续来了零星的 retire 操作,这里的 done 就会一直为 false,然后循环反复进行。haphazard 这么做的原因不明。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// impl<F> Domain<F> pub unsafe fn retire_ptr<T, P>(&self, ptr: *mut T) -> usize where T: Send, P: Pointer<T>, { let retired = Box::new(unsafe { Retired::new(self, ptr, |ptr: *mut dyn Reclaim| { let _ = P::from_raw(ptr as *mut T); }) }); self.push_list(retired) } pub trait Reclaim {} impl<T> Reclaim for T {} impl Retired { unsafe fn new<'domain, F>( _: &'domain Domain<F>, ptr: *mut (dyn Reclaim + 'domain), deleter: unsafe fn(ptr: *mut dyn Reclaim), ) -> Self { Retired { ptr: unsafe { core::mem::transmute::<*mut dyn Reclaim, *mut (dyn Reclaim + 'static)>(ptr) }, deleter, next: AtomicPtr::new(core::ptr::null_mut()), } } } unsafe fn reclaim_unprotected(&self, mut retired: *mut Retired) { while !retired.is_null() { let next = unsafe { &*retired }.next.load(Ordering::Relaxed); let n = unsafe { Box::from_raw(retired) }; unsafe { (n.deleter)(n.ptr) }; retired = next; } }

AtomicPtr 可能指向各种类型,每种类型都有自己的析构函数,所以我们需要一点技巧来做到在最后回收内存的时候能够适配各种类型的析构函数。

首先需要把指针的类型进行擦除,定义了一个 Reclaim Trait,并对所有类型都实现这个 Trait,从而所有类型都可以转为 dyn Reclaim。然后使用 core::mem::transmute,将生命周期也给擦除掉。然后 Retired::new 里面有一个 deleter 参数,表示析构函数的指针,通过传递执行 let _ = P::from_raw(ptr as *mut T); 的闭包做到了类似析构函数的效果。在 reclaim_unprotected 里面调用 unsafe { (n.deleter)(n.ptr) };

然后再来看 load 操作的实现:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// impl<T, F, P> AtomicPtr<T, F, P> pub unsafe fn load<'hp, 'd>(&'_ self, hp: &'hp mut HazardPointer<'d, F>) -> Option<&'hp T> where T: Sync + 'hp, F: 'static, { unsafe { hp.protect(&self.0) } } // impl<'domain, F> HazardPointer<'domain, F> pub unsafe fn protect<'l, T>(&'l mut self, src: &'_ AtomicPtr<T>) -> Option<&'l T> where T: Sync, F: 'static, { let (ptr, _proof): (_, PhantomData<&'l T>) = self.protect_ptr(src)?; Some(unsafe { ptr.as_ref() }) } pub fn protect_ptr<'l, T>( &'l mut self, src: &'_ AtomicPtr<T>, ) -> Option<(NonNull<T>, PhantomData<&'l T>)> where F: 'static, { let mut ptr = src.load(Ordering::Relaxed); loop { // Safety: same safety requirements as try_protect. match self.try_protect_ptr(ptr, src) { Ok(None) => break None, Ok(Some((ptr, _h))) => { break Some((ptr, PhantomData)); } Err(ptr2) => { ptr = ptr2; } } } } #[allow(clippy::type_complexity)] pub fn try_protect_ptr<'l, T>( &'l mut self, ptr: *mut T, src: &'_ AtomicPtr<T>, ) -> Result<Option<(NonNull<T>, PhantomData<&'l T>)>, *mut T> where F: 'static, { self.hazard.protect(ptr as *mut u8); crate::asymmetric_light_barrier(); let ptr2 = src.load(Ordering::Acquire); if !core::ptr::eq(ptr, ptr2) { self.hazard.reset(); Err(ptr2) } else { // All good -- protected Ok(core::ptr::NonNull::new(ptr).map(|ptr| (ptr, PhantomData))) } } // impl HazPtrRecord pub(crate) fn reset(&self) { self.ptr.store(core::ptr::null_mut(), Ordering::Release); } pub(crate) fn protect(&self, ptr: *mut u8) { self.ptr.store(ptr, Ordering::Release); } impl<F> Drop for HazardPointer<'_, F> { fn drop(&mut self) { self.hazard.reset(); self.domain.release(self.hazard); } }

整个代码的核心都在这了:

rust
1
2
3
4
5
6
7
8
9
10
self.hazard.protect(ptr as *mut u8); crate::asymmetric_light_barrier(); let ptr2 = src.load(Ordering::Acquire); if !core::ptr::eq(ptr, ptr2) { self.hazard.reset(); Err(ptr2) } else { // All good -- protected Ok(core::ptr::NonNull::new(ptr).map(|ptr| (ptr, PhantomData))) }

其中 crate::asymmetric_light_barrier(); 是 SeqCst Fence。这里的道理需要结合回收时的代码来看:

rust
1
2
3
4
5
6
7
8
9
10
11
... crate::asymmetric_heavy_barrier(crate::HeavyBarrierKind::Expedited); let mut guarded_ptrs = BTreeSet::new(); let mut node = self.hazptrs.head.load(Ordering::Acquire); while !node.is_null() { // Safety: HazPtrRecords are never de-allocated while the domain lives. let n = unsafe { &*node }; guarded_ptrs.insert(n.ptr.load(Ordering::Acquire)); node = n.next.load(Ordering::Relaxed); } ...

整个 hazard pointer 的时序可以概括如下:

Unknown
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
reader ... [try_protect_ptr] (a) store(release) ptr to hazard record (b) seqcst (c) load(acquire) ptr 复核 writer ... (d) get old ptr [push_list] (e) seqcst (f) push old ptr to list reclaimer [do_reclaimation] (g) pop all old ptr from list (h) seqcst (i) scan all hazard records (j) recalim
  • reader 的 seqcst 有两个作用:
    • 让 ptr 放入 record 这个行为全局可见
    • 阻止了后面 load 复核与前面的 store 的重排
  • writer 的 seqcst 是为了让 AtomicPtr 的最新状态全局可见。注意,尽管 push_list 是在 old ptr 上面进行 retire 操作的时候执行的,这里的 seqcst 仍然可以反应 AtomicPtr 的最新状态。这是因为一个因果链条,只要你拿到了一个旧的 ptr,就一定说明是有了新的 ptr 才使得这个 ptr 变旧,于是这个旧的 ptr 就反映出来新的 ptr 了。
  • reclaimer 的 seqcst 是为了得到所有 hazard record 的最新状态,和 (b) 同步:
    • 只要 (b) 先于 (h) 执行,那么 (i) 在扫描所有 hazard 的 record 的时候就会得知指针被保护了
    • 如果 (a) 之前或者 (a) (b) 之间已经执行了 (i) 了,说明 (g) 已经执行,说明 (f) (e) (d) 都已经执行,所以再次复核时,通过 (b) 一定可以和 (e) 这里同步,使得 (c) 复核失败

1.2 EBR

参考 crossbeam-epoch。

ebr 有一个 collector 的概念,相当于 hazard pointer 里的 domain。collector 可以自己创建。默认时有一个全局 collector:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/// An epoch-based garbage collector. pub struct Collector { pub(crate) global: Arc<Global>, } /// The global data for a garbage collector. pub(crate) struct Global { locals: List<Local>, queue: Queue<SealedBag>, pub(crate) epoch: CachePadded<AtomicEpoch>, } fn collector() -> &'static Collector { /// The global data for the default garbage collector. static COLLECTOR: OnceLock<Collector> = OnceLock::new(); COLLECTOR.get_or_init(Collector::new) } // impl Collector pub fn new() -> Self { Self::default() } impl Default for Collector { fn default() -> Self { Self { global: Arc::new(Global::new()), } } } // impl Global pub(crate) fn new() -> Self { Self { locals: List::new(), queue: Queue::new(), epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), } }

其中的 locals 是无锁的链表,queue 是无锁队列。这两个无锁数据结构也会面临着内存回收的问题,解决方案也是当前的这个 EBR。于是我们看到,我们实现 EBR 需要无锁数据结构,而无锁数据结构的实现又需要 EBR。这看似是循环依赖,但是 EBR 的回收机制可以让这个循环收敛,EBR 是可以自举的。

一个 collector 下面有若干个 local,每个线程持有一个 local。默认时每个线程的 thread local 中都有一个 local,对应于全局的 collector:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
thread_local! { /// The per-thread participant for the default garbage collector. static HANDLE: LocalHandle = collector().register(); } /// A handle to a garbage collector. pub struct LocalHandle { pub(crate) local: *const Local, } impl Collector { /// Registers a new handle for the collector. pub fn register(&self) -> LocalHandle { Local::register(self) } } /// Participant for garbage collection. #[repr(C)] // Note: `entry` must be the first field pub(crate) struct Local { entry: Entry, collector: UnsafeCell<ManuallyDrop<Collector>>, pub(crate) bag: UnsafeCell<Bag>, guard_count: Cell<usize>, handle_count: Cell<usize>, pin_count: Cell<Wrapping<usize>>, epoch: CachePadded<AtomicEpoch>, } // impl Local pub(crate) fn register(collector: &Collector) -> LocalHandle { unsafe { // Since we dereference no pointers in this block, it is safe to use `unprotected`. let local = Owned::new(Self { entry: Entry::default(), collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())), bag: UnsafeCell::new(Bag::new()), guard_count: Cell::new(0), handle_count: Cell::new(1), pin_count: Cell::new(Wrapping(0)), epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), }) .into_shared(unprotected()); collector.global.locals.insert(local, unprotected()); LocalHandle { local: local.as_raw(), } } }

crossbeam 自己搞了个 Owned 类型,和 Box 差不多,在 Box 的基础上进行了一些扩展。

crossbeam 将 AtomicPtr 包裹成 Atomic 类型:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pub struct Atomic<T: ?Sized + Pointable> { data: AtomicPtr<()>, _marker: PhantomData<*mut T>, } // impl<T> Atomic<T> pub fn new(init: T) -> Self { Self::init(init) } // impl<T: ?Sized + Pointable> Atomic<T> pub fn init(init: T::Init) -> Self { Self::from(Owned::init(init)) } impl<T: ?Sized + Pointable> From<Owned<T>> for Atomic<T> { fn from(owned: Owned<T>) -> Self { let data = owned.data; mem::forget(owned); Self::from_ptr(data) } }

Atomic 中进行设计到读的动作的时候,比如 load 操作,需要传递 guard,然后往往会返回一个 Shared 类型:

rust
1
2
3
4
// impl<T: ?Sized + Pointable> Atomic<T> pub fn load<'g>(&self, order: Ordering, _: &'g Guard) -> Shared<'g, T> { unsafe { Shared::from_ptr(self.data.load(order)) } }

其中 Shared 类型没啥特别的,只是单纯地对指针进行了包装,给指针加了个和 guard 相同的生命周期。

guard 需要在 Local 上进行 pin 操作才能得到。比如我们可以对当前线程进行 pin 操作:

rust
1
2
use crossbeam_epoch::self as epoch; let guard = &epoch::pin();

Share 必须在 guard 的生命周期内,即线程被 pin 住的期间内访问。当 guard 被 drop 掉的时候当前线程就 unpin 了。

具体过程:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
pub struct Guard { pub(crate) local: *const Local, } #[inline] pub unsafe fn unprotected() -> &'static Guard { struct GuardWrapper(Guard); unsafe impl Sync for GuardWrapper {} static UNPROTECTED: GuardWrapper = GuardWrapper(Guard { local: core::ptr::null(), }); &UNPROTECTED.0 } /// Pins the current thread. #[inline] pub fn pin() -> Guard { with_handle(|handle| handle.pin()) } #[inline] fn with_handle<F, R>(mut f: F) -> R where F: FnMut(&LocalHandle) -> R, { HANDLE .try_with(|h| f(h)) .unwrap_or_else(|_| f(&collector().register())) } // impl LocalHandle #[inline] pub fn pin(&self) -> Guard { unsafe { (*self.local).pin() } } // impl Local #[inline] pub(crate) fn global(&self) -> &Global { &self.collector().global } #[inline] pub(crate) fn pin(&self) -> Guard { let guard = Guard { local: self }; let guard_count = self.guard_count.get(); self.guard_count.set(guard_count.checked_add(1).unwrap()); if guard_count == 0 { let global_epoch = self.global().epoch.load(Ordering::Relaxed); let new_epoch = global_epoch.pinned(); self.epoch.store(new_epoch, Ordering::Relaxed); atomic::fence(Ordering::SeqCst); let count = self.pin_count.get(); self.pin_count.set(count + Wrapping(1)); if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 { self.global().collect(&guard); } } guard }

local 维护了一个 guard_count,这个可以确保当前线程只会被 pin 一次,重复 pin 的话只会增加 guard_count,有种“可重入”的感觉。

每个线程都有自己的 epoch,然后还会有一个全局的 epoch。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#[derive(Default, Debug)] pub(crate) struct AtomicEpoch { data: AtomicEpochRepr, } #[cfg(target_has_atomic = "64")] type AtomicEpochRepr = crate::primitive::sync::atomic::AtomicU64; // impl AtomicEpoch #[inline] pub(crate) fn load(&self, ord: Ordering) -> Epoch { Epoch { data: self.data.load(ord), } } #[derive(Copy, Clone, Default, Debug, Eq, PartialEq)] pub(crate) struct Epoch { data: EpochRepr, } #[cfg(target_has_atomic = "64")] type EpochRepr = u64; // impl Epoch #[inline] pub(crate) fn pinned(self) -> Self { Self { data: self.data | 1, } }

epoch 的最低位表示当前线程是否被 pin 住了。我们后面再说 epoch 的时候说的是去掉最低位后表示的数字。

pin 操作时,当前线程会把全局的 epoch 作为当前进程的 epoch。

当我们要析构一个指针的时候,把 Shared 传入 guarddefer_destroy 函数。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// impl Guard pub unsafe fn defer_destroy<T: ?Sized + Pointable>(&self, ptr: Shared<'_, T>) { unsafe { self.defer_unchecked(move || ptr.into_owned()) } } pub unsafe fn defer_unchecked<F, R>(&self, f: F) where F: FnOnce() -> R, { unsafe { if let Some(local) = self.local.as_ref() { local.defer(Deferred::new(move || drop(f())), self); } else { drop(f()); } } }

其中这个 Deferred 也比较有意思:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
const DATA_WORDS: usize = 3; type Data = [usize; DATA_WORDS]; pub(crate) struct Deferred { call: unsafe fn(*mut u8), data: MaybeUninit<Data>, _marker: PhantomData<*mut ()>, // !Send + !Sync } impl Deferred { /// Constructs a new `Deferred` from a `FnOnce()`. pub(crate) fn new<F: FnOnce()>(f: F) -> Self { let size = mem::size_of::<F>(); let align = mem::align_of::<F>(); unsafe { if size <= mem::size_of::<Data>() && align <= mem::align_of::<Data>() { let mut data = MaybeUninit::<Data>::uninit(); ptr::write(data.as_mut_ptr().cast::<F>(), f); unsafe fn call<F: FnOnce()>(raw: *mut u8) { let f: F = unsafe { ptr::read(raw.cast::<F>()) }; f(); } Self { call: call::<F>, data, _marker: PhantomData, } } else { let b: Box<F> = Box::new(f); let mut data = MaybeUninit::<Data>::uninit(); ptr::write(data.as_mut_ptr().cast::<Box<F>>(), b); unsafe fn call<F: FnOnce()>(raw: *mut u8) { // It's safe to cast `raw` from `*mut u8` to `*mut Box<F>`, because `raw` is // originally derived from `*mut Box<F>`. let b: Box<F> = unsafe { ptr::read(raw.cast::<Box<F>>()) }; (*b)(); } Self { call: call::<F>, data, _marker: PhantomData, } } } } /// Calls the function. #[inline] pub(crate) fn call(mut self) { let call = self.call; unsafe { call(self.data.as_mut_ptr().cast::<u8>()) }; } }

Deferred 相当于是闭包的一个包装。由于闭包的大小是不确定的,一般是需要放在堆上,然后存一个指向堆上的指针,即用 Box 包装。但是直接这么做的话,每次析构一个指针都要向堆申请一块内存,并且执行闭包的时候都要通过指针间接访问,这样效率比较低。Deferred 在结构体里面开了一个 data 字段,如果闭包大小小于等于 data,那么就直接把闭包存到这个字段的空间里面,就不用申请堆内存了。如果闭包大小大于 data,那么就还按照老办法放到堆里面。

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) { let bag = self.bag.with_mut(|b| unsafe { &mut *b }); while let Err(d) = unsafe { bag.try_push(deferred) } { self.global().push_bag(bag, guard); deferred = d; } } pub(crate) struct Bag { /// Stashed objects. deferreds: [Deferred; MAX_OBJECTS], len: usize, } // impl Bag pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> { if self.len < MAX_OBJECTS { self.deferreds[self.len] = deferred; self.len += 1; Ok(()) } else { Err(deferred) } } // impl Global pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) { let bag = mem::replace(bag, Bag::new()); atomic::fence(Ordering::SeqCst); let epoch = self.epoch.load(Ordering::Relaxed); self.queue.push(bag.seal(epoch), guard); } fn seal(self, epoch: Epoch) -> SealedBag { SealedBag { epoch, _bag: self } }

Deferred 可以是理解为释放指针的请求,这个请求要等到不存在其他线程引用要释放的指针了,才可以被处理掉。

Deferred 首先会被放入当前线程下(也就是 local)的 Bag 里,这样可以避免减小对全局变量的竞争。如果当前线程的 Bag 满了,就会给 Bag 打上当前全局 epoch,变成 SealedBag,挂到全局的队列中。

当 collector 被 drop ,或是一个线程每执行 PINNINGS_BETWEEN_COLLECT 次 pin 操作,就会触发 collect 操作:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// impl Global pub(crate) fn collect(&self, guard: &Guard) { let global_epoch = self.try_advance(guard); let steps = Self::COLLECT_STEPS; for _ in 0..steps { match self.queue.try_pop_if( |sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch), guard, ) { None => break, Some(sealed_bag) => drop(sealed_bag), } } } pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch { let global_epoch = self.epoch.load(Ordering::Relaxed); atomic::fence(Ordering::SeqCst); for local in self.locals.iter(guard) { match local { Err(IterError::Stalled) => { return global_epoch; } Ok(local) => { let local_epoch = local.epoch.load(Ordering::Relaxed); if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { return global_epoch; } } } } atomic::fence(Ordering::Acquire); let new_epoch = global_epoch.successor(); self.epoch.store(new_epoch, Ordering::Release); new_epoch } // impl Epoch #[inline] pub(crate) fn successor(self) -> Self { Self { // 加了 2,所以去掉最低位的 pin 位后 epoch 实际上是加了 1 data: self.data.wrapping_add(2), } } impl SealedBag { fn is_expired(&self, global_epoch: Epoch) -> bool { // 这里相减是去掉最低位的 pin 位后的相减 global_epoch.wrapping_sub(self.epoch) >= 2 } }

每次 collect 的时候会先尝试推进全局 epoch。必须确保当前所有被 pin 住的线程的 epoch 和当前的全局 epoch 相等(也就是所有线程都是 pin 在了当前这个全局 epoch 下),才会向前推进。全局 epoch 每次推进会加 1。

这个推进规则可以确保当前所有被 pin 住的线程的 epoch 等于全局 epoch 或是比全局 epoch 小 1。

尝试推进全局 epoch 之后会进行 COLLECT_STEPS 次从队列中取满足条件的 SealedBag 的操作。如果 SealedBag 上面的 epoch 和全局 epoch 相差大于等于 2,就说明这个 Bag 内的所有 Deferred 都可以被执行了。

这个的道理在于,析构指针的时候当前线程肯定是被 pin 住的。如果这个指针同时还在被其他线程所引用,那么其他线程也是被 pin 住的。这两个线程可能 pin 在不同的 epoch 上,但是全局 epoch 一定是等于这两个线程的 epoch 最大值或者是比最大值还要大 1。析构的请求在变为 SealedBag 推入全局队列的时候,打上的 epoch 是当前全局 epoch。而当前的全局 epoch 如果要再往前推进两个 epoch 的话,必然是要这两个线程全都 unpin,否则这两个线程的 pin 状态会牵制住全局 epoch 的前进。

然后我们考虑一下内存序问题,整个 EBR 的过程可以简要总结如下:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
reader [pin] (a) load(relaxed) global epoch (b) set(relaxed) local epoch (c) seqcst (d) load ptr [unpin] (e) unpin(release) writer [pin] (f) load(relaxed) global epoch (g) set(relaxed) local epoch (h) seqcst (i) get old ptr [push_bag] (j) seqcst (k) load(relaxed) global epoch and push deferred reclaimer [try_advance] (l) load(relaxed) global epoch (m) seqcst (n) foreach(relexed) local (o) acquire (p) update(release) global epoch
  • reader 读全局 epoch 是 relaxed 的,所以可能读到的偏小,但是偏小只会让全局 epoch 无法再向前推进,内存回收进度放缓,对 EBR 的正确性没有影响。reclaimer 那里读全局 epoch 也是同样的道理。

  • reader 的 seqcst 有两个作用:

    • 让当前线程被 pin 住的这个状态立刻全局可见
    • 防止后续对 Atomic 的操作被重排到设置线程 pin 状态之前
  • unpin 的时候使用 release ,是为了防止前面的对 Atomic 的操作被重排到 unpin 之后

  • writer 在 (j) 那里有一个 seqcst,是为了读到当前最新的全局 epoch。

  • reclaimer 的 seqcst 可以和 (c) 那里形成同步,得知各个线程最新的 pin 情况

    • 只要 (c) 先于 (m),那么 reclaimer 肯定就会知道这个 ptr 已经有引用了,于是就起到了保护 ptr 的作用。

    • 如果 (m) 先于 (c),则有可能看到 reader 还是 unpin 状态或是 pin 在了一个旧 epoch 上。

      对于后者,全局 epoch 无法推进,不会有什么影响。

      对于前者,假如 writer 当前 pin 在 epoch xx,在把 SealedBag 推入全局队列的时候,打上的 epoch 为当前线程看到的全局 epoch,为 xx 或者是 x+1x+1。reader 在析构指针前拿到指针的引用,其 pin 到的 epoch 不会超过 SealedBag 上面打上的 epoch。即使全局 epoch 因为看到 reader 还是 unpin 状态从而向前推进了,那也只会使得新的全局 epoch 比 SealedBag 上的 epoch 大 1。reader 后面还是执行了 seqcst,全局 epoch 再次向前推进的时候就一定能看到 reader 的 pin 状态了。

  • reclaimer 在最后还有个 acquire fence 和 release store,这个的原因暂时不理解。我认为是没必要的。


hazard pointer 每次保护一个指针,都需要经过 load + seqcst + load 复核 这个过程耗时比较大。而 ebr 在 pin 完一次之后就可以随意安全地访问各种原子变量。因此 ebr 的吞吐量会大一点。

但是 hazard pointer 的回收策略保证当要析构的指针太多,或是距离上一次回收时间过长时,会执行回收。而 ebr 则可能出现,如果一个线程 pin 住的时间过长,迟迟不 unpin 的话,那么回收的进度就迟迟无法推进,一个线程可能拖累全局的回收。

总的来说是,追求高吞吐量就选 ebr,追求内存得到控制就选 hazard pointer。

2. 数据结构

2.1 Treiber‘s Stack

rust
1
2
3
pub struct Stack<T> { head: Atomic<Node<T>>, }

其中 Atomiccrossbeam_epoch::Atomic,使用 EBR 来确保内存安全。

head 指向栈顶。

rust
1
2
3
4
struct Node<T> { data: MaybeUninit<T>, next: *const Node<T>, }

dataMaybeUninit<T> 包裹。core::mem::MaybeUninit 表示这块内存可能是没初始化的。后续 pop 操作会把 data 的所有权拿走,拿走之后 T 在逻辑上就处于未初始化状态。MaybeUninit 在 drop 的时候不会把包裹的 T 也给 drop 掉,这样可以防止我们已经把 T 拿走了结果后面 Node 在 drop 掉的时候把 T drop 掉。使用 ManuallyDrop 也可以做到同样的效果。

rust
1
2
unsafe impl<T: Send> Send for Stack<T> {} unsafe impl<T: Send> Sync for Stack<T> {}

只要 T 可以在线程之间转移,那么 Stack 就是既可以 Send 又可以 Sync 的,因为 pushpop 会让 T 在线程间转移。T 不需要 Sync,因为 Stack 的语义是,push 进去的元素,在 pop 出来之前不会被任何人读。

rust
1
2
3
4
5
pub fn new() -> Stack<T> { Self { head: Atomic::null(), } }

push 操作:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/// Pushes a value on top of the stack. pub fn push(&self, t: T) { let mut node = Owned::new(Node { data: MaybeUninit::new(t), next: ptr::null(), }); // SAFETY: We don't dereference any pointers obtained from this guard. let guard = unsafe { crossbeam_epoch::unprotected() }; let mut head = self.head.load(Relaxed, guard); loop { node.next = head.as_raw(); match self .head .compare_exchange(head, node, Release, Relaxed, guard) { Ok(_) => break, Err(e) => { head = e.current; node = e.new; } } } }
  • push 的时候不需要把当前线程 pin 住,一方面 push 操作不会析构指针,另一方面如果我们拿到的 head 被析构了,那么就说明拿到的 head 一定是被 pop 出去了,后面的 cas 不会成立。

  • 这里不会出现 ABA 问题,因为我们不会访问 headnext

  • cas 成功的使用使用 release,因为我们修改了 node.next ,需要把这个修改的信息传出去。

pop 操作:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pub fn pop(&self) -> Option<T> { let mut guard = crossbeam_epoch::pin(); loop { let head = self.head.load(Acquire, &guard); let h = unsafe { head.as_ref() }?; let next = Shared::from(h.next); if self .head .compare_exchange(head, next, Relaxed, Relaxed, &guard) .is_ok() { let result = unsafe { h.data.assume_init_read() }; // SAFETY: `head` is unreachable, and we no longer access `head`. unsafe { guard.defer_destroy(head) }; return Some(result); } // Repin to ensure the global epoch can make progress. guard.repin(); } }
  • pop 时的 cas,无论成功还是失败,都用 relaxed 就可以。

  • pop 时把当前线程 pin 住,除了解决内存安全问题,还能解决 ABA 问题。我们读到当前的 head 之后还会读 head.next,有可能此时当前线程被切到另一个线程,另一个线程把当前 head 给 pop 掉了,然后又 push 进去一个新的节点。如果这个新的节点的指针和 之前被 pop 掉的 head 相同,那么就会出现 ABA 问题,但这是不会发生的。因为 push 的时候创建的新节点的指针不可能和前面的 head 相同,因为最开始被切走线程还持有对 head 的引用,ebr 保证这个指针不会被析构,自然再次请求内存的时候不会从内存分配器中获得和 head 同样的指针了。

  • 最后走了个 repin,防止当前线程的重试次数过多导致拖慢整体的 ebr 的垃圾回收的进度。

2.2 Michael-Scott Queue

rust
1
2
3
4
pub struct Queue<T> { head: CachePadded<Atomic<Node<T>>>, tail: CachePadded<Atomic<Node<T>>>, }

相比于栈,队列要记录两个字段。在多线程同时 push 和 pop 的时候,headtail 可能会出现 false sharing,导致没有必要的效率下降,所以需要 CachePadded 来使得两个字段独占缓存行。

rust
1
2
3
4
struct Node<T> { data: MaybeUninit<T>, next: Atomic<Node<T>>, }

栈中每个节点只有在插入的时候才会设置要插入的节点的 next 为当前的栈首节点的指针,节点插入之后其 next 字段就不会再修改了。而在队列中 push 的时候,需要对队列尾部的节点的 next 做 cas 操作,所以队列节点的 next 需要是一个原子变量。

rust
1
2
unsafe impl<T: Send> Send for Stack<T> {} unsafe impl<T: Send> Sync for Stack<T> {}

同 Treiber‘s Stack。

rust
1
2
3
4
5
6
7
8
9
10
11
12
pub fn new() -> Self { let sentinel = Box::into_raw(Box::new(Node { data: MaybeUninit::uninit(), next: Atomic::null(), })) .cast_const(); Self { head: CachePadded::new(sentinel.into()), tail: CachePadded::new(sentinel.into()), } }

新建队列的时候,需要给队列中放置一个哨兵节点,这样可以统一后面的 push 和 pop 操作,无需特判 head 为 null。

push 操作:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
pub fn push(&self, t: T, guard: &mut Guard) { let mut new = Owned::new(Node { data: MaybeUninit::new(t), next: Atomic::null(), }); loop { // We push onto the tail, so we'll start optimistically by looking there first. let tail = self.tail.load(Acquire, guard); // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. let tail_ref = unsafe { tail.deref() }; let next = tail_ref.next.load(Acquire, guard); // If `tail` is not the actual tail, try to "help" by moving the tail pointer forward. if !next.is_null() { let _ = self .tail .compare_exchange(tail, next, Release, Relaxed, guard); continue; } // looks like the actual tail; attempt to link at `tail.next`. match tail_ref .next .compare_exchange(Shared::null(), new, Release, Relaxed, guard) { Ok(new) => { // try to move the tail pointer forward. let _ = self .tail .compare_exchange(tail, new, Release, Relaxed, guard); break; } Err(e) => new = e.new, } guard.repin(); } }

首先取下尾部节点,然后尝试把新节点给插入到尾部。

acquire load 读队列的 tail 有可能会读到旧的 tail,同时 tail.next 可能也会读到旧值,比如读到 null,但是在后面进行 cas 的时候一定会看到最新的值,于是会再重试一遍。

将新节点插入到尾部,然后还会再更新队列的 tail。有可能在这两个操作的中间,又有线程进行 push 操作了,那么就会读到旧的 tail,但是再读 tail.next 的时候会发现不为 null。这种情况说明当前 CPU 还没看到最新的 tail,于是就会尝试把队列的 tail 更新为线程读到的 tail.next。当然这个更新操作有可能失败,因为可能别的线程抢先一步更新了或是怎么样,但无论更新成功还是失败,都说明 tail 发生了更新。

在循环开始尝试帮 tail 更新时,cas 在成功时使用了一个 release 的内存序,这个内存序看似没必要,因为前面没有 store 操作,没什么需要同步出去的。而实际上,这里的 release 的作用是,把 let next = tail_ref.next.load(Acquire, guard); 这一步,acquire 过来的视图再给发出去。acquire 得到的视图中,next 指向的内存是准备好的,可以直接解引用。cas 时的 release 可以把这个视图进一步通过更新后的 self.tail 更新出去。这样确保其他线程进行 let tail = self.tail.load(Acquire, guard); 后,可以拿到这个视图,进而后面直接对 tail 解引用是安全的。

pop 操作:

rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
pub fn try_pop(&self, guard: &mut Guard) -> Option<T> { loop { let head = self.head.load(Acquire, guard); let next = unsafe { head.deref() }.next.load(Acquire, guard); let next_ref = unsafe { next.as_ref() }?; // Moves `tail` if it's stale. Relaxed load is enough because if tail == head, then the // messages for that node are already acquired. let tail = self.tail.load(Relaxed, guard); if tail == head { let _ = self .tail .compare_exchange(tail, next, Release, Relaxed, guard); } // After the above load & CAS, the thread view ensures that the index of tail is greater // than that of current head. We relase that view to the head with the below CAS, // ensuring that the index of the new head is less than or equal to that of the tail. // // Note: this reasoning is also done in SC memory regarding index of head and tail, // albeit simpler. if self .head .compare_exchange(head, next, Release, Relaxed, guard) .is_ok() { // Since the above `compare_exchange()` succeeded, `head` is detached from `self` so // is unreachable from other threads. // SAFETY: `next` will never be the sentinel node, since it is the node after // `head`. Hence, it must have been a node made in `push()`, which is initialized. // // Also, we are returning ownership of `data` in `next` by making a copy of it via // `assume_init_read()`. This is safe as no other thread has access to `data` after // `head` is unreachable, so the ownership of `data` in `next` will never be used // again as it is now a sentinel node. let result = unsafe { next_ref.data.assume_init_read() }; // SAFETY: `head` is unreachable, and we no longer access `head`. We destroy `head` // after the final access to `next` above to ensure that `next` is also destroyed // after. unsafe { guard.defer_destroy(head) }; return Some(result); } guard.repin(); } }

首先取 headhead.next,然后再取 tail,判断 tail 是否为 head,如果是的话说明 tail 还没有被更新,于是尝试将其更新为 head.next。注意,这里读出来的 tail 可能并非最新值,但其实我们这里并不用读到最新值,只需要确保 tail 不指向当前将要被 pop 出去的节点就可以了。我们也不需要 tail 来做什么同步,只是单纯地想读这个值,所以 relaxed load 就足够了。当我们尝试用 head.next 去更新 tail 的时候,head.next 未必是当前队列末尾的节点,但达到 tail 不指向当前将要被 pop 出去的节点的目的就好。tail 指针具有一种单调性,他只会往队列的后方移动而不会往前方启动,从 tail 的全局顺序看,如果 tail 已经被更新为一个更新的状态,那么你拿一个旧的状态尝试去更新他的时候必然是 cas 失败的。

后面会对 self.head 进行 cas 操作,这一步也很有说法,尽管我们并没有修改 next 节点的字段,但是这里还是用到了 release。这里的道理在于,我们前面判断 tail == head,如果不成立还好,说明 tailhead 的后面,但是如果成立的话,我们会进行 cas 尝试更新 tail,这里就要用 release,将对 tail 的更改同步出去。这样,在最开始 let head = self.head.load(Acquire, guard); 的时候,这里的 acquire 就和那个 release 同步,可以保证后面的 self.tail.load(Relaxed, guard); 拿到的 tail 一定不会是指向原来排在 head 前面的那个节点。

head 为哨兵节点,head.next 才持有队列中第一个数据。我们把 head.next 置为新的 head 之后,其就变成了新的哨兵节点。