概述

本文是《用 Golang 实现一个 Redis》系列文章第二篇,本文将记录介绍 Godis 是如何实现数据结构以及 redis 的各种命令,包括 TTL、发布订阅、命令的原子性以及事务等功能。

根据 KWHL 图表来看,本文会着重介绍数据结构以及命令实现部分

命令执行

在上一篇文章《TCP 服务以及协议解析》中介绍了 Godis 是使用 RESP 协议来进行客户端和服务端的协议通信,服务端如何读取 RESP 协议消息的;那读取完之后需要干什么呢?

Godis 在协议解析完之后会将客户端输入的命令追加到一个切片内,然后交给 Handler 对象的 db 字段执行 Exec 函数( godis/redis/server/server.go 109 行 )执行,如下:

1
result := h.db.Exec(client, r.Args)

Exec 函数的流程图:

Exec 函数的执行流程如下:

  1. 首先截取命令切片的第一位,用 if 判断命令 key 是否是 subscribe 、publish、bgrewriteaof、rewriteaof、flushall、save、bgsave、select;
    1. 如果是这些命令,则单独启用执行逻辑,最后转 5;
    2. 如果不是这些命令,则调用选择当前选择的库(db 对象),用该库调用 Exec 库调用,转 2;
  2. 确认执行命令的库对象,调用该 db 对象的 Exec 函数执行命令,依然需要判断命令是否是 multi、discard、exec、watch、flushdb、
    1. 如果是这些命令的话则单独启用执行逻辑,最后转 5;
    2. 如果不是这些命令则执行 execNormalCommand 函数执行,转 3;
  3. 执行 execNormalCommand 函数,该函数会根据命令 key (比如 set name ,命令 key 就是 set )作为键去从一个名称为 cmdTable 的 map 去值
    1. 获取到了代表系统设置了该命令,执行 4
    2. 获取不到则代表未识别该命令,返回错误结果,最后转 5;
  4. 获取到值之后获取命令所对应的函数执行
  5. 返回结果

在上面的执行流程中有出现多次 Exec 函数,那这前后关系到底是怎样的呢?

MultiDB 对象的 Exec 函数( godis/database/database.go 81 行 )具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (mdb *MultiDB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {  
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &protocol.UnknownErrReply{}
}
}()

cmdName := strings.ToLower(string(cmdLine[0]))
// authenticate
if cmdName == "auth" {
return Auth(c, cmdLine[1:])
}
if !isAuthenticated(c) {
return protocol.MakeErrReply("NOAUTH Authentication required")
}

// special commands which cannot execute within transaction
// 判断命令是否是以下命令,是以下命令的话则执行单独逻辑并返回结果
if cmdName == "subscribe" {
if len(cmdLine) < 2 {
return protocol.MakeArgNumErrReply("subscribe")
}
return pubsub.Subscribe(mdb.hub, c, cmdLine[1:])
} else if cmdName == "publish" {
return pubsub.Publish(mdb.hub, cmdLine[1:])
} else if cmdName == "unsubscribe" {
return pubsub.UnSubscribe(mdb.hub, c, cmdLine[1:])
} else if cmdName == "bgrewriteaof" {
// aof.go imports router.go, router.go cannot import BGRewriteAOF from aof.go
return BGRewriteAOF(mdb, cmdLine[1:])
} else if cmdName == "rewriteaof" {
return RewriteAOF(mdb, cmdLine[1:])
} else if cmdName == "flushall" {
return mdb.flushAll()
} else if cmdName == "save" {
return SaveRDB(mdb, cmdLine[1:])
} else if cmdName == "bgsave" {
return BGSaveRDB(mdb, cmdLine[1:])
} else if cmdName == "select" {
if c != nil && c.InMultiState() {
return protocol.MakeErrReply("cannot select database within multi")
}
if len(cmdLine) != 2 {
return protocol.MakeArgNumErrReply("select")
}
return execSelect(c, mdb, cmdLine[1:])
}
// todo: support multi database transaction

