Linux自旋锁

面对没有获取锁的现场,通常有两种处理方式。

  • 互斥锁:堵塞自己,等待重新调度请求
  • 自旋锁:循环等待该锁是否已经释放

本文主要讲述自旋锁

自旋锁其实是一种很乐观的锁,他认为只要再等一下下锁便能释放,避免了操作系统进程调度和线程切换。

自旋锁演进

  1. 传统TAS自旋锁:只有0、1两张状态,线程无序抢锁,不公平
  2. ticket based自旋锁:状态表示为两个counter——owner、next,线程有序持有锁,公平持锁
  3. queued自旋锁:只有一个线程自旋在spinlock,其他线程在自己的per cpu mcs lock自旋

ticket spinlock

ticket spinlock通过当前服务号来确定当前获取锁的对象,每一个对象在获取锁时都会获取到自己的服务号,当当前服务号与自己服务号一致时,就说明获取到锁

这有些像去外面吃饭等餐,先去服务员那里取号,然后空出位置来,服务员就会叫下一个号

typedef struct {
	union {
		u32 slock;
		struct __raw_tickets {
#ifdef __ARMEB__
			u16 next;
			u16 owner;
#else
			u16 owner;
			u16 next;
#endif
		} tickets;
	};
} arch_spinlock_t;

以下以ARMv6 体系结构的 ticket-based 自旋锁为例

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
  // 临时变量,用于存储中间结果
	unsigned long tmp;
  // 新的锁值
	u32 newval;
  // 当前锁的值
	arch_spinlock_t lockval;

  // 预取指令,提前加载 lock->slock 到缓存,以减少锁操作的内存访问延迟
	prefetchw(&lock->slock);
  // 使用 ARM 汇编代码实现的原子操作:
	// •	ldrex %0, [%3]:加载并排他地读取 lock->slock 的值到 lockval。
	// •	add %1, %0, %4:将 lockval 增加一个票数,存储到 newval。
	// •	strex %2, %1, [%3]:尝试将 newval 写回 lock->slock,并将结果存储到 tmp。
	// •	teq %2, #0:检查 tmp 是否为 0,表示写回操作是否成功。
	// •	bne 1b:如果写回操作失败,跳回到 1 继续尝试。
	__asm__ __volatile__(
"1:	ldrex	%0, [%3]\n"
"	add	%1, %0, %4\n"
"	strex	%2, %1, [%3]\n"
"	teq	%2, #0\n"
"	bne	1b"
	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
	: "cc");

  // 检查当前票数是否为锁的所有者票数,确保锁按照票数顺序被获取。如果当前票数不是所有者票数,处理器将进入低功耗等待状态,直到锁的所有者票数更新为当前票数
	while (lockval.tickets.next != lockval.tickets.owner) {
		wfe();
		lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
	}

  // 内存屏障,确保在获取锁后所有的内存操作在锁的获取之后发生。这是因为 ARMv6 CPU 假设内存是弱顺序的(weakly ordered),需要内存屏障来保证操作顺序
	smp_mb();
}

// 解锁
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
	smp_mb();
	lock->tickets.owner++;
	dsb_sev();
}

在旧的自旋锁实现中,所有争用一个锁的处理器都会争来争去,看谁能先得到它。现在他们整齐地排队等候,按到达的顺序抢锁。多线程的运行时间甚至减少了,最大延迟也减少了(更重要的是,使其成为确定性的)。Nick说,新实现有轻微的成本,但在当代处理器上的成本非常小,相对于缓存未命中的成本基本上为零——这是处理争用锁时常见的事件。x86的维护者显然认为,消除自旋锁不合适的争夺所带来的好处超过了这个小成本;其他人似乎不太可能不同意。

以下是go实现版本

type ticketLock struct {
	// 记录当前服务号
	nowServing uint32
	// 记录下一个服务号
	nextTicket uint32
}

// NewTicketLock instantiates a ticket-lock.
func NewTicketLock() *ticketLock {
	return &ticketLock{}
}

// Lock locks the ticket-lock.
func (t *ticketLock) Lock() {
	// 递增下一个服务号,并获取当前服务号
	myTicket := atomic.AddUint32(&t.nextTicket, 1) - 1

	// 自旋等待直到轮到自己
	for myTicket != atomic.LoadUint32(&t.nowServing) {
		runtime.Gosched()
	}
}

// Unlock unlocks the ticket-lock.
func (t *ticketLock) Unlock() {
	// 递增当前服务号
	atomic.AddUint32(&t.nowServing, 1)
}

mcs spinlock

