装饰工程网站模板下载,中国做外贸的网站有哪些内容,拍网制作方法图片教程,wordpress 忘记数据库密码破解目录
一、设计思路
二、核心代码
三、测试功能 一、设计思路
数据结构#xff1a;使用哈希表#xff08;链式地址法解决哈希冲突#xff09;存储键值对 全量持久化和增量持久化的核心流程#xff1a; 增删改操作#xff1a;先写 WAL 日志 → 再更新内存哈希表 Checkpo…目录一、设计思路二、核心代码三、测试功能一、设计思路数据结构使用哈希表链式地址法解决哈希冲突存储键值对全量持久化和增量持久化的核心流程增删改操作先写 WAL 日志 → 再更新内存哈希表Checkpoint全量持久化内存数据 → 截断 WAL 日志清空重启恢复加载全量文件 → 重放 WAL 日志 → 恢复增量数据异步 Checkpoint后台线程执行全量持久化避免阻塞前台写操作。全量持久化Checkpoint的触发条件后台线程checkpoint_worker循环检查触发条件满足任一条件则执行 Checkpoint定时触发每隔CHECKPOINT_INTERVAL30 秒执行一次。条件触发WAL 文件大小≥10MB。二、核心代码实现全量持久化的两个函数int kvs_hash_persist(hashtable_t *hash) { if (hash NULL) return -1; FILE *fp fopen(PERSIST_FILE, w); if (fp NULL) return -1; // 遍历所有哈希桶 for (size_t i 0; i hash-max_slots; i) { hashnode_t *curr hash-nodes[i]; while (curr ! NULL) { // 写入格式key value\n if (fprintf(fp, %s %s\n, curr-key, curr-value) 0) { fclose(fp); return -1; } curr curr-next; } } fflush(fp);//用户态缓冲区→内核缓冲区 的刷新,参数是流指针FILE * fclose(fp); return 0; } int kvs_hash_load(hashtable_t *hash) { if (hash NULL) return -1; FILE *fp fopen(PERSIST_FILE, r); if (fp NULL) return -1; char line[4096]; // 每行最大长度可根据需求调整 while (fgets(line, sizeof(line), fp) ! NULL) { // 去除换行符 int len strlen(line); if(len0)continue;//跳过空行 if (len 0 line[len - 1] \n) { line[len - 1] \0; } // 分割键和值空格 为分隔符 char *key strtok(line, ); char *value strtok(NULL, ); if (key NULL || value NULL) { /*fclose(fp); return -1;*/ continue; } // 插入到KVStorehash存储引擎先初始化才能调用如下函数,否则报段错误 if (kvs_hash_set(hash, key, value) ! 0) { /*fclose(fp); return -1;*/ continue; } } fclose(fp); return 0; }实现增量持久化的两个函数int kvs_hash_logpersist(char *op_type,char *key,char *value){ FILE *fpfopen(WAL_FILE,a);//权限a只追加写权限a追加写可读 if(!fp)return -1; if(value){//增/改操作 fprintf(fp,%s %s %s\n,op_type,key,value); }else {//删除操作 fprintf(fp,%s %s\n,op_type,key); } fflush(fp); //fsync(fp); fclose(fp); } int kvs_hash_logreplay(hashtable_t *hash){ FILE *fpfopen(WAL_FILE,r); if(!fp)return -1; char line[1024];//行缓冲区 int count0;//重放计数器 //逐行读取WAL日志文件 while(fgets(line,sizeof(line),fp)){ // 去除行末的换行符 line[strcspn(line,\n)]\0; //跳过空行 if(strlen(line)0)continue; char *datastrdup(line);//strtok会改变原字符串 char *op_typestrtok(data, ); char *keystrtok(NULL, ); char *valuestrtok(NULL, ); if(!op_type||!key)continue; if(strcmp(op_type,HSET)0||strcmp(op_type,HMOD)0){ if(value){ kvs_hash_set(hash,key,value); count; } }else if(strcmp(op_type,HDEL)0){ kvs_hash_del(hash,key); count; } free(data); } fclose(fp); return count; }后台线程实现全量持久化的入口函数// -------------------------- Checkpoint后台线程 -------------------------- //获取文件大小 static int get_file_size(char *filename){ struct stat stat_buf; if(stat(filename,stat_buf)-1)return -1; return (int)stat_buf.st_size; } // 检查Checkpoint触发条件 static int is_checkpoint_needed(hashtable_t *hash) { // 1. 检查WAL文件大小 int wal_size get_file_size(WAL_FILE); if (wal_size WAL_SIZE_THRESHOLD) return 1; return 0; } // Checkpoint后台线程函数 void *checkpoint_worker(void *arg) { hashtable_t *hash (hashtable_t *)arg; time_t last_checkpoint time(NULL);//返回的单位是秒s while (ENABLE_PERSIST) { time_t now time(NULL); //以下两者都是满足条件进行全量持久化并截断WAL日志清空 // 触发条件1定时间隔30秒 if (difftime(now, last_checkpoint) CHECKPOINT_INTERVAL) { kvs_hash_persist(hash); FILE *fpfopen(WAL_FILE,w); if(fpNULL)return NULL; fclose(fp); last_checkpoint now; sleep(1); // 避免频繁触发 continue; } // 触发条件2WAL大小/ KV数量阈值 if (is_checkpoint_needed(hash)) { kvs_hash_persist(hash); FILE *fpfopen(WAL_FILE,w); if(fpNULL)return NULL; fclose(fp); last_checkpoint now; sleep(1); continue; } // 每秒检查一次 sleep(1); } return NULL; }创建线程进行异步全量持久化并在重新运行kvstore时恢复数据加载全量和增量文件int init_persist(void){ //创建线程 pthread_t pid; pthread_create(pid,NULL,checkpoint_worker,(void *)Hash); pthread_detach(pid); //恢复数据加载全量 重放WAL增量日志 kvstore_hash_load(); kvstore_hash_logreplay(); }kvstore解析协议函数增删改操作时先进行增量持久化再进行内存写操作//“CRUD” 对应 Create创建、Read读取、Update更新、Delete删除 四个关键动作 int kvstore_parser_protocol(struct conn_item *item,char **tokens,int count){ if(itemNULL||tokensNULL||count0)return -1; int cmd; for(cmdKVS_CMD_START;cmdKVS_CMD_SIZE;cmd){ if(strcmp(commands[cmd],tokens[0])0){//strcmp当中传的就是地址自动根据地址比较值 break; } } char *msgitem-wbuffer; //memset(msg,0,BUFFER_LENGTH); int lenstrlen(msg); char *keytokens[1]; char *valuetokens[2]; //printf(cmd%d,commands%s\n,cmd,commands[cmd]); switch(cmd){ 。。。。。。 //hash case KVS_CMD_HSET:{ //printf(set\n); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int reskvstore_hash_set(key,value); //需要给客户端回信息 if(res0){ snprintf(msglen,BUFFER_LENGTH-len,SUCCESS\r\n);//指定缓冲区最大容量避免缓冲区溢出 }else{ snprintf(msglen,BUFFER_LENGTH-len,FAILED\r\n); } break; } case KVS_CMD_HGET:{ //printf(get\n); char *valkvstore_hash_get(key); if(val){//value不为空 snprintf(msglen,BUFFER_LENGTH-len,%s\r\n,val); }else{ snprintf(msglen,BUFFER_LENGTH-len,NO EXIST\r\n); } //LOG(get:%s\n,val); break; } case KVS_CMD_HDEL:{ //printf(del\n); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int reskvstore_hash_del(key); if(res0){ snprintf(msglen,BUFFER_LENGTH-len,ERROR\r\n); }else if(res0){ snprintf(msglen,BUFFER_LENGTH-len,SUCCESS\r\n); }else { snprintf(msglen,BUFFER_LENGTH-len,NO EXIST\r\n); } break; } case KVS_CMD_HMOD:{ //printf(mod\n); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int reskvstore_hash_mod(key,value); if(res0){ snprintf(msglen,BUFFER_LENGTH-len,ERROR\r\n); }else if(res0){ snprintf(msglen,BUFFER_LENGTH-len,SUCCESS\r\n); }else { snprintf(msglen,BUFFER_LENGTH-len,NO EXIST\r\n); } break; } case KVS_CMD_HCOUNT:{ int countkvstore_hash_count(); if(count0){ snprintf(msglen,BUFFER_LENGTH-len,ERROR\r\n); }else { snprintf(msglen,BUFFER_LENGTH-len,%d\r\n,count);//%d,count } break; } default:{ //LOG(cmd:%s\n,commands[cmd]); assert(0); } } }三、测试功能上次运行kvstore服务器是已通过协议存入key-value对并进行了持久化1-22-33-4重新运行测试恢复数据情况