db

1
2
3
4
5
6
7
8
9
10
/* State kept for one database, identified by `id`. Each field's trailing
 * comment says what that container or counter holds. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

save

书上说:save 的时候不保留过期键,load 的时候保留。但在 5.0 的代码中情况正好相反,即 save 时保留过期键(下面的 rdbSaveRio 不检查过期时间,所有键都会写入)、load 时不保留,可能与 saveInfo 有关(待确认)。另外,即使有定时任务(serverCron->databasesCron->activeExpireCycle)清理过期键,按照 redis 的过期处理规则,仍可能有部分过期键没有被及时删除,所以"不保留过期键"并不是绝对的。

  1. rdbSave->rdbSaveRio

    REDIS|db_version(4)|saveInfo|database|EOF|check_sum

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    /* Write a complete RDB payload to `rdb`, in this order: the 9-byte
     * "REDIS" + version header, the aux fields, every non-empty database,
     * the Lua scripts (only when `rsi` is given), the EOF opcode and the
     * 8-byte checksum.
     *
     * Returns C_OK on success. On any write failure returns C_ERR and, when
     * `error` is not NULL, stores the current errno into it. */
    int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
        dictIterator *iter = NULL;
        dictEntry *entry;
        char header[10];
        int dbid;
        uint64_t checksum;
        size_t last_diff_read = 0;

        /* Install the checksum callback only when checksumming is enabled. */
        if (server.rdb_checksum) {
            rdb->update_cksum = rioGenericUpdateChecksum;
        }

        /* Header: the literal "REDIS" followed by a 4-digit version. */
        snprintf(header,sizeof(header),"REDIS%04d",RDB_VERSION);
        if (rdbWriteRaw(rdb,header,9) == -1) goto werr;

        /* Save-info aux fields (per the original note these are emitted as
         * RDB_OPCODE_AUX records). */
        if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;

        for (dbid = 0; dbid < server.dbnum; dbid++) {
            redisDb *db = &server.db[dbid];
            dict *keyspace = db->dict;
            uint64_t db_size, expires_size;

            /* Empty databases are not written at all. */
            if (dictSize(keyspace) == 0) continue;
            iter = dictGetSafeIterator(keyspace);

            /* SELECT DB opcode followed by the database index, which is
             * written through the length encoder. */
            if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1 ||
                rdbSaveLen(rdb,dbid) == -1)
            {
                goto werr;
            }

            /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
             * is currently the largest type we are able to represent in RDB
             * sizes. However this does not limit the actual size of the DB to
             * load since these sizes are just hints to resize the hash
             * tables. The keyspace size goes first, then the size of the
             * expires dictionary. */
            db_size = dictSize(keyspace);
            expires_size = dictSize(db->expires);
            if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1 ||
                rdbSaveLen(rdb,db_size) == -1 ||
                rdbSaveLen(rdb,expires_size) == -1)
            {
                goto werr;
            }

            /* Emit every entry of this database. */
            for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
                robj key;
                robj *val = dictGetVal(entry);
                sds keystr = dictGetKey(entry);
                long long expire;

                initStaticStringObject(key,keystr);
                expire = getExpire(db,&key);
                if (rdbSaveKeyValuePair(rdb,&key,val,expire) == -1) goto werr;

                /* When this RDB is produced as part of an AOF rewrite, move
                 * accumulated diff from parent to child while rewriting in
                 * order to have a smaller final write. This is done each time
                 * more than AOF_READ_DIFF_INTERVAL_BYTES have been processed
                 * since the previous read. */
                if ((flags & RDB_SAVE_AOF_PREAMBLE) &&
                    rdb->processed_bytes > last_diff_read+AOF_READ_DIFF_INTERVAL_BYTES)
                {
                    last_diff_read = rdb->processed_bytes;
                    aofReadDiffFromParent();
                }
            }
            dictReleaseIterator(iter);
            iter = NULL; /* Prevents a second release in the error path. */
        }

        /* If we are storing the replication information on disk, persist
         * the script cache as well: on successful PSYNC after a restart, we
         * need to be able to process any EVALSHA inside the replication
         * backlog the master will send us. */
        if (rsi && dictSize(server.lua_scripts)) {
            iter = dictGetIterator(server.lua_scripts);
            for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
                robj *script = dictGetVal(entry);
                if (rdbSaveAuxField(rdb,"lua",3,script->ptr,sdslen(script->ptr)) == -1) {
                    goto werr;
                }
            }
            dictReleaseIterator(iter);
            iter = NULL; /* Prevents a second release in the error path. */
        }

        /* EOF opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

        /* CRC64 checksum. It will be zero if checksum computation is
         * disabled, the loading code skips the check in this case. */
        checksum = rdb->cksum;
        memrev64ifbe(&checksum);
        if (rioWrite(rdb,&checksum,8) == 0) goto werr;
        return C_OK;

    werr:
        /* Capture errno before the iterator release below can touch it. */
        if (error) *error = errno;
        if (iter) dictReleaseIterator(iter);
        return C_ERR;
    }

    /* How one key/value pair is encoded in the RDB stream. In order:
     *   - optional expire: RDB_OPCODE_EXPIRETIME_MS + millisecond time
     *   - optional LRU info: RDB_OPCODE_IDLE + idle time in seconds
     *   - optional LFU info: RDB_OPCODE_FREQ + one counter byte
     *   - the value type, the key, then the value itself
     *
     * Returns 1 on success, or -1 as soon as one of the writes fails. */
    int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
        int with_lru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
        int with_lfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

        /* Expire time comes first, only for keys that have one (-1 means no
         * expire). The original note says the time takes 8 bytes. */
        if (expiretime != -1) {
            if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1 ||
                rdbSaveMillisecondTime(rdb,expiretime) == -1)
            {
                return -1;
            }
        }

        /* With an LRU policy the object's idle time is persisted too, so the
         * policy can keep working after a reload. */
        if (with_lru) {
            uint64_t idle = estimateObjectIdleTime(val);
            idle = idle / 1000; /* Using seconds is enough and requires less space.*/
            if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1 ||
                rdbSaveLen(rdb,idle) == -1)
            {
                return -1;
            }
        }

        /* With an LFU policy the access frequency is persisted. */
        if (with_lfu) {
            uint8_t freq = LFUDecrAndReturn(val);
            /* We can encode this in exactly two bytes: the opcode and an 8
             * bit counter, since the frequency is logarithmic with a 0-255
             * range. Note that we do not store the halving time because to
             * reset it a single time when loading does not affect the
             * frequency much. */
            if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1 ||
                rdbWriteRaw(rdb,&freq,1) == -1)
            {
                return -1;
            }
        }

        /* Type, key and value, in that order. The key is saved as a string
         * object and is also handed to rdbSaveObject.
         * NOTE(review): the original note says the key is only needed there
         * for OBJ_STREAM and OBJ_MODULE values - confirm in rdbSaveObject. */
        if (rdbSaveObjectType(rdb,val) == -1 ||
            rdbSaveStringObject(rdb,key) == -1 ||
            rdbSaveObject(rdb,val,key) == -1)
        {
            return -1;
        }
        return 1;
    }
  2. rdbSaveObject

  • String:会尝试保存为整型(对象本身是整数编码,或者字符串长度不超过 11 且能转换成整数);长度大于 20 时尝试 LZF 压缩
  • List:底层是 quickList,逐个节点保存;如果节点是压缩过的(lzf),直接保存压缩后的数据;如果没有压缩过,则把节点按 string 保存(长度为 sz)
  • SET,hash编码按hash,如果是OBJ_ENCODING_INTSET转string保存
  • 其他类似