从零开始的 6.824 学习

Prologue

这篇博客记录的是自己关于 6.824 的一些学习的心得体会, 以四个实验为核心,记录下自己完成实验过程的心得体会,重点关注 Raft 算法以及一个基本的具有容错性的 K/V 存储系统的实现。

使用下面的命令 clone 6.824 的 git 仓库:

git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

注: 根据 6.824 的 collaboration policy,在这整篇博客中不会出现几个实验的源代码。
另外,由于笔者也是第一次接触分布式,难免会出现不少问题,还请见谅。

Why distributed

考虑这样一些问题:

编写一个程序,满足下面的要求:
给定 N 个单词(可能有重复),统计出这 N 个字符中每个单词出现的次数。

一个最直接的解决方法是,使用 HashMap 进行统计,key 为单词,value 为单词出现的次数。顺序遍历每个单词,对遇到的每个单词,将其出现次数 +1 即可。

给定 N 个字符串,要求对所有字符串进行排序并输出。

这种问题解决方法也比较直接,可以使用快速排序、归并排序等方式处理,复杂度为 O(nlogn)。
这两种做法当 N 很大时,都会遇到一些问题: 内存不够。当 N 达到 1e9 的级别的时候,需要的内存空间就要按 G 计算了。

这时,可以将部分数据保存到文件中去,或者将原问题拆分成几个更细小的问题: 对于单词数统计来说,可以把单词分成多份,然后交给多个电脑去分别执行,最后再将几个执行的结果加起来,就得到了正确的最终结果。某种程度上说,这和计算机 CPU 采用多核的方式是很类似的。

wiki 上对于分布式计算(Distributed Computing)是这样定义的:

A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their action by passing messages to one another.

从笔者的角度上看,分布式系统就是多个相互之间存在联系的计算机的集合,并且,这个集合能够接受某个计算任务,正确执行并返回结果。并且至少还要有以下性质:

  1. fault tolerance. 当某个(或某少数几个)计算机内部因故障出现问题,或者是计算机断开网络连接,无法和其他计算机进行通信时,系统应该依旧能正常的工作。
  2. configurable. 集合应该是可调整(配置)的,在某个时刻,可能会有多个计算机加入或离开当前集合。
  3. linear. 对于每个客户端的请求,集合应该按客户端发送顺序正确地执行并返回正确的结果,就像只有一个计算机在处理一样。

对于计算机之间的联系,有多种处理的方式。比如每台计算机之间建立一个 http 服务器,相互之间使用 http 通信。或者使用类似 RPC(Remote Procedure Control) 的技术,调用目的服务器上的某个方法等等。

P1

图1: 分布式模型

此外,分布式还有另一个重要的功能就是容错。考虑 CS 模型:客户端向服务器请求数据,服务器作出响应并返回结果。在正常情况下并没有什么问题。但是,加入服务器此时因为某些故障而宕机,又或是网络通信出现问题,这时客户端将得不到任何回复。如果此时有另一台可以备用的服务器,当主服务器发生故障时能”临时顶替”一下,直到故障修复好了。这样对于客户端而言,服务器就是一直可用的。

Lab1: MapReduce

MapReduce 是分布式计算中,一个处理大数据的算法,或者说是编程模型,在 2004 年由 google 提出。在 MapReduce 的规范中,处理的对象都是键值对这样的数据。用户可以将计算分解为 (一个或多个) Map 和 Reduce 这样的操作,调用 MapReduce 函数库去实现大数据处理的功能。

Map 函数读取一个键值对,并产生一个中间键值对的集合。MapReduce 库会使用自动将中间键的集合进行分组,并传入 Reduce 函数。
Reduce 函数读取一个 key 以及对应的 value 的集合,它将 values 集合进行合并操作,最后产出一个 String 的输出。

论文中给出的样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

在样例中输入的是文件名-文件内容的键值对,map 函数读取出每个文档中的所有单词,对于每个单词 w 产生一个 w->1 的映射,即单词 w 出现了一次。经过 MapReduce Library 根据 key 分组后,reduce 函数接收到的 key 就是各个单词,values 的数量就是出现的次数,最后输出出来就得到了每个单词的出现次数了。

简单来说,map 是一个打散数据的过程,而 reduce 是一个合并数据的过程

仿照上面的例子,我们可以写出符合上述接口的一个排序算法。

1
2
3
4
5
6
7
8
9
10
11
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w.substring(0,2), w);

reduce(String key, Iterator values):
// key: prefix of word
// values: a list of words
result = sort(list(values));
Emit(AsString(result));

map 函数将读取每个单词,将单词的前两位作为 key, 在 reduce 函数中,对所有以 key 开头的单词做排序处理,并输出排序好的数据。最后只要再按照 key 的大小排序,即可得到所有单词排序后的结果。

