您的当前位置:首页正文

Redis数据持久化机制AOF原理分析二

2020-11-09 来源:筏尚旅游网

/* Start a scheduled AOF rewrite if this was requested by the user while
 * a BGSAVE was in progress. */
 // 如果用户执行 BGREWRITEAOF 命令的话,在后台开始 AOF 重写
 //当用户执行BGREWRITEAOF命令时,如果RDB文件正在写,那么将server.aof_rewrite_scheduled标记为1
 //当RDB文件写完后开启AOF rewrite
 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
 server.aof_rewrite_scheduled)
 {
 rewriteAppendOnlyFileBackground();
 }

Server自动对AOF进行rewrite

在serverCron函数中会周期性判断
/* Trigger an AOF rewrite if needed */
 //满足一定条件rewrite AOF文件
 if (server.rdb_child_pid == -1 &&
 server.aof_child_pid == -1 &&
 server.aof_rewrite_perc &&
 server.aof_current_size > server.aof_rewrite_min_size)
 {
 long long base = server.aof_rewrite_base_size ?
 server.aof_rewrite_base_size : 1;
 long long growth = (server.aof_current_size*100/base) - 100;
 if (growth >= server.aof_rewrite_perc) {
 redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
 rewriteAppendOnlyFileBackground();
 }
 }

config set appendonly yes

当客户端发送该指令时,config.c中的configSetCommand函数会做出响应,startAppendOnly函数会执行AOF rewrite
if (!strcasecmp(c->argv[2]->ptr,"appendonly")) {
	int enable = yesnotoi(o->ptr);

	if (enable == -1) goto badfmt;
	if (enable == 0 && server.aof_state != REDIS_AOF_OFF) {//appendonly no 关闭AOF
	stopAppendOnly();
	} else if (enable && server.aof_state == REDIS_AOF_OFF) {//appendonly yes rewrite AOF
	if (startAppendOnly() == REDIS_ERR) {
	addReplyError(c,
	"Unable to turn on AOF. Check server logs.");
	return;
	}
	}
}
int startAppendOnly(void) {
 server.aof_last_fsync = server.unixtime;
 server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
 redisAssert(server.aof_state == REDIS_AOF_OFF);
 if (server.aof_fd == -1) {
 redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno));
 return REDIS_ERR;
 }
 if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {//rewrite
 close(server.aof_fd);
 redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
 return REDIS_ERR;
 }
 /* We correctly switched on AOF, now wait for the rerwite to be complete
 * in order to append data on disk. */
 server.aof_state = REDIS_AOF_WAIT_REWRITE;
 return REDIS_OK;
}

Redis AOF rewrite机制的实现