mcs是简单的自旋锁,具有公平性,每个CPU在尝试获取锁时会在本地变量上自旋,从而避免了test-and-set自旋锁实现中的昂贵缓存争用。

mcsNode问题在于空间占用大,这对于操作系统并不是好消息

#ifndef __LINUX_MCS_SPINLOCK_H
#define __LINUX_MCS_SPINLOCK_H

#include <asm/mcs_spinlock.h>

struct mcs_spinlock {
  // 指向下一个等待的节点
	struct mcs_spinlock *next;
  // 表示锁是否被获取。1已获取0未获取
	int locked; /* 1 if lock acquired */
  // 用于嵌套计数
	int count;  /* nesting count, see qspinlock.c */
};

#ifndef arch_mcs_spin_lock_contended
// 使用smp_cond_load_acquire提供获取语义,确保后续操作在锁被获取后进行
#define arch_mcs_spin_lock_contended(l)					\
do {									\
	smp_cond_load_acquire(l, VAL);					\
} while (0)
#endif

#ifndef arch_mcs_spin_unlock_contended

// 使用smp_store_release提供内存屏障,确保临界区的所有操作在解锁前完成
#define arch_mcs_spin_unlock_contended(l)				\
	smp_store_release((l), 1)
#endif

static inline
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
	struct mcs_spinlock *prev;

	// 初始化节点,将locked设为0,next设为NULL
	node->locked = 0;
	node->next   = NULL;

  // 使用xchg函数交换锁变量和当前节点,获取前驱节点。
  // 通过使用 xchg(),我们确保了在 xchg() 之前对 @node 的所有初始化存储操作是正确排序的,并且这些操作在全局范围内被其他处理器或线程正确地看到。此外,这个 xchg() 指令还提供了与锁相关的 ACQUIRE 排序,以确保在获取锁时正确的内存可见性。
	prev = xchg(lock, node);
  
  // 如果前驱节点为空,说明锁已被获取,直接返回
	if (likely(prev == NULL)) {
		return;
	}
  
  // 否则,将前驱节点的next指针指向当前节点,并等待锁持有者传递锁
	WRITE_ONCE(prev->next, node);

	// 在节点的 locked 变量上自旋,直到前一个持有锁的线程将其设置为解锁状态
	arch_mcs_spin_lock_contended(&node->locked);
}

/*
 * Releases the lock. The caller should pass in the corresponding node that
 * was used to acquire the lock.
 */
static inline
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
  // 获取当前节点的下一个节点指针
	struct mcs_spinlock *next = READ_ONCE(node->next);

  // 如果没有下一个节点,尝试将锁设为NULL(使用cmpxchg_release)
	if (likely(!next)) {
    // 如果成功,直接返回
		if (likely(cmpxchg_release(lock, node, NULL) == node))
			return;
    
    // 否则,等待下一个节点的next指针被设置
		while (!(next = READ_ONCE(node->next)))
			cpu_relax();
	}

  // 将锁传递给下一个等待节点
	arch_mcs_spin_unlock_contended(&next->locked);
}

#endif /* __LINUX_MCS_SPINLOCK_H */

以下是尝试仿照得到的go实现

type mcsNode struct {
	next    *mcsNode
	waiting uint32
}

type mcsLock struct {
	tail *mcsNode
}

func newMCSLock() *mcsLock {
	return &mcsLock{}
}

func (m *mcsLock) Lock(node *mcsNode) {
	// 初始化 node 节点,将其 next 设为 nil,并将 waiting 设为 1,表示该节点正在等待
	node.next = nil
	node.waiting = 1

	// 使用 atomic.SwapPointer 将锁的 tail 指向当前节点,并返回先前的 tail(即前一个等待锁的节点)
	predecessor := (*mcsNode)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&m.tail)), unsafe.Pointer(node)))

	// 如果先前没有其他等待节点(即 pred 为 nil),当前节点直接获得锁。
	if predecessor != nil {
		// 如果有前一个节点,将当前节点的 next 指向当前节点,并在 waiting 变量上自旋等待,直到前一个节点将 waiting 设为 0,表示锁已释放
		predecessor.next = node
		for atomic.LoadUint32(&node.waiting) == 1 {
			// 在等待期间,使用 runtime.Gosched() 函数让出当前线程,以便其他线程可以获得CPU时间片
			runtime.Gosched()
		}
	}
}

func (m *mcsLock) Unlock(node *mcsNode) {
	// 如果当前节点的 next 为 nil,表示没有其他节点在等待锁,直接将 tail 设为 nil,释放锁
	if node.next == nil {
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&m.tail)), unsafe.Pointer(node), nil) {
			return
		}

		// 如果 CAS 操作失败,表示有其他节点在等待锁,等待其它节点将 waiting 设为 0
		for node.next == nil {
			runtime.Gosched()
		}
	}

	// 如果当前节点的 next 不为 nil,将 next 节点的 waiting 设为 0,表示锁已释放
	atomic.StoreUint32(&node.next.waiting, 0)
}