而在 lab1 中,我们要做的,并不是调用 Map Reduce 这样的方法,而是基于实验中给的代码,完成一个 Map/Reduce 函数库的设计。论文中给出的 Map/Reduce 工作原理如下图所示:

mapreduce

图2: MapReduce

在上面的示例中,一共有几个重要的组成单元。

  1. master。即总控单元,负责 map 和 reduce 任务的分配
  2. worker。不断从 master 那里请求新的执行任务并进行计算,当计算完毕时将通知 master
  3. 用户的程序,即用户编写的,满足 MapReduce 接口规范的函数。计算开始后,这些程序代码将被 fork 到各个 work 中,以便 worker 可以执行用户的代码进行计算。

按照上面的单词统计为例子,对于输入部分,以多个文件的形式,每个文件作为一个单独的执行任务,将由 Master 分配到各个 worker 中。每个 worker 将产生一个类似下面格式的中间文件:

mr-x-y: x 表示当前 worker 的编号,y 为需要对该文件进行处理的 reduce 任务编号(可以通过哈希求得,这样对于同一个单词,y 一定是相同的,即在同一个 reduce 任务中进行统计)

worker 处理完毕后将文件写在本地磁盘中。当所有输出文件都经过 map 步骤后,master 将开始进行 reduce 的分配。每个 reduce 可能从多个单元中获取输入,经计算后产生一个输出文件。

对于执行的 reduce 编号为 y 的 worker,将拉取所有编号为 mr-?-y 的文件进行处理。

综上考虑,任务中 master 和 worker 的通信的格式基本上至少要包含如下内容:

master->worker:

  1. task type
  2. input file urls

worker->master:

  1. task type
  2. output file urls

另: 这种简单的实现是会有很多实际问题的,比如 worker 突然宕机,比如负载不均衡等等。

Lab2: Basic Raft

在实验二中,我们需要实现的是一个 Raft 算法,位于 raft/raft.go 文件中。
Raft 算法 是一个共识性算法,它能够保证对于一个集群,只要超过一半的结点是可用的,这个集群就能对外作出响应。由于分布式中在行业广泛使用的 paxos 算法过于晦涩难懂,因此 Raft 作为一个简单易懂的算法被提了出来。在保证较高执行效率的基础上,Raft 容易理解以及编程实现。

附:
结点: 可以理解为一个计算机或者计算单元
集群: 一群联系紧密的结点的集合,对外界而言则可以视为一个单一的计算机。

按惯例,先给例子解释 Raft 能拿来干什么。

图3: Raft 的使用

上面一层是 KVServer,可以理解为 Key-Value 键值对的服务器,一个 KVServer 和一个 Raft 服务器一一对应,各个 KVServer 服务器之间没有任何联系,Raft 服务器之间两两互联。当一个请求过来后,KVServer 会将请求提交给 Raft 层,确保多数 Raft 服务器认可这一个提交。最后会通过管道等方式,将 Raft 中送出来的提交进行处理,并返回给客户端。
注: 一个 Raft 和 KVServer 服务器很可能位于同一台计算机上。

State machine & Log

要理解好 Raft 算法,首先要理解好状态机这样一个概念。状态机模型事实上是一个很广义的概念,现实中的很多东西都能抽象成一个状态机来表示。比如门可以是一个状态机,它包含两种状态: 打开,关闭。我们动作类游戏时,角色的运动状态也是一个状态机: 包括站立,走路,跑步,跳跃等状态。甚至我们人也可以是一个状态机: idle 状态,吃饭状态,洗澡状态,学习状态等等。。

我个人习惯把状态机模型分为以下三个部分: 状态(state)-刺激(activate)-反应(action)

state

图4: 状态机

其中,反应是一个函数 f,以当前状态和接受的刺激为输入,输出的是下一个状态。怎么理解呢?以玩游戏为例子。在玩游戏时,角色站立(状态)在某个地板上,这时我按下前进键(刺激),角色切换(反应)到向前走的状态。再按下空格键(刺激),角色又切换(反应)到了跳跃的状态。最后落地(刺激), 角色再次切换回站立状态。
或者说以自己为例,早上起床人处于精神饱满的状态,经过了码一天代码这样的刺激,消耗了大量的能量,到了晚上就是处于一个好困好饿想吃饭睡觉的状态了。

状态机广泛应用于各个软件开发领域,比如下面的这个图,就是一个典型的状态机模型(Android 中的 Mediaplayer 类)。蓝色表示的是状态,箭头表示的是刺激。

mediaplayer

图5: Mediaplayer

