/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
// 如果用户执行 BGREWRITEAOF 命令的话,在后台开始 AOF 重写
//当用户执行BGREWRITEAOF命令时,如果RDB文件正在写,那么将server.aof_rewrite_scheduled标记为1
//当RDB文件写完后开启AOF rewrite
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
Server自动对AOF进行rewrite
在serverCron函数中会周期性判断
/* Trigger an AOF rewrite if needed */
//满足一定条件rewrite AOF文件
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
config set appendonly yes
当客户端发送该指令时,config.c中的configSetCommand函数会做出响应,startAppendOnly函数会执行AOF rewrite
if (!strcasecmp(c->argv[2]->ptr,"appendonly")) {
int enable = yesnotoi(o->ptr);
if (enable == -1) goto badfmt;
if (enable == 0 && server.aof_state != REDIS_AOF_OFF) {//appendonly no 关闭AOF
stopAppendOnly();
} else if (enable && server.aof_state == REDIS_AOF_OFF) {//appendonly yes rewrite AOF
if (startAppendOnly() == REDIS_ERR) {
addReplyError(c,
"Unable to turn on AOF. Check server logs.");
return;
}
}
}
int startAppendOnly(void) {
server.aof_last_fsync = server.unixtime;
server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
redisAssert(server.aof_state == REDIS_AOF_OFF);
if (server.aof_fd == -1) {
redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno));
return REDIS_ERR;
}
if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {//rewrite
close(server.aof_fd);
redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
return REDIS_ERR;
}
/* We correctly switched on AOF, now wait for the rerwite to be complete
* in order to append data on disk. */
server.aof_state = REDIS_AOF_WAIT_REWRITE;
return REDIS_OK;
}
Redis AOF rewrite机制的实现
从上述分析可以看出rewrite的实现全部依靠rewriteAppendOnlyFileBackground函数,下面分析该函数,通过下面的代码可以看出,Redis是fork出一个子进程来操作AOF rewrite,然后子进程调用rewriteAppendOnlyFile函数,将数据写到一个临时文件temp-rewriteaof-bg-%d.aof中。如果子进程完成会通过exit(0)函数通知父进程rewrite结束,在serverCron函数中使用wait3函数接收子进程退出状态,然后执行后续的AOF rewrite的收尾工作,后面将会分析。父进程的工作主要包括清楚server.aof_rewrite_scheduled标志,记录子进程IDserver.aof_child_pid = childpid,记录rewrite的开始时间server.aof_rewrite_time_start = time(NULL)等。
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
// 后台重写正在执行
if (server.aof_child_pid != -1) return REDIS_ERR;
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
/* Child */
closeListeningSockets(0);//
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
接下来介绍rewriteAppendOnlyFile函数,该函数的主要工作为:遍历所有数据库中的数据,将其写入到临时文件temp-rewriteaof-%d.aof中,写入函数定义在rio.c中,比较简单,然后将数据刷新到硬盘中,然后将文件名rename为其调用者给定的临时文件名,注意仔细看代码,这里并没有修改为正式的AOF文件名。在写入文件时如果设置server.aof_rewrite_incremental_fsync参数,那么在rioWrite函数中fwrite部分数据就会将数据fsync到硬盘中,来保证数据的正确性。
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return REDIS_ERR;
}
rioInitWithFile(&aof,fp); //初始化读写函数,rio.c
//设置r->io.file.autosync = bytes;每32M刷新一次
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) {//遍历每个数据库
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
/* SELECT the new DB */
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
keystr = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);
expiretime = getExpire(db,&key);
/* If this key is already expired skip it */
if (expiretime != -1 && expiretime < now) continue;
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
dictReleaseIterator(di);
}
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
aof_fsync(fileno(fp));//将tempfile文件刷新到硬盘
fclose(fp);
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {//重命名文件名,注意rename后的文件也是一个临时文件
redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;
werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
AOF rewrite工作到这里已经结束一半,上一篇文章提到如果server.aof_state != REDIS_AOF_OFF,那么就会将客户端请求指令修改的数据通过feedAppendOnlyFile函数追加到AOF文件中,那么此时AOF已经rewrite了,必须要处理此时出现的差异数据,记得在feedAppendOnlyFile函数中有这么一段代码
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
如果AOF rewrite正在进行,那么就将修改数据的指令字符串存储到server.aof_rewrite_buf_blocks链表中,等待AOF rewrite子进程结束后处理,处理此部分数据的代码在serverCron函数中。需要指出的是wait3函数我不了解,可能下面注释会有点问题。
/* Check if a background saving or AOF rewrite in progress terminated. */
//如果RDB bgsave或AOF rewrite子进程已经执行,通过获取子进程的退出状态,对后续的工作进行处理
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {//
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);//获取退出的状态
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
// 如果 BGSAVE 和 BGREWRITEAOF 都已经完成,那么重新开始 REHASH
updateDictResizePolicy();
}
}
对于AOF rewrite期间出现的差异数据,Server通过backgroundSaveDoneHandler函数将server.aof_rewrite_buf_blocks链表中数据追加到新的AOF文件中。 backgroundSaveDoneHandler函数执行步骤:
1、通过判断子进程的退出状态,正确的退出状态为exit(0),即exitcode为0,bysignal我不清楚具体意义,如果退出状态正确,backgroundSaveDoneHandler函数才会开始处理 2、通过对rewriteAppendOnlyFileBackground函数的分析,可以知道rewrite后的AOF临时文件名为temp-rewriteaof-bg-%d.aof(%d=server.aof_child_pid)中,接着需要打开此临时文件 3、调用aofRewriteBufferWrite函数将server.aof_rewrite_buf_blocks中差异数据写到该临时文件中 4、如果旧的AOF文件未打开,那么打开旧的AOF文件,将文件描述符赋值给临时变量oldfd 5、将临时的AOF文件名rename为正常的AOF文件名 6、如果旧的AOF文件未打开,那么此时只需要关闭新的AOF文件,此时的server.aof_rewrite_buf_blocks数据应该为空;如果旧的AOF是打开的,那么将server.aof_fd指向newfd,然后根据相应的fsync策略将数据刷新到硬盘上 7、调用aofUpdateCurrentSize函数统计AOF文件的大小,更新server.aof_rewrite_base_size,为serverCron中自动AOF rewrite做相应判断 8、如果之前是REDIS_AOF_WAIT_REWRITE状态,则设置server.aof_state为REDIS_AOF_ON,因为只有“config set appendonly yes”指令才会设置这个状态,也就是需要写完快照后,立即打开AOF;而BGREWRITEAOF不需要打开AOF 9、调用后台线程去关闭旧的AOF文件下面是backgroundSaveDoneHandler函数的注释代码
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {//子进程退出状态正确
int newfd, oldfd;
char tmpfile[256];
long long now = ustime();
redisLog(REDIS_NOTICE,
"Background AOF rewrite terminated with success");
/* Flush the differences accumulated by the parent to the
* rewritten AOF. */
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
redisLog(REDIS_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
//处理server.aof_rewrite_buf_blocks中DIFF数据
if (aofRewriteBufferWrite(newfd) == -1) {
redisLog(REDIS_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
redisLog(REDIS_NOTICE,
"Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
/* The only remaining thing to do is to rename the temporary file to
* the configured file and switch the file descriptor used to do AOF
* writes. We don't want close(2) or rename(2) calls to block the
* server on old file deletion.
*
* There are two possible scenarios:
*
* 1) AOF is DISABLED and this was a one time rewrite. The temporary
* file will be renamed to the configured file. When this file already
* exists, it will be unlinked, which may block the server.
*
* 2) AOF is ENABLED and the rewritten AOF will immediately start
* receiving writes. After the temporary file is renamed to the
* configured file, the original AOF file descriptor will be closed.
* Since this will be the last reference to that file, closing it
* causes the underlying file to be unlinked, which may block the
* server.
*
* To mitigate the blocking effect of the unlink operation (either
* caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
* use a background thread to take care of this. First, we
* make scenario 1 identical to scenario 2 by opening the target file
* when it exists. The unlink operation after the rename(2) will then
* be executed upon calling close(2) for its descriptor. Everything to
* guarantee atomicity for this switch has already happened by then, so
* we don't care what the outcome or duration of that close operation
* is, as long as the file descriptor is released again. */
if (server.aof_fd == -1) {
/* AOF disabled */
/* Don't care if this fails: oldfd will be -1 and we handle that.
* One notable case of -1 return is if the old file does
* not exist. */
oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
} else {
/* AOF enabled */
oldfd = -1; /* We'll set this to the current AOF filedes later. */
}
/* Rename the temporary file. This will not unlink the target file if
* it exists, because we reference it with "oldfd". */
//把临时文件改名为正常的AOF文件名。由于当前oldfd已经指向这个之前的正常文件名的文件,
//所以当前不会造成unlink操作,得等那个oldfd被close的时候,内核判断该文件没有指向了,就删除之。
if (rename(tmpfile,server.aof_filename) == -1) {
redisLog(REDIS_WARNING,
"Error trying to rename the temporary AOF file: %s", strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
//如果AOF关闭了,那只要处理新文件,直接关闭这个新的文件即可
//但是这里会不会导致服务器卡呢?这个newfd应该是临时文件的最后一个fd了,不会的,
//因为这个文件在本函数不会写入数据,因为stopAppendOnly函数会清空aof_rewrite_buf_blocks列表。
if (server.aof_fd == -1) {
/* AOF disabled, we don't need to set the AOF file descriptor
* to this new file, so we can close it. */
close(newfd);
} else {
/* AOF enabled, replace the old fd with the new one. */
oldfd = server.aof_fd;
//指向新的fd,此时这个fd由于上面的rename语句存在,已经为正常aof文件名
server.aof_fd = newfd;
//fsync到硬盘
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
aof_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
/* Clear regular AOF buffer since its contents was just written to
* the new AOF from the background rewrite buffer. */
//rewrite得到的肯定是最新的数据,所以aof_buf中的数据没有意义,直接清空
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
server.aof_lastbgrewrite_status = REDIS_OK;
redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
//下面判断是否需要打开AOF,比如bgrewriteaofCommand就不需要打开AOF。
if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
server.aof_state = REDIS_AOF_ON;
/* Asynchronously close the overwritten AOF. */
//让后台线程去关闭这个旧的AOF文件FD,只要CLOSE就行,会自动unlink的,因为上面已经有rename
if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
redisLog(REDIS_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
server.aof_lastbgrewrite_status = REDIS_ERR;
redisLog(REDIS_WARNING,
"Background AOF rewrite terminated with error");
} else {
server.aof_lastbgrewrite_status = REDIS_ERR;
redisLog(REDIS_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
}
cleanup:
aofRewriteBufferReset();
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
server.aof_rewrite_scheduled = 1;
}
至此,AOF数据持久化已经全部结束了,剩下的就是一些细节的处理,以及一些Linux库函数的理解,对于rename、unlink、wait3等库函数的深入认识就去问Google吧。
小结
Redis AOF数据持久化的实现机制通过三篇文章基本上比较详细的分析了,但这只是从代码层面去看AOF,对于AOF持久化的优缺点网上有很多分析,Redis的官方网站也有英文介绍,Redis的数据持久化还有一种方法叫RDB,更多RDB的内容等下次再分析。感谢此篇博客给我在理解Redis AOF数据持久化方面的巨大帮助,http://chenzhenianqing.cn/articles/786.html,此篇博客对AOF的分析十分的详细。