// normal commands
// 代码走到这代表着都是正常命令
// 获取当前所选择库的下标,通过下标获取 DB 对象,在使用 DB 对象的 Exec 函数执行逻辑
dbIndex := c.GetDBIndex()
if dbIndex >= len(mdb.dbSet) {
return protocol.MakeErrReply("ERR DB index is out of range")
}
selectedDB := mdb.dbSet[dbIndex]
return selectedDB.Exec(c, cmdLine)
}

DB 对象的 Exec 函数( godis/database/singil_db.go 82 行)具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Exec executes command within one database
func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply {
// transaction control commands and other commands which cannot execute within transaction
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "multi" {
if len(cmdLine) != 1 {
return protocol.MakeArgNumErrReply(cmdName)
}
return StartMulti(c)
} else if cmdName == "discard" {
if len(cmdLine) != 1 {
return protocol.MakeArgNumErrReply(cmdName)
}
return DiscardMulti(c)
} else if cmdName == "exec" {
if len(cmdLine) != 1 {
return protocol.MakeArgNumErrReply(cmdName)
}
return execMulti(db, c)
} else if cmdName == "watch" {
if !validateArity(-2, cmdLine) {
return protocol.MakeArgNumErrReply(cmdName)
}
return Watch(db, c, cmdLine[1:])
} else if cmdName == "flushdb" {
if !validateArity(1, cmdLine) {
return protocol.MakeArgNumErrReply(cmdName)
}
if c.InMultiState() {
return protocol.MakeErrReply("ERR command 'FlushDB' cannot be used in MULTI")
}
return execFlushDB(db, cmdLine[1:])
}
if c != nil && c.InMultiState() {
EnqueueCmd(c, cmdLine)
return protocol.MakeQueuedReply()
}

// 执行正常命令
return db.execNormalCommand(cmdLine)
}

execNormalCommand 函数具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ExecFunc is interface for command executor// args don't include cmd line  
type ExecFunc func(db *DB, args [][]byte) redis.Reply

// PreFunc analyses command line when queued command to `multi`// returns related write keys and read keys
type PreFunc func(args [][]byte) ([]string, []string)

// UndoFunc returns undo logs for the given command line// execute from head to tail when undo
type UndoFunc func(db *DB, args [][]byte) []CmdLine

// CmdLine is alias for [][]byte, represents a command linetype CmdLine = [][]byte
var cmdTable = make(map[string]*command)

type command struct {
executor ExecFunc
prepare PreFunc // return related keys command
undo UndoFunc
arity int // allow number of args, arity < 0 means len(args) >= -arity flags int
}

// 执行正常命令,比如get、set、plist.....之类的
func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {
// 判断命令前缀是否在cmdTable这个map里面
cmdName := strings.ToLower(string(cmdLine[0]))
cmd, ok := cmdTable[cmdName]
if !ok {
// 命令不存在
return protocol.MakeErrReply("ERR unknown command '" + cmdName + "'")
}
// 检查参数个数是否符合命令要求
if !validateArity(cmd.arity, cmdLine) {
return protocol.MakeArgNumErrReply(cmdName)
}

// 在执行ExecFunc函数前执行的函数,负责分析命令行读写了哪些key便于加锁
prepare := cmd.prepare
write, read := prepare(cmdLine[1:])
db.addVersion(write...)
db.RWLocks(write, read)
defer db.RWUnLocks(write, read)
// 实际执行命令的函数
fun := cmd.executor
return fun(db, cmdLine[1:])
}

execNormalCommand 函数会从一个名称为 cmdTable 的 map 根据键获取值,存放的值类型是 command 类型,该类型有 4 个字段,分别是 3 个函数类型和一个 int 类型:

  1. ExecFunc:执行命令的的函数
  2. PreFunc:将命令排队到“multi”时,PreFunc 分析命令行返回相关的写键和读键
  3. UndoFunc:该函数会返回给定命令行的撤消日志,撤消时从头到尾执行
  4. arity:命令的参数个数( set name ,arity 的个数则为 2 )