切换回到 Raft 中,对于每个 raft 服务器而言,要保存的就是一个 key-value map。每次一个新的刺激(Put/Append/Get 请求)到来,都有可能会改变当前的状态。使用状态机模型有一个很大的好处,就是保证了一致性。只要给定相同初始状态以及执行操作序列,以及一个确定的反应函数(如何响应某个操作),最终服务器的状态一定是一致的

这样一来,就引入了另外一个概念:如何保存执行操作序列?在 Raft 中,使用的是 Log 来保存的。每一个 LogEntry 可能包含如下的一些信息:

  1. 请求类型: 在 Key/Value 存储系统中为 Put、Append、Get 请求
  2. 请求附带的信息: 比如 Get 请求需要一个请求的 Key,Put 和 Append 请求则需要一个 Key 和 Value

这样,在服务器启动时,我们只要初始化一个空的 map,然后保证集群中所有服务器执行的操作序列都相同,那么服务器的状态就一定会是相同的,对外界而言,这个集群看起来就像是一个单一的结点。

raft_state

图6: Raft 的状态

Leader election

对于一个由多个个体组成的集体,如果要使集体作出统一的决策,一个最简单的办法就是: 通过投票选出一个 leader,由 leader 来做决定,所有的服务器听从 leader 的指挥。当一个请求过来的时候,leader 将这个请求分发给其他服务器,如果有超过一半的服务器收到了该请求,那么就说明这个请求被认可。如果某个服务器收到了一个请求,并且它不是 leader,那么它将拒绝对这个请求做处理,客户端将转而尝试发送请求到其他服务器上。

Term

在 raft 中,时间是按照 term 进行划分的。每个 term 我们可以理解为一个任期。如下图所示:

raft_state

图7: Term

Raft 保证下面这样的一个性质:在每个 term 中最多只会出现一个 leader (Election safety)。每个 leader 在任期内需要每隔一定的时间(Heartbeat timeout)就向其他服务器发送心跳包,说明 leader 是存活的。

  1. 服务器发起选举机制。而对于每个服务器,如果一定的时间内(Election timeout, 通常是 Heartbeat 的两倍以上)没有收到来自 leader 的心跳包,那么当前服务器就认为 leader 已死,新王当立,于是自己的 term+1,同时发起请求,状态转变为 Candidate,先投自己一票并要求其他的服务器给自己投票。
  2. 服务器投票机制。每个服务器在一个 term 中只投出一票,先到先得。
  3. 服务器一旦收到包含有 term 大于自己的任何请求,转移到 follower 状态,更新自己的 term,并重新计时。

注意,Raft 为了避免陷入一直无法选出 leader 的情形(多个 server 同时希望成为 leader),采用了随机设置 Election timeout 的方式,这样大多数情况下,一个很短的时间内往往只有一个 server 发起投票,从而能够顺利选出 leader。

另外,按照 Raft 的规则,虽然能够保证一个 term 内最多只有一个 leader,但是在某些状态下,可能会出现不止一个 server 认为自己是 leader! 但上面的规则保证了,它们一定是位于不同的 term 当中。

图8: Server 分裂

Log replication

Raft 算法另一个重要的部分就是 log 的分发/复制(replication)。前面提到,只有 leader 才会接受某个请求,当 leader 收到后,会调用 AppendEntries 尝试将 log 分发给其他 server。当 leader 收到超过一半的服务器的回复后,leader 就认为当前整个 Raft 集群可以接受这个请求了,或者说这个可以被提交(commit)了,server 会增加自己的 committedIndex,当下次再发送 AppendEntries 请求时,携带的 committedIndex 就会告诉其他服务器哪些 log 可以被提交,经过管道送出(按上面的例子,KVServer 就会收到 Raft 送出的 LogEntry),此时就说这个 Log 被应用(apply)了。

log

图9: 一种可能的 Log 列表

论文中给出的一种可能的情形如上图所示。每一行表示一个服务器的 log 列表,每个小格子中,上面的数字表示 log 提交时 leader 所在的 term,下面的信息表示请求的具体内容(Put 请求)。
根据上文的表述,每个 log 隐式地存在三种状态:

  1. 存在于某(几)个 server 的 log 列表中
  2. 由于 leader 超过一半收到了 follower 的添加 log 成功回复,log 被 commit 了。
  3. 有新的 log 被 commit 了,因此将它们送出 Raft 层,称为被 apply 了。

commit

在某些情况下,会出现下面这样一种情况。当服务器被选举为 leader 时,可能存在某个 follower 位于 a-f 中的某个状态。a,b 可能是因为 follower 还没有收到某个 log,c-f 可能是由于该服务器成为 leader 并收到请求后,还没有将 log 分发到其他服务器上(或只分发了一部分)就 crash 了。

differ