从上述分析可以看出rewrite的实现全部依靠rewriteAppendOnlyFileBackground函数,下面分析该函数,通过下面的代码可以看出,Redis是fork出一个子进程来操作AOF rewrite,然后子进程调用rewriteAppendOnlyFile函数,将数据写到一个临时文件temp-rewriteaof-bg-%d.aof中。如果子进程完成会通过exit(0)函数通知父进程rewrite结束,在serverCron函数中使用wait3函数接收子进程退出状态,然后执行后续的AOF rewrite的收尾工作,后面将会分析。父进程的工作主要包括清楚server.aof_rewrite_scheduled标志,记录子进程IDserver.aof_child_pid = childpid,记录rewrite的开始时间server.aof_rewrite_time_start = time(NULL)等。
int rewriteAppendOnlyFileBackground(void) {
 pid_t childpid;
 long long start;

 // 后台重写正在执行
 if (server.aof_child_pid != -1) return REDIS_ERR;
 start = ustime();
 if ((childpid = fork()) == 0) {
 char tmpfile[256];

 /* Child */
 closeListeningSockets(0);//
 redisSetProcTitle("redis-aof-rewrite");
 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
 size_t private_dirty = zmalloc_get_private_dirty();

 if (private_dirty) {
 redisLog(REDIS_NOTICE,
 "AOF rewrite: %zu MB of memory used by copy-on-write",
 private_dirty/(1024*1024));
 }
 exitFromChild(0);
 } else {
 exitFromChild(1);
 }
 } else {
 /* Parent */
 server.stat_fork_time = ustime()-start;
 if (childpid == -1) {
 redisLog(REDIS_WARNING,
 "Can't rewrite append only file in background: fork: %s",
 strerror(errno));
 return REDIS_ERR;
 }
 redisLog(REDIS_NOTICE,
 "Background append only file rewriting started by pid %d",childpid);
 server.aof_rewrite_scheduled = 0;
 server.aof_rewrite_time_start = time(NULL);
 server.aof_child_pid = childpid;
 updateDictResizePolicy();
 /* We set appendseldb to -1 in order to force the next call to the
 * feedAppendOnlyFile() to issue a SELECT command, so the differences
 * accumulated by the parent into server.aof_rewrite_buf will start
 * with a SELECT statement and it will be safe to merge. */
 server.aof_selected_db = -1;
 replicationScriptCacheFlush();
 return REDIS_OK;
 }
 return REDIS_OK; /* unreached */
}
接下来介绍rewriteAppendOnlyFile函数,该函数的主要工作为:遍历所有数据库中的数据,将其写入到临时文件temp-rewriteaof-%d.aof中,写入函数定义在rio.c中,比较简单,然后将数据刷新到硬盘中,然后将文件名rename为其调用者给定的临时文件名,注意仔细看代码,这里并没有修改为正式的AOF文件名。在写入文件时如果设置server.aof_rewrite_incremental_fsync参数,那么在rioWrite函数中fwrite部分数据就会将数据fsync到硬盘中,来保证数据的正确性。
int rewriteAppendOnlyFile(char *filename) {
 dictIterator *di = NULL;
 dictEntry *de;
 rio aof;
 FILE *fp;
 char tmpfile[256];
 int j;
 long long now = mstime();

 /* Note that we have to use a different temp name here compared to the
 * one used by rewriteAppendOnlyFileBackground() function. */
 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
 fp = fopen(tmpfile,"w");
 if (!fp) {
 redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
 return REDIS_ERR;
 }

 rioInitWithFile(&aof,fp); //初始化读写函数,rio.c
 //设置r->io.file.autosync = bytes;每32M刷新一次
 if (server.aof_rewrite_incremental_fsync)
 rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
 for (j = 0; j < server.dbnum; j++) {//遍历每个数据库
 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
 redisDb *db = server.db+j;
 dict *d = db->dict;
 if (dictSize(d) == 0) continue;
 di = dictGetSafeIterator(d);
 if (!di) {
 fclose(fp);
 return REDIS_ERR;
 }

 /* SELECT the new DB */
 if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
 if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

 /* Iterate this DB writing every entry */
 while((de = dictNext(di)) != NULL) {
 sds keystr;
 robj key, *o;
 long long expiretime;

 keystr = dictGetKey(de);
 o = dictGetVal(de);
 initStaticStringObject(key,keystr);

 expiretime = getExpire(db,&key);

 /* If this key is already expired skip it */
 if (expiretime != -1 && expiretime < now) continue;

 /* Save the key and associated value */
 if (o->type == REDIS_STRING) {
 /* Emit a SET command */
 char cmd[]="*3\r\n$3\r\nSET\r\n";
 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
 /* Key and value */
 if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
 if (rioWriteBulkObject(&aof,o) == 0) goto werr;
 } else if (o->type == REDIS_LIST) {
 if (rewriteListObject(&aof,&key,o) == 0) goto werr;
 } else if (o->type == REDIS_SET) {
 if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
 } else if (o->type == REDIS_ZSET) {
 if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
 } else if (o->type == REDIS_HASH) {
 if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
 } else {
 redisPanic("Unknown object type");
 }
 /* Save the expire time */
 if (expiretime != -1) {
 char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
 if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
 if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
 }
 }
 dictReleaseIterator(di);
 }

 /* Make sure data will not remain on the OS's output buffers */
 fflush(fp);
 aof_fsync(fileno(fp));//将tempfile文件刷新到硬盘
 fclose(fp);

 /* Use RENAME to make sure the DB file is changed atomically only
 * if the generate DB file is ok. */
 if (rename(tmpfile,filename) == -1) {//重命名文件名,注意rename后的文件也是一个临时文件
 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
 unlink(tmpfile);
 return REDIS_ERR;
 }
 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
 return REDIS_OK;

werr:
 fclose(fp);
 unlink(tmpfile);
 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
 if (di) dictReleaseIterator(di);
 return REDIS_ERR;
}
AOF rewrite工作到这里已经结束一半,上一篇文章提到如果server.aof_state != REDIS_AOF_OFF,那么就会将客户端请求指令修改的数据通过feedAppendOnlyFile函数追加到AOF文件中,那么此时AOF已经rewrite了,必须要处理此时出现的差异数据,记得在feedAppendOnlyFile函数中有这么一段代码
if (server.aof_child_pid != -1)
 aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