根据阅读源码得知,arity 的取值有 2 种,为正数时就必须要求命令的参数个数为 arity ,如果为负数则代表参数个数最少为 arity 的绝对值数
举例
注册 get 命令时, arity 的值为 2,那么代表着输入命令时,连带 get 命令,你的参数只能是 2 个,不能是 2 个以下或者 2 个以上的参数数量:

正例:get name
反例:1. get 2. get name age

注册 set 命令时,arity 的值为 -3,代表着输入命令时,连带 set 命令,最少输入的参数数量是 3 个,不能是 3 个以下的参数数量

正例: 1. set name sunshine 2. set name sunshine EX 10 3. set name sunshine EX 10 NX
反例:set

那既然得知用命令作为 key 和相对应的 command 类型实例存入 cmdTable 中,那什么时候存的呢?阅读代码得知,是通过每一个 go 文件的 init 函数里调用 RegisterCommand 函数( godis/database/route.go)进行注册,RegisterCommand 函数的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
// RegisterCommand registers a new command// arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity.  
// for example: the arity of `get` is 2, `mget` is -2
func RegisterCommand(name string, executor ExecFunc, prepare PreFunc, rollback UndoFunc, arity int) {
name = strings.ToLower(name)
cmdTable[name] = &command{
executor: executor,
prepare: prepare,
undo: rollback,
arity: arity,
}
}

所支持的命令

那 Godis 有多少命令可以支持呢?

将近有 100 多条命令的支持,所以执行命令的时候就在 cmdTable Map 里面获取相对应的命令函数执行。

这还是正常命令,godis 还支持一些特殊命令,特殊命令的判断条件以及执行都在 godis/database/database.go Exec 函数中。

命令 含义 所属分类
publish 将信息发送到指定的频道 发布/订阅
subscribe 订阅给定的一个或多个频道的信息 发布/订阅
ubsubscribe 指退订给定的频道 发布/订阅
discard 取消执行事务块里的命令 事务
exec 执行事务块里的命令 事务
multi 标记一个事务块的开始 事务
watch 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。 事务
bgrewriteaof 异步执行一个 AOF(AppendOnly File) 文件重写操作 服务器
rewriteaof 与上一条同理 服务器
flushall 删除所有数据库的所有 key 服务器
save 同步保存数据到硬盘 服务器
bgsave 在后台异步保存当前数据库的数据到磁盘 服务器
auth 验证密码是否正确 连接
select 切换到指定的数据库 连接
geo 存储地理位置信息 地理位置

在 Godis 中,正常命令的实现函数都在 godis/database 目录中,作者按照 redis 的分类把这些命令分别归类到了文件,共 10 个文件,分别对应 redis 的 key、string、hash、list、set、sortedSet;下面对这些命令做一些描述。

key

在 godis 中,对于 key 所支持的命令如下表:

命令 含义
del 删除一个或多个 key
expire 设置一个 key 的过期时间,时间的格式为秒
expireat 设置一个 key 的过期时间,时间的格式是 uinx 时间戳并精确到秒
pexpire 设置一个 key 的过期时间,时间的格式为毫秒
pexpireat 设置一个 key 的国企时间,时间的格式是 uinx 时间戳并精确到毫秒
ttl 以秒为单位返回 key 的剩余过期时间
pttl 以毫秒为单位返回 key 的剩余过期时间
persist 删除 key 的过期时间,使得 key 永不过期
exists 检查给定 key 是否存在
type 以字符串的形式返回存在在 key 中的值的类型
rename 修改 key 的名字为 newkey,如果 key 不存在则返回错误
renamenx 在新的 key 不存在时修改 key 的名称为 newkey,newkey 存在时不做操作
keys 用于查找所有匹配给定模式 pattern 的 key