图10: corner cases

leader 在任期间,会不断地往其他 server 发送请求,并且强制要求所有的 server 的 log 要和 leader 匹配。由于 server 的 log 和 leader 的可能不匹配,leader 需要对每个 server,维护 nextIndex 数组和 matchIndex 数组,分别维护需要发给某个 server 的下一个 log 的 index,以及已知的 server 的 commitIndex。每次 leader 发现 server 的 index 和对应的 nextIndex 不匹配的时候,就会 -1 直到两者匹配,并把 nextIndex ~ leader 的最新的 log 信息全部发送给 server。

当然,由于上面的性质,一个 log 有可能虽然被提交到了 leader 手上,但却由于 crash 后丢失 leader 地位,导致当前位置的 log 被新 leader 接受的其他 log 替代掉。

election restriction

前面提到,每个结点在一个 term 内只能投出一票。那么,什么情况下才能投出自己的选票呢?比较明显的一点是,当 term < 自己一定不能投票。但是 term 更大就能投票吗?参考图 3,5 个 server 被分割成两部分,三个的那一部分能正常工作,但两个的那一部分由于没办法获得多数结点的认可,将不断地尝试成为 leader,term 会不停的增加,比此时另一边的 term 要大很多。这时如果再将它们合并在一起,考虑到三个的那一部分很可能已经 commit 过好几个 log 了,这时如果这三个给两个的那一部分投票,那很有可能导致已经 commit 的 log 被再次覆盖! 因此对投票需要再加一个限制:

  1. term > 自己或 term = 自己并且当前 term 没投过票
  2. log 不会比自己的更”旧”。(我们说 log A 比 log B 更新,意味着 A.term > B.term 或者 A.term == B.term && A.index > B.index)

committed entries for previous term

每个 leader 在自己的任期内,不能直接提交非自己任期的 log。考虑下面的一个场景。

delete

图11: commit for previous term/dev>

S1-S5 分别对应 5 个不同的 server。一开始 S1 成为了 leader,并接受了一个请求。然后,在 (b) 中,S5 收到了 S3-5 的票成为了 leader,并接受了一个请求,然后又马上 crash。在 (c) 中,S1 重新当上 leader,并把 term = 2 的 log 分发到 S3。此时如果 S1 这个 log 已经分发到多数结点上了,然后 commit 该 log,紧接着马上 crash,随后在 (d) 中 S5 当选,并把 term=3 的块分发给了所有 leader。这时就发生了已经 commit 的 log 被覆盖掉的情况!几个服务器的状态就不一致了。

因此,leader 只能在 commit 自己的 log 的同时,顺便把前面没提交的 log commit 了。比如图 (e) 中,S1 再次成为 leader 后,将 term = 4 的块分发到 S2 和 S3,并 commit,此时 term = 2 的块也会连带着一起提交了。注意到此后 S5 不可能再次当选! S1-S3 都不会再投票给它了。

总结上面的各个点,最后得到一副下面完整的 Raft 算法实现思路(同样摘自原论文):

raft_all

图12: Raft 完整实现

Proof

经过上面的算法,Raft 能保证以下性质:

Election Safety: 每个 term 最多只有一个 leader 当选
Leader Append-Only: leader 只会往 log 列表中添加新的 log,而不会删除
Log Matching: 如果两个 server 的 log 列表包含一个 index 和 term 都相同的 entry,那么这个 entry 以及它之前的所有 entry 都是相同的。
Leader Completeness: 某个 entry 如果已经被 commit,那么它将出现在后面所有 leader 的 log 列表中。
State Machine Safety: server 如果 apply 了某个 index 的 entry,它不会再 apply 另一个 index 相同的 entry

其中,第一点可以由每个 server 在每个任期只能投出一票证明。第二点是 Raft 对 leader 的约束。第三点可以由归纳法证: term 相同说明是由同一个 server 接收的,index 相同说明存放的 log 信息是完全相同的。而由于前面提到,follower 只有和 leader 对应的 nextIndex 数组相匹配的时候,它才会接受 leader 分发的 entry,因此可以保证,前面的 log 一定会是相同的。

第四点的证明则比较复杂。使用反证法证明: 当一个 entry 已经被 commit,则说明超过一半的结点已经接受了这个 entry。这时如果一个新的 leader 出现(它收到了超过一半的结点的投票)但又没有这个 entry,说明至少有一个结点持有了这个 entry 并且给这个新的 leader 投票。但是根据投票的规则,新的 leader 的 log 列表肯定不能比投票给它的结点更旧。这就产生了矛盾。

有了第四点后,第五点也可以很容易被证明了: 只有 leader 能发起 commit,并且一个 entry 一旦被 commit 后,一定会出现在后面的 leader 的 log 列表中,并且 index 是单调递增的,最后一定会按顺序被 apply。