如果AOF rewrite正在进行,那么就将修改数据的指令字符串存储到server.aof_rewrite_buf_blocks链表中,等待AOF rewrite子进程结束后处理,处理此部分数据的代码在serverCron函数中。需要指出的是wait3函数我不了解,可能下面注释会有点问题。
/* Check if a background saving or AOF rewrite in progress terminated. */
//如果RDB bgsave或AOF rewrite子进程已经执行,通过获取子进程的退出状态,对后续的工作进行处理
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {//
	int statloc;
	pid_t pid;

	if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
	int exitcode = WEXITSTATUS(statloc);//获取退出的状态
	int bysignal = 0;

	if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

	if (pid == server.rdb_child_pid) {
	backgroundSaveDoneHandler(exitcode,bysignal);
	} else if (pid == server.aof_child_pid) {
	backgroundRewriteDoneHandler(exitcode,bysignal);
	} else {
	redisLog(REDIS_WARNING,
	"Warning, detected child with unmatched pid: %ld",
	(long)pid);
	}
	// 如果 BGSAVE 和 BGREWRITEAOF 都已经完成,那么重新开始 REHASH
	updateDictResizePolicy();
	}
}
对于AOF rewrite期间出现的差异数据,Server通过backgroundSaveDoneHandler函数将server.aof_rewrite_buf_blocks链表中数据追加到新的AOF文件中。 backgroundSaveDoneHandler函数执行步骤:
1、通过判断子进程的退出状态,正确的退出状态为exit(0),即exitcode为0,bysignal我不清楚具体意义,如果退出状态正确,backgroundSaveDoneHandler函数才会开始处理 2、通过对rewriteAppendOnlyFileBackground函数的分析,可以知道rewrite后的AOF临时文件名为temp-rewriteaof-bg-%d.aof(%d=server.aof_child_pid)中,接着需要打开此临时文件 3、调用aofRewriteBufferWrite函数将server.aof_rewrite_buf_blocks中差异数据写到该临时文件中 4、如果旧的AOF文件未打开,那么打开旧的AOF文件,将文件描述符赋值给临时变量oldfd 5、将临时的AOF文件名rename为正常的AOF文件名 6、如果旧的AOF文件未打开,那么此时只需要关闭新的AOF文件,此时的server.aof_rewrite_buf_blocks数据应该为空;如果旧的AOF是打开的,那么将server.aof_fd指向newfd,然后根据相应的fsync策略将数据刷新到硬盘上 7、调用aofUpdateCurrentSize函数统计AOF文件的大小,更新server.aof_rewrite_base_size,为serverCron中自动AOF rewrite做相应判断 8、如果之前是REDIS_AOF_WAIT_REWRITE状态,则设置server.aof_state为REDIS_AOF_ON,因为只有“config set appendonly yes”指令才会设置这个状态,也就是需要写完快照后,立即打开AOF;而BGREWRITEAOF不需要打开AOF 9、调用后台线程去关闭旧的AOF文件下面是backgroundSaveDoneHandler函数的注释代码
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
 if (!bysignal && exitcode == 0) {//子进程退出状态正确
 int newfd, oldfd;
 char tmpfile[256];
 long long now = ustime();

 redisLog(REDIS_NOTICE,
 "Background AOF rewrite terminated with success");

 /* Flush the differences accumulated by the parent to the
 * rewritten AOF. */
 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
 (int)server.aof_child_pid);
 newfd = open(tmpfile,O_WRONLY|O_APPEND);
 if (newfd == -1) {
 redisLog(REDIS_WARNING,
 "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
 goto cleanup;
 }
 //处理server.aof_rewrite_buf_blocks中DIFF数据
 if (aofRewriteBufferWrite(newfd) == -1) {
 redisLog(REDIS_WARNING,
 "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
 close(newfd);
 goto cleanup;
 }

 redisLog(REDIS_NOTICE,
 "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());

 /* The only remaining thing to do is to rename the temporary file to
 * the configured file and switch the file descriptor used to do AOF
 * writes. We don't want close(2) or rename(2) calls to block the
 * server on old file deletion.
 *
 * There are two possible scenarios:
 *
 * 1) AOF is DISABLED and this was a one time rewrite. The temporary
 * file will be renamed to the configured file. When this file already
 * exists, it will be unlinked, which may block the server.
 *
 * 2) AOF is ENABLED and the rewritten AOF will immediately start
 * receiving writes. After the temporary file is renamed to the
 * configured file, the original AOF file descriptor will be closed.
 * Since this will be the last reference to that file, closing it
 * causes the underlying file to be unlinked, which may block the
 * server.
 *
 * To mitigate the blocking effect of the unlink operation (either
 * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
 * use a background thread to take care of this. First, we
 * make scenario 1 identical to scenario 2 by opening the target file
 * when it exists. The unlink operation after the rename(2) will then
 * be executed upon calling close(2) for its descriptor. Everything to
 * guarantee atomicity for this switch has already happened by then, so
 * we don't care what the outcome or duration of that close operation
 * is, as long as the file descriptor is released again. */
 if (server.aof_fd == -1) {
 /* AOF disabled */

 /* Don't care if this fails: oldfd will be -1 and we handle that.
 * One notable case of -1 return is if the old file does
 * not exist. */
 oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
 } else {
 /* AOF enabled */
 oldfd = -1; /* We'll set this to the current AOF filedes later. */
 }

 /* Rename the temporary file. This will not unlink the target file if
 * it exists, because we reference it with "oldfd". */
 //把临时文件改名为正常的AOF文件名。由于当前oldfd已经指向这个之前的正常文件名的文件,
 //所以当前不会造成unlink操作,得等那个oldfd被close的时候,内核判断该文件没有指向了,就删除之。
 if (rename(tmpfile,server.aof_filename) == -1) {
 redisLog(REDIS_WARNING,
 "Error trying to rename the temporary AOF file: %s", strerror(errno));
 close(newfd);
 if (oldfd != -1) close(oldfd);
 goto cleanup;
 }
 //如果AOF关闭了,那只要处理新文件,直接关闭这个新的文件即可
 //但是这里会不会导致服务器卡呢?这个newfd应该是临时文件的最后一个fd了,不会的,
 //因为这个文件在本函数不会写入数据,因为stopAppendOnly函数会清空aof_rewrite_buf_blocks列表。
 if (server.aof_fd == -1) {
 /* AOF disabled, we don't need to set the AOF file descriptor
 * to this new file, so we can close it. */
 close(newfd);
 } else {
 /* AOF enabled, replace the old fd with the new one. */
 oldfd = server.aof_fd;
 //指向新的fd,此时这个fd由于上面的rename语句存在,已经为正常aof文件名
 server.aof_fd = newfd;
 //fsync到硬盘
 if (server.aof_fsync == AOF_FSYNC_ALWAYS)
 aof_fsync(newfd);
 else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
 aof_background_fsync(newfd);
 server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
 aofUpdateCurrentSize();
 server.aof_rewrite_base_size = server.aof_current_size;

 /* Clear regular AOF buffer since its contents was just written to
 * the new AOF from the background rewrite buffer. */
 //rewrite得到的肯定是最新的数据,所以aof_buf中的数据没有意义,直接清空
 sdsfree(server.aof_buf);
 server.aof_buf = sdsempty();
 }

 server.aof_lastbgrewrite_status = REDIS_OK;

 redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
 /* Change state from WAIT_REWRITE to ON if needed */
 //下面判断是否需要打开AOF,比如bgrewriteaofCommand就不需要打开AOF。
 if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
 server.aof_state = REDIS_AOF_ON;

 /* Asynchronously close the overwritten AOF. */
 //让后台线程去关闭这个旧的AOF文件FD,只要CLOSE就行,会自动unlink的,因为上面已经有rename
 if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

 redisLog(REDIS_VERBOSE,
 "Background AOF rewrite signal handler took %lldus", ustime()-now);
 } else if (!bysignal && exitcode != 0) {
 server.aof_lastbgrewrite_status = REDIS_ERR;

 redisLog(REDIS_WARNING,
 "Background AOF rewrite terminated with error");
 } else {
 server.aof_lastbgrewrite_status = REDIS_ERR;

 redisLog(REDIS_WARNING,
 "Background AOF rewrite terminated by signal %d", bysignal);
 }

cleanup:
 aofRewriteBufferReset();
 aofRemoveTempFile(server.aof_child_pid);
 server.aof_child_pid = -1;
 server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
 server.aof_rewrite_time_start = -1;
 /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
 if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
 server.aof_rewrite_scheduled = 1;
}
至此,AOF数据持久化已经全部结束了,剩下的就是一些细节的处理,以及一些Linux库函数的理解,对于rename、unlink、wait3等库函数的深入认识就去问Google吧。

小结

Redis AOF数据持久化的实现机制通过三篇文章基本上比较详细的分析了,但这只是从代码层面去看AOF,对于AOF持久化的优缺点网上有很多分析,Redis的官方网站也有英文介绍,Redis的数据持久化还有一种方法叫RDB,更多RDB的内容等下次再分析。感谢此篇博客给我在理解Redis AOF数据持久化方面的巨大帮助,http://chenzhenianqing.cn/articles/786.html,此篇博客对AOF的分析十分的详细。
显示全文