这部分我对设置过期时间更感兴趣,所以 key 这部分我只对 expire、expireat、pexpire、pexpireat、ttl、pttl 这几个命令做描述。

expire

从上面的章节中可以得知,正常命令最后都会走到 execNormalCommand 函数里面,在函数里会从 cmdTable 这个 map 根据命令去获取对应的 command 对象,最后拿到执行函数去执行;

对于 expire 命令,执行函数会在 godis/database/keys.go 146 行的 execExpire 函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// execExpire sets a key's time to live in seconds
func execExpire(db *DB, args [][]byte) redis.Reply {
// 获取到命令所执行对象的key
key := string(args[0])

// 将秒数转换为int64类型
ttlArg, err := strconv.ParseInt(string(args[1]), 10, 64)
if err != nil {
return protocol.MakeErrReply("ERR value is not an integer or out of range")
}
// 将int64 转换成time.Duration 类型
ttl := time.Duration(ttlArg) * time.Second

// 查看该key是否存在,不存在则直接返回
_, exists := db.GetEntity(key)
if !exists {
return protocol.MakeIntReply(0)
}
// 计算到期的时间
expireAt := time.Now().Add(ttl)
// 调用Expire函数,真正逻辑在这个函数里面
db.Expire(key, expireAt)
// 追加到aof文件里
db.addAof(aof.MakeExpireCmd(key, expireAt).Args)
return protocol.MakeIntReply(1)
}

从上述源代码可以看出,该代码只获取到要设置过期时间的key以及最终过期的时间,真正逻辑在 db.Expire() 函数里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Expire sets ttlCmd of key
func (db *DB) Expire(key string, expireTime time.Time) {
db.stopWorld.Wait()
db.ttlMap.Put(key, expireTime)
taskKey := genExpireTask(key)
timewheel.At(expireTime, taskKey, func() {
keys := []string{key}
db.RWLocks(keys, nil)
defer db.RWUnLocks(keys, nil)
// check-lock-check, ttl may be updated during waiting lock
logger.Info("expire " + key)
rawExpireTime, ok := db.ttlMap.Get(key)
if !ok {
return
}
expireTime, _ := rawExpireTime.(time.Time)
expired := time.Now().After(expireTime)
if expired {
db.Remove(key)
}
})
}

将获取到的过期时间存放到 ttlMap 字段里,这个字段是一个字典,在使用 ttl 命令时获取到的剩余过期时间也是根据key去查询这个字段。

ttlMap 字段只是存储最终过期时间而已,实现自动过期后删除key的还是 timewheel.At() 函数,该函数是传入过期时间、key以及过期时间到时所执行的任务,将参数传入到时间轮,由时间轮去删除那些过期的key。

数据结构

godis 的数据结构实现在 godis/datastruct 文件夹中,实现的数据结构有链表、字典、set、跳表以及跳表实现的其他数据结构。

字典

godis 定义了字典的接口,目前有 2 个结构体实现了该接口,分别是 ConcurrentDict 和 simpleDict;其中 ConcurrentDict 内置了 sync.RWMutex ,所以该结构是并发安全的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Dict interface {  
Get(key string) (val interface{}, exists bool)
Len() int
Put(key string, val interface{}) (result int)
PutIfAbsent(key string, val interface{}) (result int)
PutIfExists(key string, val interface{}) (result int)
Remove(key string) (result int)
ForEach(consumer Consumer)
Keys() []string
RandomKeys(limit int) []string
RandomDistinctKeys(limit int) []string
Clear()
}

// ConcurrentDict is thread safe map using sharding lock
type ConcurrentDict struct {
table []*shard
count int32
shardCount int
}

type shard struct {
m map[string]interface{}
mutex sync.RWMutex
}

concurrent.go 整个文件也就将近 300 行代码,阅读起来难度也不是很大