到这里就完成了 raft 算法基本的证明。

Snapshot

接下来再考虑一个问题: 内存的大小是有限的,对于一个要持续运行的系统,log 不可能无限增长。因此,需要有某种方法可以保存 log。考虑状态机的那个模型,每次 log 的 apply 其实就是状态的一次改变,如果我们把当前的状态保存起来,那么 log 就可以不需要了,这也就是 snapshot。

因此,系统需要检测当前 log 占用的空间,如果太大了,则使用 snapshot 进行压缩,然后把没用的 log 丢弃。这就涉及到了几个问题。

第一个问题是如何保证 snapshot 的同步? 只需要在请求时,附带 leader 当前的 commitIndex,term,lastIncludedIndex(snapshot 时最后一个 log 的 index),以及状态等信息即可。
第二个问题是 follower 的 log 列表如果和 server 不一致怎么办?做法其实也比较简单,直接丢弃所有 index 在 lastIncludedIndex 之前的所有请求即可。

Lab3: Basic Fault Tolerant K/V System

注: 开始实验三前一定要确保实验二的实现是正确的。可以使用 go test 跑个几十次,确保没出问题再继续下一步。

第三个实验内容大致是: 基于实验二实现的 Raft 算法,实现一个简单的具有容错功能的 K/V 存储系统。第三个实验中,需要编写这样几个类:

  1. KVServer,存储状态的类,处理 Raft apply 的请求并变更自己的状态。一个 KVServer 和一个 Raft Server 一一对应。同时,需要响应从客户端发起的请求。
  2. Clerk,存储系统内部相对于 KVServer 的客户端(对于存储系统外部而言,这个类则相当于 Server),包含 Get,Put,Append 等方法,持有所有 KVServer 的实例(准确来说是能和所有 KVServer 通信),向 KVServer 发起请求。为了简化,本实验中一个 Clerk 一次只会发起一个请求。

用图像来表示关系则如图:

图13: Basic kv system

Clerk

第一步是 clerk 的编写。总共有两个需要注意的地方。

Timeout scheme

Clerk 的请求是直接发往 KVServer 的,而前面我们知道,KVServer 和 Raft 是一一对应的,这就导致了一个重要问题: 虽然 Clerk 持有 KVServer 的实例,但是它并不知道哪一个 KVServer 是 leader(只有 leader 才能接受往 log 列表中添加请求)。并且,由于实际中通信并不是一直良好,请求有可能丢失导致 KVServer 没有收到。因此,我们需要一个重复发起请求的机制。

每次发起一个 Get 等请求的时候,需要开启一个计时器,当限定时间内没收到任何请求或者 KVServer 拒绝接受请求时,更换 Server 进行重试,因此代码类似下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
timer := time.NewTimer(randTimeout())
for {
// 收到 KVServer 的回复时,会从 channel 拿到一个结果
select {
case reply := <- channel:
if reply.Err == OK {
// do something...
return
} else {
// retry with another server
}
case <- timer.C:
// retry with another server
}
}

注: 这里有一个可行的优化: Clerk 每次收到请求时,记住这个 KVServer,下次优先向这个 server 发起请求,当 leader 不会一直变动时,这时一个很大的优化。

Duplicate detection

前面的超时重发机制保障了 Clerk 会一直尝试发送请求直到得到结果。这就导致了另外一个问题: 重复! 这里也是这个实验中一个较大的难点,需要保证 KVServer 不会重复处理同一个请求。

解决这个问题的一个方法是,使用序列号(seqId)来标识每一个请求,使用客户端编号来标识每一个 Clerk。序列号可以是一个从 0/1 开始的无符号整数,Clerk 的编号则应该保证不重复(可使用随机函数生成)。这样,KVServer 可以通过记住每一个 Clerk 发起的最后一个请求的 seqId,当收到 seqId 更小的请求时,说明这是一个重复请求,因此不进行处理。(注意: 这里的做法依赖于实验给的限定条件,即每个 Client 一次只会发起一个请求)

KVServer

KVServer 本质上还是一个存储 K/V 对的状态机。它有几个工作:

  1. 接受 Clerk 发过来的请求,并尝试提交给 Raft 层,失败则返回错误信息
  2. 处理从 Raft 的 channel 提交出来的请求,更新自己的状态
  3. 在 Lab3B 中,需要检测 Raft 层占用的空间是否过大,如果超过限定大小需要向 Raft 层发起 Snapshot 请求。

因此,KVServer 需要保存以下状态: 存储 K/V 的一个 map,每个 Clerk 对应的最大的 seqId,raft 占用的最大空间限制。

Command submit