qspinlock

qspinlock基于MCS锁(队列锁),并结合锁状态,旨在提高多处理器系统中的性能,特别是在NUMA(非统一内存访问)系统中

MCS锁在NUMA系统中表现得接近最佳,因为每个CPU只关注自己的队列条目,极大减少了处理器之间的缓存行传递

其主要原理如下:

  1. 基于MCS锁的队列机制
    • 每个等待获取锁的CPU在一个每CPU结构数组中有一个专用的mcs_spinlock结构。
    • 当锁被占用时,CPU会将自己添加到一个队列中,并在自己的mcs_spinlock结构上自旋,减少对共享锁变量的访问,从而减少缓存行的争用。
  2. 32位锁字段设计
    • 锁字段被分为多个部分,包括一个整数计数器(类似于锁定字段)、一个两位索引字段(指示队列尾部的条目)、一个单个位的“pending”字段和一个整数字段(保存队列尾部的CPU编号)
  3. 优化策略
    • 当CPU是下一个获取锁的候选者时,它会直接在锁本身上自旋,而不是在其每CPU结构上自旋,减少缓存行的访问
    • 如果锁被占用但没有其他CPU在等待,CPU会设置“pending”位,不使用其mcs_spinlock结构,直到有第二个CPU加入队列
    • 为了防止锁在某个节点上过长时间停留,如果主队列在一定时间内(默认10毫秒)没有清空,整个次要队列将被提升到主队列的头部,强制锁转移到另一个节点。
typedef struct qspinlock {
	union {
		atomic_t val;

		/*
		 * By using the whole 2nd least significant byte for the
		 * pending bit, we can allow better optimization of the lock
		 * acquisition for the pending bit holder.
		 */
#ifdef __LITTLE_ENDIAN
		struct {
      // 持锁状态
			u8	locked;
      // 是否有pending线程。
      // 1表示有thread正自旋在spinlock上,0表示没有pending thread
			u8	pending;
		};
		struct {
      // 锁位和等待位
			u16	locked_pending;
      // 指向mcs node队列的尾部节点
      // 这个队列中的thread有两种状态:头部的节点对应的thread自旋在pending+locked域,其他节点自旋在其自己的mcs lock上。
			u16	tail;
		};
#else
		struct {
			u16	tail;
			u16	locked_pending;
		};
		struct {
			u8	reserved[2];
			u8	pending;
			u8	locked;
		};
#endif
	};
} arch_spinlock_t;

qspinlock由locked、pending和tail的三元组表示。常见状态组合有

  • 0、0、0: 代表初始状态
  • 0、0、1: 代表仅有一个thread持有锁,无等待及MCS队列
  • 0、1、1: 代表锁被持有且存在等待者
  • n、1、1: 代表锁被持有、存在等待者且存在一个MCS队列
  • *、1、1: 代表锁被持有、存在等待者且存在多个MCS队列

状态迁移图如下

在这里插入图片描述

流程

线程以T表示

  1. 初始锁状态为(0, 0, 0),T1直接获取锁成功

    (0, 0, 0) -> (0, 0, 1)

  2. T2获取锁,设置等待位,并等待锁释放

    (0, 0, 1) -> (0, 1, 1)

    在获取锁之后,将会重置等待位,并设置锁

    (0, 1, 1) -> (0, 0, 1)

  3. T3获取锁,由于锁被持有且存在等待,直接进入队列等待锁

  4. 由于队列仅有T3一个等待者,所以一直自旋等待锁非锁持有且等待位的情况出现,也就是锁被释放,且没有等待者

以下是代码详细解释

获取锁时先快速判断是否无任何竞争状态,然后直接加锁

/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
	u32 val = 0;

  // 首先尝试直接获取锁,如果锁可用(lock->val 为 0),则获取锁并返回
  // 此处对应(0, 0, 0)->(0, 0, 1)
	if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
		return;

  // 如果锁不可用,则进入慢路径,加入等待队列
	queued_spin_lock_slowpath(lock, val);
}

