int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; int old_aof_state = server.aof_state; long loops = 0; //redis_fstat就是fstat64函数,通过fileno(fp)得到文件描述符,获取文件的状态存储于sb中, //具体可以参考stat函数,st_size就是文件的字节数 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { server.aof_current_size = 0; fclose(fp); return REDIS_ERR; } if (fp == NULL) {//打开文件失败 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); exit(1); } /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI * to the same file we're about to read. */ server.aof_state = REDIS_AOF_OFF; fakeClient = createFakeClient(); //建立伪终端 startLoading(fp); // 定义于 rdb.c ,更新服务器的载入状态 while(1) { int argc, j; unsigned long len; robj **argv; char buf[128]; sds argsds; struct redisCommand *cmd; /* Serve the clients from time to time */ // 有间隔地处理外部请求,ftello()函数得到文件的当前位置,返回值为long if (!(loops++ % 1000)) { loadingProgress(ftello(fp));//保存aof文件读取的位置,ftellno(fp)获取文件当前位置 aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);//处理事件 } //按行读取AOF数据 if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp))//达到文件尾EOF break; else goto readerr; } //读取AOF文件中的命令,依照Redis的协议处理 if (buf[0] != '*') goto fmterr; argc = atoi(buf+1);//参数个数 if (argc < 1) goto fmterr; argv = zmalloc(sizeof(robj*)*argc);//参数值 for (j = 0; j < argc; j++) { if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr; if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10);//每个bulk的长度 argsds = sdsnewlen(NULL,len);//新建一个空sds //按照bulk的长度读取 if (len && fread(argsds,len,1,fp) == 0) goto fmterr; argv[j] = createObject(REDIS_STRING,argsds); if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF 跳过\r\n*/ } /* Command lookup */ cmd = lookupCommand(argv[0]->ptr); if (!cmd) { redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr); exit(1); } /* Run the command in the context of a fake client */ fakeClient->argc = argc; fakeClient->argv = argv; cmd->proc(fakeClient);//执行命令 /* The fake client should not have a reply */ redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); /* The fake client should never get blocked */ redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0); /* Clean up. Command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ for (j = 0; j < fakeClient->argc; j++) decrRefCount(fakeClient->argv[j]); zfree(fakeClient->argv); } /* This point can only be reached when EOF is reached without errors. * If the client is in the middle of a MULTI/EXEC, log error and quit. */ if (fakeClient->flags & REDIS_MULTI) goto readerr; fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; stopLoading(); aofUpdateCurrentSize(); //更新server.aof_current_size,AOF文件大小 server.aof_rewrite_base_size = server.aof_current_size; return REDIS_OK; ………… }在前面一篇关于AOF参数配置的博客遗留了一个问题,server.aof_current_size参数的初始化,下面解决这个疑问。
void aofUpdateCurrentSize(void) { struct redis_stat sb; if (redis_fstat(server.aof_fd,&sb) == -1) { redisLog(REDIS_WARNING,"Unable to obtain the AOF file length. stat: %s", strerror(errno)); } else { server.aof_current_size = sb.st_size; } }redis_fstat是作者对Linux中fstat64函数的重命名,该还是就是获取文件相关的参数信息,具体可以Google之,sb.st_size就是当前AOF文件的大小。这里需要知道server.aof_fd即AOF文件描述符,该参数的初始化在initServer()函数中
/* Open the AOF file if needed. */ if (server.aof_state == REDIS_AOF_ON) { server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { redisLog(REDIS_WARNING, "Can't open the append-only file: %s",strerror(errno)); exit(1); } }至此,Redis Server启动加载硬盘中AOF文件数据的操作就成功结束了。
在上一篇博客中,提到了三种fsync方式:appendfsync always, appendfsync everysec, appendfsync no. 具体体现在server.aof_fsync参数中。
首先看当客户端请求的指令造成数据被修改,Redis是如何将修改数据的指令添加到server.aof_buf中的。
call() -> propagate() -> feedAppendOnlyFile(),call()函数判断执行指令后是否造成数据被修改。
feedAppendOnlyFile函数首先会判断Server是否开启了AOF,如果开启AOF,那么根据Redis通讯协议将修改数据的指令重现成请求的字符串,注意在超时设置的处理方式,接着将字符串append到server.aof_buf中即可。该函数最后两行代码需要注意,这才是重点,如果server.aof_child_pid != -1那么表明此时Server正在重写rewrite AOF文件,需要将被修改的数据追加到server.aof_rewrite_buf_blocks链表中,等待rewrite结束后,追加到AOF文件中。具体见下面代码的注释。
/* Propagate the specified command (in the context of the specified database id) * to AOF and Slaves. * * flags are an xor between: * + REDIS_PROPAGATE_NONE (no propagation of command at all) * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled) * + REDIS_PROPAGATE_REPL (propagate into the replication link) */ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { //将cmd指令变动的数据追加到AOF文件中 if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & REDIS_PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); }
//cmd指令修改了数据,先将更新的数据写到server.aof_buf中 void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); robj *tmpargv[3]; /* The DB this command was targeting is not the same as the last command * we appendend. To issue a SELECT command is needed. */ // 当前 db 不是指定的 aof db,通过创建 SELECT 命令来切换数据库 if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; } // 将 EXPIRE / PEXPIRE / EXPIREAT 命令翻译为 PEXPIREAT 命令 if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); }// 将 SETEX / PSETEX 命令翻译为 SET 和 PEXPIREAT 组合命令 else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { /* Translate SETEX/PSETEX to SET and PEXPIREAT */ tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else {//其他的指令直接追加 /* All the other commands don't need translation or need the * same translation already operated in the command vector * for the replication itself. */ buf = catAppendOnlyGenericCommand(buf,argc,argv); } /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ // 将 buf 追加到服务器的 aof_buf 末尾,在beforeSleep中写到AOF文件中,并且根据情况fsync刷新到硬盘 if (server.aof_state == REDIS_AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ //如果server.aof_child_pid不为1,那就说明有快照进程正在写数据到临时文件(已经开始rewrite), //那么必须先将这段时间接收到的指令更新的数据先暂时存储起来,等到快照进程完成任务后, //将这部分数据写入到AOF文件末尾,保证数据不丢失 //解释为什么需要aof_rewrite_buf_blocks,当server在进行rewrite时即读取所有数据库中的数据, //有些数据已经写到新的AOF文件,但是此时客户端执行指令又将该值修改了,因此造成了差异 if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); /*这里说一下server.aof_buf和server.aof_rewrite_buf_blocks的区别 aof_buf是正常情况下aof文件打开的时候,会不断将这份数据写入到AOF文件中。 aof_rewrite_buf_blocks 是如果用户主动触发了写AOF文件的命令时,比如 config set appendonly yes命令 那么redis会fork创建一个后台进程,也就是当时的数据快照,然后将数据写入到一个临时文件中去。 在此期间发送的命令,我们需要把它们记录起来,等后台进程完成AOF临时文件写后,serverCron定时任务 感知到这个退出动作,然后就会调用backgroundRewriteDoneHandler进而调用aofRewriteBufferWrite函数, 将aof_rewrite_buf_blocks上面的数据,也就是diff数据写入到临时AOF文件中,然后再unlink替换正常的AOF文件。 因此可以知道,aof_buf一般情况下比aof_rewrite_buf_blocks要少, 但开始的时候可能aof_buf包含一些后者不包含的前面部分数据。*/ sdsfree(buf); }Server在每次事件循环之前会调用一次beforeSleep函数,下面看看这个函数做了什么工作?
/* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); listNode *ln; redisClient *c; /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Try to process pending commands for clients that were just unblocked. */ while (listLength(server.unblocked_clients)) { ln = listFirst(server.unblocked_clients); redisAssert(ln != NULL); c = ln->value; listDelNode(server.unblocked_clients,ln); c->flags &= ~REDIS_UNBLOCKED; /* Process remaining data in the input buffer. */ //处理客户端在阻塞期间接收到的客户端发送的请求 if (c->querybuf && sdslen(c->querybuf) > 0) { server.current_client = c; processInputBuffer(c); server.current_client = NULL; } } /* Write the AOF buffer on disk */ //将server.aof_buf中的数据追加到AOF文件中并fsync到硬盘上 flushAppendOnlyFile(0); }通过上面的代码及注释可以发现,beforeSleep函数做了三件事:1、处理过期键,2、处理阻塞期间的客户端请求,3、将server.aof_buf中的数据追加到AOF文件中并fsync刷新到硬盘上,flushAppendOnlyFile函数给定了一个参数force,表示是否强制写入AOF文件,0表示非强制即支持延迟写,1表示强制写入。
void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; if (sdslen(server.aof_buf) == 0) return; // 返回后台正在等待执行的 fsync 数量 if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; // AOF 模式为每秒 fsync ,并且 force 不为 1 如果可以的话,推延冲洗 if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ // 如果 aof_fsync 队列里已经有正在等待的任务 if (sync_in_progress) { // 上一次没有推迟冲洗过,记录推延的当前时间,然后返回 if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { // 允许在两秒之内的推延冲洗 /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* If you are following this code path, then we are going to write so * set reset the postponed flush sentinel to zero. */ server.aof_flush_postponed_start = 0; /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ // 将 AOF 缓存写入到文件,如果一切幸运的话,写入会原子性地完成 nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); if (nwritten != (signed)sdslen(server.aof_buf)) {//出错 /* Ooops, we are in troubles. The best thing to do for now is * aborting instead of giving the illusion that everything is * working as expected. */ if (nwritten == -1) { redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); } else { redisLog(REDIS_WARNING,"Exiting on short write while writing to " "the append-only file: %s (nwritten=%ld, " "expected=%ld)", strerror(errno), (long)nwritten, (long)sdslen(server.aof_buf)); if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { redisLog(REDIS_WARNING, "Could not remove short write " "from the append-only file. Redis may refuse " "to load the AOF the next time it starts. " "ftruncate: %s", strerror(errno)); } } exit(1); } server.aof_current_size += nwritten; /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ // 如果 aof 缓存不是太大,那么重用它,否则,清空 aof 缓存 if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ //aof rdb子进程运行中不支持fsync并且aof rdb子进程正在运行,那么直接返回, //但是数据已经写到aof文件中,只是没有刷新到硬盘 if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) {//总是fsync,那么直接进行fsync /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd);//放到后台线程进行fsync server.aof_last_fsync = server.unixtime; } }上述代码中请关注server.aof_fsync参数,即设置Redis fsync AOF文件到硬盘的策略,如果设置为AOF_FSYNC_ALWAYS,那么直接在主进程中fsync,如果设置为AOF_FSYNC_EVERYSEC,那么放入后台线程中fsync,后台线程的代码在bio.c中。
文章写到这,已经解决的了Redis Server启动加载AOF文件和如何将客户端请求产生的新的数据追加到AOF文件中,对于追加数据到AOF文件中,根据fsync的配置策略如何将写入到AOF文件中的新数据刷新到硬盘中,直接在主进程中fsync或是在后台线程fsync。
至此,AOF数据持久化还剩下如何rewrite AOF,接受客户端发送的BGREWRITEAOF请求,此部分内容待下篇博客中解析。
感谢此篇博客给我在理解Redis AOF数据持久化方面的巨大帮助,http://chenzhenianqing.cn/articles/786.html
本人Redis-2.8.2的源码注释已经放到Github中,有需要的读者可以下载,我也会在后续的时间中更新,https://github.com/xkeyideal/annotated-redis-2.8.2
本人不怎么会使用Git,望有人能教我一下。