第一件需要做的事情是对 Clerk 发起的请求(Op) 进行设计,这里我的设计是:

1
2
3
4
5
6
7
type Op struct {
OpType int // 请求的类型
Key string // 请求的 Key
Value string // 请求的 Value,仅用于 Put/Append 请求
ClientId int64 // 发起这个请求的客户端的编号
SeqId int // 客户端发起的请求的 id,和编号一起用于实现去重
}

这样,当 KVServer 发送请求到 KVServer 时,KVServer 将其封装称一个 Op 对象,然后提交给 Raft,随后等待 Raft 将请求送出并处理完毕,将结果返回给 Clerk,期间出现异常(比如 Raft 不是 leader,无法提交请求),则将异常信息返回给 Clerk,Clerk 则会向下一个 KVServer 重新发起请求。

注: 这里等待有多种处理方式,比如 sync.WaitGroup 或者 channel,需要注意请求成功提交给了 Raft 并不意味着这个请求一定会被送出,有可能提交后 Raft 就 crash 了,随后新的 leader 上任,将这个 Log 覆盖掉。

State update

在本实验中,Raft 通过 channel 将请求送出。因此在初始化 KVServer 的时候,需要开启一个线程去不断地处理从 Raft 中送出的数据。

1
2
3
4
5
6
func (KVServer* kv) loop() {
for !kv.killed() {
msg := <- kv.applyCh
// process msg
}
}

Log compaction(Snapshot)

Snapshot 有几个问题需要处理。

第一个问题是 Snapshot 需要保存的数据。这里我选择的是保存 K/V 键值对以及每个 Clerk 的最大 seqId。这样就可以在重启后依然可以做到去重。

第二个问题是 Snapshot 的时机。最容易想到的方式是,开启一个新的线程,每隔一定时间判断 raft 大小是否超过要求,如果超过则进行 Snapshot 操作。经过尝试后发现,这种做法时间间隔不好确定。时间间隔太短影响性能,时间间隔太长导致有时候没有及时进行 Snapshot,无法通过测试。因此采用了另一种方式: 在 Raft 层 apply 一批数据后,向 KVServer 发送一个特殊的消息,提醒 KVServer 新数据的到来并进行检查。

注: 为了发送特殊类型的消息,raft 中送到 KVServer 的消息对象 ApplyMsg 需要进行更改: 添加一个 type 项,标明消息的类型。

Lab4: Sharded Fault Tolerant K/V System

经过前面的几个实验,已经能够完成一个基本的 K/V 系统。然而这样的一个系统还是存在很多问题:

  1. 服务器是固定不变的,一旦开始运行后就无法发生改变。(raft 的论文中给出了变更 raft 集群大小的方式,实验中不实现这个功能)
  2. 虽然使用了多个服务器保证部分结点故障,系统还能正常使用,但付出了很多通信代价,单从性能上讲,比单个 K/V 服务器要慢很多。为了提高性能,在实验 4 中需要实现一个具有分片功能的 K/V 存储系统。

Shard

对于 K/V 存储系统,假设需要管理的数据的量特别大,又或者并发量的要求很高,以致于一个集群并没有办法完全处理。这时显而易见的做法就是将数据集拆分成多个部分,每个集群只负责处理其中的一小部分,这样就使得整体的效率可以有数倍的提升。shard 可以理解成整个 K/V 存储系统中数据的子集。实验三中的实验就可以看成是只有一个 shard 的存储系统。

比如 Key 以 “a” 开头的所有请求都交给集群 1 处理,以 “b” 开头的所有请求都交给集群 2 处理。在本实验中,使用的下面的函数判断某个 key 该交给哪个 shard 处理:

1
2
3
4
5
6
7
8
9
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
shard = int(key[0])
}
// 在本实验中 NShards 为一个固定常量 10
shard %= shardmaster.NShards
return shard
}

把数据分成 shard 以后,我们就可以将 shard 分别交给某个集群去处理。为了增加整个系统的灵活性,我们使用配置(Config) 去管理每个 shard 是由哪个集群负责的,定义如下:

1
2
3
4
5
type Config struct {
Num int // config number
Shards [NShards]int // shard -> gid
Groups map[int][]string // gid -> servers[]
}

Num 用来表示当前配置的版本号,每次配置发生变化时都会递增。Shards 是 shard 到 gid(每个集群都有一个 id,这里记为 gid)的映射。Groups 则是 gid 到 servers 的映射,表示每个集群中包含有哪些服务器,用于和服务器的通信。