接着走入慢路径

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	struct mcs_spinlock *prev, *next, *node;
	u32 old, tail;
	int idx;

  // 检查系统配置和虚拟化情况
  // ... 

	/*
	 * 0,1,0 -> 0,0,1
	 */
  // 如果仅有等待者,说明锁被释放,但等待着还未获取锁,则有限等待锁被获取
	if (val == _Q_PENDING_VAL) {
		int cnt = _Q_PENDING_LOOPS;
		val = atomic_cond_read_relaxed(&lock->val,
					       (VAL != _Q_PENDING_VAL) || !cnt--);
	}

  // 若除锁状态位以外的其他位被设置,说明有队列等待,或者存在等待位,则直接队列等待
	if (val & ~_Q_LOCKED_MASK)
		goto queue;

	/*
	 * trylock || pending
	 *
	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
	 */
  // 设置等待位
	val = queued_fetch_set_pending_acquire(lock);

  // 再次检测竞争状态
	if (unlikely(val & ~_Q_LOCKED_MASK)) {

		/* Undo PENDING if we set it. */
		if (!(val & _Q_PENDING_MASK))
			clear_pending(lock);

		goto queue;
	}

	/*
	 * 0,1,1 -> 0,1,0
	 */
  // 处于仅持有状态,说明可以直接去等待持有锁了
	if (val & _Q_LOCKED_MASK)
		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

	/*
	 * 0,1,0 -> 0,0,1
	 */
  // 获取锁。清除pending位,设置locked
	clear_pending_set_locked(lock);
	lockevent_inc(lock_pending);
	return;

queue:
	lockevent_inc(lock_slowpath);
pv_queue:
  // 初始化MCS节点
  // 获取当前CPU上的MCS节点
	node = this_cpu_ptr(&qnodes[0].mcs);
  // 增加MCS节点计数器
	idx = node->count++;
  // 编码尾指针,包含当前 CPU 的 ID 和 MCS 节点索引
	tail = encode_tail(smp_processor_id(), idx);

  // 检查MCS节点数量。如果当前 CPU 的 MCS 节点数量超过最大值 MAX_NODES,则进入自旋锁的等待循环
	if (unlikely(idx >= MAX_NODES)) {
		lockevent_inc(lock_no_node);
		while (!queued_spin_trylock(lock))
			cpu_relax();
		goto release;
	}

  // 获取MCS节点
	node = grab_mcs_node(node, idx);

	/*
	 * Keep counts of non-zero index values:
	 */
	lockevent_cond_inc(lock_use_node2 + idx - 1, idx);

	barrier();

  // 初始化
	node->locked = 0;
	node->next = NULL;
	pv_init_node(node);

  // 再次尝试直接获取锁
	if (queued_spin_trylock(lock))
		goto release;

	smp_wmb();

	/*
	 *
	 * p,*,* -> n,*,*
	 */
  // 设置节点到锁尾位中
	old = xchg_tail(lock, tail);
	next = NULL;

  // 如果MCS节点不为空,则获取上一个节点,将当前节点加入等待队列,直到当前节点解锁
	if (old & _Q_TAIL_MASK) {
		prev = decode_tail(old);

		/* Link @node into the waitqueue. */
		WRITE_ONCE(prev->next, node);

		pv_wait_node(node, prev);
		arch_mcs_spin_lock_contended(&node->locked);

    // 在等待 MCS 锁的时候,尝试提前加载下一个指针并预取它的缓存行,以减少即将进行的 MCS 解锁操作的延迟
		next = READ_ONCE(node->next);
		if (next)
			prefetchw(next);
	}

  // 与Paravirtualization有关,暂忽略
  // ... 

  // 等待锁释放且无等待位
	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

locked:
  // 如果等待者就是当前节点,那么直接重置锁为仅持有锁
  // (n, 0, 1) -> (0, 0, 1)
	if ((val & _Q_TAIL_MASK) == tail) {
		if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
			goto release; /* No contention */
	}

  // 若MCS队列中还有其他节点,则设置锁位
  // (n, 0, 0) -> (n, 0, 1)
	set_locked(lock);

  // 解除下一个节点的自旋
	if (!next)
		next = smp_cond_load_relaxed(&node->next, (VAL));

	arch_mcs_spin_unlock_contended(&next->locked);
	pv_kick_node(lock, next);

release:
  // 释放当前cpu的MCS节点
  // 也就是MCS节点数量递减
	__this_cpu_dec(qnodes[0].mcs.count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);


/*
 * We must be able to distinguish between no-tail and the tail at 0:0,
 * therefore increment the cpu number by one.
 */

static inline __pure u32 encode_tail(int cpu, int idx)
{
	u32 tail;

	tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;
	tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */

	return tail;
}