此外,既然使用了动态配置,就需要有集群来管理这些配置信息,称之为 ShardMaster。这样整个系统的雏形就有了。每个集群不断向 ShardMaster 询问当前配置信息,当配置信息发生更改后,自身的状态也要发生变更(比如不能再接受某些 shard 等),遇到不是由自己负责的请求,则返回一个 ErrWrongGroup 这样的错误信息,Clerk 就可以转而向其他集群发起请求。

图14: Sharded kv system

每个 cluster 的定义见 图13。

此外,虽然实验中用的是 key 的 hash 值去做分区,那么在实际应用中,类似地也可以使用客户端所在地点等去做分区,每个客户端自动向最近的一个集群发起请求。又或者可以用于实现负载均衡。

ShardMaster

ShardMaster,暴露以下接口:

  1. Join(servers): 添加某(几)个集群,参与提供存储服务
  2. Leave(gids): 某(几)个集群离开,不再提供存储服务
  3. Move(shardId, gid): 将第 shardId 个 shard 交给 id 为 gid 的集群负责
  4. Query(num): 查询版本为 num 的配置信息

无论是 Join、Leave 还是 Move 都会对配置进行产生更改,因此需要产生一个新的 Config。而对于查询,则只需要直接返回配置信息即可。ShardMaster 的实现基本上代码可以全部照搬实验三,需要考虑的仅有一个点: 如何进行配置的变更。

Config 类中,对于 Groups 和 Num 的处理方式比较简单,而 shard 的分配则比较麻烦。为了尽可能地提高效率以及均衡负载,对于出现集群加入服务或者离开服务的情况,需要进行再平衡操作。Config 的变更要满足以下要求:

  1. 初始时,所有 Shard 都没有任何集群来负责,因此 Shards 中每个都映射到 gid = 0(即一个非法的 gid)。
  2. shard 需要尽可能平均地分配给各个集群,即各个集群负责的 shard 的数量的差不能超过 1
  3. 在满足条件 2 基础上,每次变更尽可能小,即使用尽量少的移动实现均匀地分配。

另外,一个需要注意的陷阱是,这个算法一定要是确定的(deterministic)。即对于相同的输入数据要给出相同的结果。而 go 中 map 的顺序是不确定的,因此算法需要兼容 map 遍历乱序的情况。

为了平均地分配,我们事实上可以这样操作: 统计每个 server 负责的 shard,当负责的 shard 的数量的极差(最大减最小)超过 1 时,将它负责的某个 shard 移交给负责的数量最少的 server。一个可行的算法基本步骤如下:

  1. clustering。统计每个 gid 负责的 shards,对于没有人负责的 shard(可能由于 leave 引起),则收集到一起
  2. 将没有人负责的 shard 放置到 gid 最大的那个集群
  3. 对每个集群负责的 shardId 进行排序
  4. 循环遍历各个集群负责的 shards,当出现极差超过 1 时,从负责 shard 数量最多的集群里面拿出 id 最大的几个 shard, 交给负责 shard 数量最少的集群。(负责数量相同则选择 gid 更大的那个,总之一定要保证确定性)
  5. 统计每个 shard 是哪个集群负责的,构造 Config 并返回

Shard Design

在本实验中,负责某个 shard 的集群常常会发生变动,并且这样的变动对于客户端而言应该是隐藏的。这就要求我们有足够的机制保证 shard 能够从一个集群转移到另一个集群,并且保留有足够的信息去避免发生重复请求或数据丢失的现象。此外,Challenge 中还要求我们要对已经移出的 shard 进行删除操作,以节省空间。这些条件都在说明: shard 相对于每个集群应该是尽可能独立的

综合上面的条件,可以使用下面这样的结构去描述一个 shard。每个集群要存储的数据就只有 Config 以及集群管理的 shard 对象(一个 shardId 到 shard 数据结构的映射)。

1
2
3
4
5
6
7
8
9
type Shard struct {
Data map[string]string // key-value storage of a shard
SeqMap map[int64]int // clientId -> seq map

ConfigNum int // currentshard config shard is using
Gid int // gid group owns this shard
ID int // shardId
Servers []string // shard in ConfigNum's server
}

State update

注意对外部而言,集群相当于一个结点,由 leader 负责接受消息以及发送消息到外部。考虑到 shard 的信息在集群内部要保持一致,因此,所有涉及到 Config 以及 Shards 变更的操作都需要提交到 Raft 层进行处理。因此,至少需要处理以下几种类型的消息:

  1. 一个新的 Get/Put/Append 请求,做法类似 Lab3
  2. 从 Snapshot 中恢复状态,同样类似 Lab3
  3. 新的 Config 到来
  4. 收到一个来自其他集群的 Shard
  5. 删除某个 Shard

Shard Move

第一个考虑的问题是移动哪些以及如何移动。很显然,触发 shard 移动的驱动是 Config 的更新,如果 Config 发生了改变,某个 shard 不再由当前集群负责,那么就需要将其交给负责它的某个集群。这里有两种做法:

  1. pull 形式。当 config 更新后,每个集群查看自己是否持有所有自己负责的 shard,如果某个 shard 没有则查询 Config 并向对应的集群索取。
  2. push 形式。当 config 更新后,每个集群查看自己是否持有不是自己负责的 shard,如果有则查看 Config 并将其送到应该负责它的集群。

经过考虑,笔者采用的是第二种形式。(第一种实现起来更加复杂,并且效率上看第二种会更好一点)。和 Put/Append 等操作类似,发送 shard 的集群把自己当做一个客户端,增加一个 RPC 调用,将 shard 包含的数据封装在请求中发送。

同样的,由于各种潜在的错误,RPC 调用可能会失败,需要有重传的机制保证目标收到请求,这就再次引入了去重的问题。怎么做呢?

第一种做法是参考 Lab3,每个集群持有一个 id,每个发送 shard 的请求也有一个 id,通过这两个 id 来保证每个请求只会执行一次。但这显得过于麻烦了,其实 ConfigNum 和 ShardId 已经可以唯一地标识出一个请求了: 在每个 Config 中,一个 shard 只会由一个集群来负责。每次收到一个 shard 发送请求时,出现重复有以下情形:

  1. shard 的版本号小于当前版本号(下面会涉及)
  2. 当前集群已经持有了某个 shard 并且 shard 的版本号和持有的相同

另外,当 shard 不再属于自己后,应该拒绝接受所有属于这个 shard 的请求。

Config update

在 shardKV 启动的时候,我们需要一个循环不断从 ShardMaster 中拿 config 以更新自己的状态。这就涉及到了一个问题: 怎样去做更新?

如果不断地拿最新版本的 Config,那么就意味着很有可能会跳过某些版本的 Config,并且对于 shard 的管理也会更加不可控(也许是可行的,但至少也会带来过高的复杂度)。因此,一个比较稳妥的做法是,满足一定条件后,每个集群一次只拿比当前新一个版本的 Config,并针对拿到的 Config 对自己持有的 shard 做调整。

笔者拿下一个 config 的条件如下:

  1. 当前集群版本下,所有不属于自己的 shard 都被发送出去并收到了回复
  2. 当前集群版本下,所有属于自己的 shard 都已经持有且版本号和集群版本号相同

这里可以考虑下,如果没有条件 1,可能出现某个 shard 被发送出去并收到,但由于网络问题没收到回复,这时 Config 更新,shard 的持有者又变更了,这时又将同一个 shard 送给了另一个集群,这就导致了系统出现了同一个 shard 的多个副本。如果没有条件 2,可能出现收到了一个旧版本的 shard,但是却没办法确定这是否是一个重复副本的情形。

另外,收到一个新的 Config 时,需要对 shard 进行变更。当某个 shard 不再属于自己,更新 shard 的持有者,当某个 shard 依然属于自己时,更新版本号。同时对于不再属于自己的 shard,需要不断发起请求尝试将 shard 送给目标集群。当收到目标集群的回复时,则可以往 Raft 层中提交一个删除 shard 请求。删除成功后,就可以继续拉取下一个 Config 了。

Something else

在做实验 4 的时候,采用的方式是每次只会拉取下一个新的版本的 Config,这在 Config 频繁变动的情况下效率是很低的(实际应用的话应该不会那么频繁变动吧)。是否有办法可以避免这样的情况呢?但这样的话似乎又需要考虑这几个问题:

  1. 如何保证整个系统最多只有一个集群会响应某个 shard(不会出现多个合法副本)
  2. 如何保证 shard 一定会经过多个集群最终顺利送到负责处理该 shard 的集群(即一定会有集群响应某个 shard)
  3. 如何保证非法的副本可以在恰当的时间被回收

如果有一种机制可以保证整个系统对当前最新的 Config 一致认可(或者能同时知道最新版本是多少),那这两个问题都可以解决。不过似乎也不太可行。

Epilogue

6.824 实验到这里就结束了。尽管实验数量较少,难度却还是很大的。并且由于后面几个实验要用到前面的设计,如果 Raft 没有写好,后面极其容易遇到各种各样的问题,并且调试难度极大。

此外,另外值得一提的一点是,从第一个视频开始,教授就提到分布式是在单机性能实在无法完成的时候再去考虑的,毕竟分布式在实际应用中需要解决各种各样的 Corner cases,可以单服务器提升性能解决的问题就尽量不要使用分布式比较好。分布式系统这把双刃剑在提供更丰富的计算资源与系统容错性的同时,也大大增加了系统的复杂性

------------- The artical is over Thanks for your reading -------------