static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
{
	int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
	int idx = (tail &  _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;

	return per_cpu_ptr(&qnodes[idx].mcs, cpu);
}

/**
 * set_locked - Set the lock bit and own the lock
 * @lock: Pointer to queued spinlock structure
 *
 * *,*,0 -> *,0,1
 */
static __always_inline void set_locked(struct qspinlock *lock)
{
	WRITE_ONCE(lock->locked, _Q_LOCKED_VAL);
}

/*
 * xchg_tail - Put in the new queue tail code word & retrieve previous one
 * @lock : Pointer to queued spinlock structure
 * @tail : The new queue tail code word
 * Return: The previous queue tail code word
 *
 * xchg(lock, tail), which heads an address dependency
 *
 * p,*,* -> n,*,* ; prev = xchg(lock, node)
 */
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
	/*
	 * We can use relaxed semantics since the caller ensures that the
	 * MCS node is properly initialized before updating the tail.
	 */
	return (u32)xchg_relaxed(&lock->tail,
				 tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
}

解锁就很简单了,直接设置锁位为0就可以了

static inline void queued_spin_unlock(struct qspinlock *lock)
{
    // Release the lock
    atomic_set(&lock->val, 0);
}

以下是仿照qspinlock思想所做出的go版本实现

import (
	"runtime"
	"sync/atomic"
)

const (
	// LOCKED 表示锁被持有
	LOCKED uint32 = 1 << iota
	// PENDING 表示锁被持有,但有等待者
	PENDING
	// TAIL 表示等待者MCS节点尾部
	TAIL
	// LOCKED_PENDING_MASK 表示锁的状态
	LOCKED_PENDING_MASK = LOCKED | PENDING
	// PENDING_CLEAR_MASK 表示清除等待标记
	PENDING_CLEAR_MASK uint32 = ^PENDING
	// LOCKED_CLEAR_MASK 表示清除锁标记
	LOCKED_CLEAR_MASK uint32 = ^LOCKED
	// TAIL_OFFSET 表示尾部索引偏移
	TAIL_OFFSET = 2
	// TAIL_MASK 表示尾部索引掩码,用于检查尾部索引是否存在
	TAIL_MASK uint32 = 0xFFFFFFFC

	// MCS_UNLOCKED MCS解锁状态
	MCS_UNLOCKED uint32 = 1

	// COUNT_ADD_MASK 计数掩码
	COUNT_ADD_MASK uint64 = 1<<32 | 1
	// COUNT_INDEX_MASK 计数索引掩码
	COUNT_INDEX_MASK uint64 = 0xFFFFFFFF
	// COUNT_SUB_MASK 计数减法掩码
	COUNT_SUB_MASK uint64 = 0xFFFFFFFF00000000
)

type qspinlock struct {
	val   uint32
	count uint64
}

type qmcsNode struct {
	locked   uint32
	next     *qmcsNode
	released bool
}

const (
	maxMcsNodeCount = 16
	maxTrySetCount  = 1000
)

var (
	qnodes = make([]*qmcsNode, maxMcsNodeCount)
)

func init() {
	for i := 0; i < maxMcsNodeCount; i++ {
		qnodes[i] = &qmcsNode{}
	}
}

// atomicCondReadAcquire 自旋直到 addr 的值满足 condition
func atomicCondReadAcquire(addr *uint32, condition func(uint32) bool) uint32 {
	var val uint32
	for {
		val = atomic.LoadUint32(addr)
		if condition(val) {
			break
		}
		runtime.Gosched() // Yield the processor
	}

	return val
}

// clearPendingSetLocked 清除等待标记,设置锁标记
func clearPendingSetLocked(lock *qspinlock) {
	for {
		// 读取当前值
		old := atomic.LoadUint32(&lock.val)

		// 如果锁已经被持有,这是不应该出现的情况
		if old&LOCKED != 0 {
			panic("clearPendingSetLocked: Lock By Others")
		}

		// 计算新值,清除等待标记,设置锁标记
		val := old & PENDING_CLEAR_MASK
		val |= LOCKED

		// 尝试更新变量值
		if atomic.CompareAndSwapUint32(&lock.val, old, val) {
			break
		}
		// 如果更新失败,继续尝试
	}
}

// trySetPending 尝试设置等待标记
func trySetPending(lock *qspinlock) bool {
	var old uint32
	loopCount := maxTrySetCount

	for loopCount > 0 {
		// 若已经被设置等待标记,则说明等待位已经被其他线程设置,直接返回
		old = atomic.LoadUint32(&lock.val)
		if old&PENDING != 0 {
			return false
		}

		// 尝试设置等待标记
		if atomic.CompareAndSwapUint32(&lock.val, old, old|PENDING) {
			return true
		}

		loopCount--
	}

	return false
}

// setLocked 设置锁标记
func setLocked(lock *qspinlock) {
	for {
		// 读取当前值
		old := atomic.LoadUint32(&lock.val)

		// 设置锁标记
		val := old | LOCKED

		// 如果锁已经被持有,则退出
		if val == old {
			panic("setLocked: Lock By Others")
		}

		// 尝试更新变量值
		if atomic.CompareAndSwapUint32(&lock.val, old, val) {
			break
		}

		// 如果更新失败,继续尝试
	}
}

// xchgTail 将尾部索引设置到锁变量中
func xchgTail(lock *qspinlock, tail uint32) uint32 {
	var old uint32
	for {
		// 读取当前值
		old = atomic.LoadUint32(&lock.val)

		// 计算新值,将尾部索引设置到变量中
		val := (old & LOCKED_PENDING_MASK) | tail

		// 尝试更新变量值
		if atomic.CompareAndSwapUint32(&lock.val, old, val) {
			break
		}
		// 如果更新失败,继续尝试
	}

	return old
}

// queuedSpinTryLock 快速尝试获取锁
func queuedSpinTryLock(lock *qspinlock) bool {
	return atomic.CompareAndSwapUint32(&lock.val, 0, LOCKED)
}

// decodeTail 解析尾部索引
func decodeTail(tail uint32) *qmcsNode {
	idx := tail >> TAIL_OFFSET
	return qnodes[idx-1]
}

// encodeTail 编码尾部索引。索引从1开始
func encodeTail(idx uint32) uint32 {
	return (idx + 1) << TAIL_OFFSET
}

// 锁计数以及索引递增
// Attention! 索引递增上限度为1<<32-1
func increaseLockCount(count *uint64) (uint32, uint32) {
	x := atomic.AddUint64(count, COUNT_ADD_MASK)
	return uint32((x-1)&COUNT_INDEX_MASK) & (maxMcsNodeCount - 1), uint32(x >> 32)
}

// 锁计数递减
func decreaseLockCount(count *uint64) {
	atomic.AddUint64(count, COUNT_SUB_MASK)
}

func (l *qspinlock) Lock() {
	val := atomic.LoadUint32(&l.val)
	if val == 0 && queuedSpinTryLock(l) {
		return
	}

	queuedSpinLockSlowPath(l, val)
}

func queuedSpinLockSlowPath(lock *qspinlock, val uint32) {
	var old, tail uint32
	var node, next *qmcsNode
	var success bool

	// b1 若仅有等待位,则等待非仅有等待位
	// b2 若仅有加锁位,则进入a逻辑
	
	if val == PENDING {
		val = atomicCondReadAcquire(&lock.val, func(v uint32) bool {
			return v != PENDING
		})
	}

	if val != LOCKED {
		goto queue
	}

	// a1 若原来仅有加锁位,则尝试设置等待位
	// a2 等待位设置成功后,等待获取锁,并设置锁位
	// a3 若等待位设置失败,则队列等待锁

	success = trySetPending(lock)
	if success {
		atomicCondReadAcquire(&lock.val, func(v uint32) bool {
			return v&LOCKED == 0
		})

		clearPendingSetLocked(lock)
		return
	}

	// 队列等待锁
	// c1. 申请一个MCS节点
	// c2. 将MCS节点尾部索引设置到锁中
	// c3. 若设置成功前的MCS节点已经存在,则获取上一个节点,添加到队列中,并等待上一个节点的解锁(主动解锁,或者不存在)
	// c4. 等待无加锁等待位
	// c5. 若当前尾节点就是当前节点,则尝试重置为仅有锁位
	// c6. 若当前尾节点不是当前节点,则获取下一个节点并解锁
	// c7. 设置锁位
queue:
	idx, count := increaseLockCount(&lock.count)
	if count > maxMcsNodeCount {
		for !queuedSpinTryLock(lock) {
			runtime.Gosched()
		}
		goto release
	}
	
	node = qnodes[idx]
	node.locked = 0
	node.next = nil
	node.released = false

	tail = encodeTail(idx)
	old = xchgTail(lock, tail)

	if old&TAIL_MASK != 0 {
		prev := decodeTail(old)
		if !prev.released {
			prev.next = node
			atomicCondReadAcquire(&node.locked, func(v uint32) bool {
				return v == MCS_UNLOCKED || prev.released
			})
		}
	}

	val = atomicCondReadAcquire(&lock.val, func(v uint32) bool {
		return v&LOCKED_PENDING_MASK == 0
	})

	if tail == val&TAIL_MASK {
		if atomic.CompareAndSwapUint32(&lock.val, val, LOCKED) {
			goto release
		}
	}

	next = node.next
	node.released = true
	if next != nil {
		atomic.StoreUint32(&next.locked, MCS_UNLOCKED)
	}

	setLocked(lock)

release:
	decreaseLockCount(&lock.count)
}

func (l *qspinlock) Unlock() {
	for {
		// 读取当前值
		old := atomic.LoadUint32(&l.val)
		
		// 如果锁未被持有,继续
		if old&LOCKED == 0 {
			runtime.Gosched()
			continue
		}
		
		// 计算新值,清除锁标记
		val := old & LOCKED_CLEAR_MASK
		
		// 尝试更新变量值
		if atomic.CompareAndSwapUint32(&l.val, old, val) {
			break
		}
		
		// 如果更新失败,继续尝试
	}
}

Ref

  1. https://lwn.net/Articles/267968/
  2. https://lwn.net/Articles/852138/
  3. http://www.wowotech.net/kernel_synchronization/queued_spinlock.html
  4. https://elixir.bootlin.com/linux/v5.5-rc2/source

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/717885.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Golang内存模型与分配机制

简述 mheap为堆&#xff0c;堆和进程是一对一的&#xff1b;mcentral&#xff08;小mheadp&#xff09;&#xff0c;mcahe&#xff08;GMP的P私有&#xff09;&#xff0c;分配内存顺序由后向前。 在解决这个问题&#xff0c;Golang 在堆 mheap 之上&#xff0c;依次细化粒度&a…

【UML用户指南】-17-对基本行为建模-交互

目录 1、消息的可视化表示 2、对象与角色 3、链和连接件 4、消息 5、序列 6、创建、修改和撤销 7、表示法 8、常用建模技术 8.1、对控制流建模 8.1.1、基于时间的控制流 8.1.2、基于结构的控制流 在任何有意义的系统中&#xff0c;对象都不是孤立存在的&#xff0c;…

4.类,方法,对象

1.1.2. 面向对象程序设计的三大特征 1.1.2.1. 封装 面向对象编程核心思想之一就是将数据和对数据的操作封装在一起&#xff0c;形成一般的概念&#xff0c;比如类的概念。 1.1.2.2. 继承 继承体现了一种先进的编程模式。子类可以继承父类的属性和方法。 1.1.2.3. 多态 多…

Novartis诺华制药社招综合能力性格动机问卷入职测评笔试题库答案及包过助攻

【华东同舟求职】由资深各行业从业者建立的一站式人才服务网络平台&#xff0c;现阶段目标是“提升全市场各行业岗位信息的流动性和透明度”。我们接受众多行业机构的直接委托发布&#xff0c;并尽力通过各种方法搜寻高价值岗位信息。事实上&#xff0c;我们以发现不为人知的优…

HQChart实战教程73-仿tradingview指标MACD

HQChart实战教程73-仿tradingview指标MACD MACD![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/84d180b5620940f6b9fe08c6f10eb0f3.png)麦语法脚本实心MACD指标脚本效果 空心MACD指标脚本效果图 自定义指标添加到系统指标里HQChart插件源码地址 MACD tradingview中…

多模态LLM 跨越语言与视觉的边界

一、引言 在数字时代的浪潮中&#xff0c;我们被由语言和视觉等多种模态构成的信息海洋所包围。人类大脑以其卓越的多模态上下文理解能力&#xff0c;在日常任务中游刃有余。然而&#xff0c;在人工智能领域&#xff0c;如何将这种能力赋予机器&#xff0c;尤其是如何在语言模…

HarmoneyOS星河版 安装和启动

一、下载和安装DevEco Studio 官网链接&#xff1a;OpenAtom OpenHarmony 1.1 找到对应的操作系统进行下载 创建安装Harmony的文件夹&#xff1a; 1.2 下载后进行安装 1.3 分别安装Node、Ohpm、SDK 分别安装Node、Ohpm和SDK 二、.创建一个新项目并运行 2.1 选择[OpenHarmon…

复旦发布开源版本的EMO,只需输入一段音频和一张照片就可以让人物开始说话。

之前和大家介绍过阿里的EMO&#xff0c;用户只需要提供一张照片和一段任意音频文件&#xff0c;EMO即可生成会说话唱歌的AI视频。最长时间可达1分30秒左右。感兴趣的小伙伴可以点击下面链接阅读。 近日&#xff0c;复旦发布了一个开源版本的 EMO。 通过输入音频让面部照片开始…

【MySQL基础随缘更系列】AB复制

文章目录 mysql AB复制实战一、mysql AB复制二、AB复制原理三、master服务器设置3.1、安装mysql并启动3.2、关闭防火墙,selinux3.3、设置时间服务器3.4、修改配置文件 设置server-idN3.5、创建slave连接master的账号&#xff0c;用于取SQL语句 四、slave设置4.3、修改配置文件 …

数据可视化在智慧水利中的关键应用

数据可视化是如何在智慧水利中应用的&#xff1f;在现代水利管理中&#xff0c;面对复杂的水资源数据和动态变化的水文情况&#xff0c;数据可视化技术通过将繁杂的数据转化为直观、易理解的图表和图形&#xff0c;极大地提升了水利管理的效率和决策的科学性。智慧水利利用数据…

植物ATAC-seq文献集锦(四)——生物和非生物胁迫篇

ATAC-seq在植物研究领域的应用我们已经介绍3期了&#xff0c;最后一期我们聚焦ATAC-seq技术在生物和非生物胁迫方向的应用案例。 植物ATAC-seq文献集锦&#xff08;一&#xff09;——基因组篇 植物ATAC-seq文献集锦&#xff08;二&#xff09;——生长发育篇 植物ATAC-s…

自动采集软件||自动采集主流电商商品详情SKU数据价格功能实现||电商API接口的应用

实现自动化淘宝商品数据采集的方法有多种&#xff0c;一种常见的方式是利用网络 Python 技术。您可以编写一个网络 Python程序&#xff0c;通过模拟浏览器发送请求&#xff0c;获取淘宝商品页面的数据&#xff0c;并对数据进行解析和提取&#xff0c;最终存储到数据库或文件中。…

Android Compose 文本输入框TextField使用详解

一、 TextField介绍 TextField 允许用户输入和修改文本&#xff0c;也就是文本输入框。 TextField 分为三种&#xff1a; TextField是默认样式OutlinedTextField 是轮廓样式版本BasicTextField 允许用户通过硬件或软件键盘修改文本&#xff0c;但不提供提示或占位符等装饰&a…

车企高管组团“出道”,汽车营销已经Next level了?

汽车进入了“卷”老板、“卷”高管的时代&#xff01; 谁能想到&#xff0c;雷军凭一己之力&#xff0c;在一定程度上重塑了汽车的竞争策略。价格战之外&#xff0c;车市又开启了流量之战。 云略曾在《雷军20天吸粉500w&#xff01;……》一文中&#xff0c;提到继雷军之后&…

【问题记录】Ubuntu提示: “E: 软件包 gcc 没有可安装候选“

Ubuntu提示: "E: 软件包 gcc 没有可安装候选" 一&#xff0c;问题现象二&#xff0c;问题原因&解决方法 一&#xff0c;问题现象 在虚拟机Ubuntu中进行安装gcc命令时报错&#xff1a;“E: 软件包 gcc 没有可安装候选”: 二&#xff0c;问题原因&解决方法 …

树莓派 Thonny使用

在python中新建了虚拟环境&#xff0c;需要Thonny使用虚拟环境&#xff0c;在python executable中选中虚拟环境路径下的python3即可

银河麒麟4.0.2安装带有opengl的Qt5.12.9

银河麒麟4.0.2下载地址&#xff1a;银河麒麟-银河麒麟(云桌面系统)-银河麒麟最新版下载v4.0.2-92下载站 VirtualBox:https://www.virtualbox.org/wiki/Downloads qt下载&#xff1a;Index of /archive/qt/5.12/5.12.9 1安装VirtualBox:网上教材比较多 1&#xff09;安装完后安…

苹果的后来者居上策略:靠隐私保护打脸微软

01.苹果与微软相比更注重用户隐私 我一直是Windows的忠实用户&#xff0c;但微软疯狂地将人工智能融入一切&#xff0c;让我开始觉得应该咬咬牙换成Mac。 自小我几乎只用Windows电脑&#xff0c;所以我对MacOS一直不太适应。虽然Windows 11有其缺点&#xff0c;但总的来说&am…

车载ADAS面试题,零基础也能看得懂!

周一来刷刷ADAS相关的面试题吧&#xff01;相信看完这些题目&#xff0c;你会对ADAS有个更清晰的认识&#xff0c;即使你是零基础也可以轻松明白&#xff01; 1、描述 ADAS 系统的基本组成和功能 答案&#xff1a;高级驾驶辅助系统&#xff08;ADAS&#xff09;是一套融合了多种…

如何通过Appium连接真机调试

1、打开appium&#xff0c;点击启动appium服务器&#xff08;如图1&#xff09; 2、appium启动成功后&#xff0c;点击放大镜启动检查会话&#xff08;如图2&#xff09; 3、填写真机设备信息和APP的package、activity,点击启动会话&#xff08;如图3&#xff09; 4、打开运行A…