通过应用监控redis主从切换确保数据一致性

redis服务提供了两种方式保证数据不丢失:

  1. rdb持久化,通过将Redis在内存中的数据集快照保存到磁盘上来实现redis重启数据不丢失,开启方式:
1小时有1条修改命令 5分钟有100条修改命令 1分钟有10000条修改命令
save 3600 1 300 100 60 10000

数据文件目录
dir /home/redis-7.2.1/datas/

rdb文件名
dbfilename dump.rdb

是否对RDB文件压缩,开启后会使用LZF压缩
rdbcompression yes

通过应用监控redis主从切换确保数据一致性

这种持久化方式有可能导致数据丢失,一般在生产环境不配置这种持久化,客户端可以通过向Redis服务器发送save或bgsave命令让服务器生成rdb文件,主从复制时如果从节点比主节点落后数据非常多时会使用rdb方式更新数据,由于它是数据快照方式持久化,所以恢复数据性能非常好。

  1. aof持久化,通过保存服务器收到的每一个写操作命令到文件来进行数据持久化的。当Redis重启时,可以通过重新执行这些命令来恢复数据到内存中。
开启持久化
appendonly yes

append追加文件名
appendfilename "appendonly.aof"

文件目录
appenddirname "appendonlydir"

追加模式支持3种:always每条命令都保存、everysec每分钟保存一次、no不指定让操作系统决定持久化时间
appendfsync always
appendfsync everysec
appendfsync no

自动触发aof文件重写条件
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

如果同时开启rdb和aof持久化方式,redis重启时优先使用aof持久化文件,因为它可以确保丢失更少的数据,随着处理命令的增加,可以通过手动执行bgrewriteaof重写aof文件,避免文件增大过快。
上面介绍了两种持久化数据的方式,尽管aof方式提供了 always 模式保证数据一条不丢失,但一般在生产中都是集群部署,而redis在主从复制时是异步的,这就导致在主从切换时存在数据丢失的风险,如果程序使用redis时不允许数据丢失,那就需要在代码中保证数据一致性,而前面介绍的 通过redis实现高性能扣费 就是不允许数据丢失的使用场景,下面介绍一下我是如何确保主从切换时的数据一致性的。
要确保数据不丢失,就需要程序能够感知到redis主从切换,当redis产生了主从切换,通过加载本地已经持久化的数据补偿丢失的数据。可以通过本地启动一个定时任务,定时监控redis集群节点信息,发现master节点改变就重新加载本地数据:
要感知redis集群节点角色改变,我们要存储节点的角色信息,然后通过定时任务不断的获取集群当前角色与存储的历史角色比较,不一致就发生了变更。
首先定义一个数据结构保存节点信息:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * redis集群节点数据
 *
 * @Author xingo
 * @Date 2024/9/18
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ClusterNode {

    /**
     * 集群是否为主节点
     */
    private boolean master;

    /**
     * 节点索引值
     */
    private short idx;

    /**
     * 节点主机和端口
     */
    private String hostAndPort;
}

再定义一个服务类处理节点数据,节点的历史状态保存到一个哈希集合中:

import io.lettuce.core.cluster.SlotHash;
import org.springframework.stereotype.Service;
import org.xingo.entity.ClusterNode;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * redis集群节点数据
 *
 * @Author xingo
 * @Date 2024/9/18
 */
@Service
public class RedisNodeSlotService {

    /**
     * 节点信息
     */
    private Map<String, ClusterNode> nodeMap = new HashMap<>();
    /**
     * hash槽信息
     */
    private Map<Integer, Short> slotMap = new HashMap<>();

    /**
     * 增加节点信息
     * @param hostAndPort
     * @param node
     */
    public void addNode(String hostAndPort, ClusterNode node) {
        nodeMap.put(hostAndPort, node);
    }

    /**
     * 获取节点信息
     * @param hostAndPort
     * @return
     */
    public ClusterNode getNode(String hostAndPort) {
        return nodeMap.get(hostAndPort);
    }

    /**
     * 返回节点集合
     * @return
     */
    public Collection<ClusterNode> allNodes() {
        return Collections.unmodifiableCollection(nodeMap.values());
    }

    /**
     * 清空节点信息集合
     */
    public void clearNodes() {
        nodeMap.clear();
    }

    /**
     * 获取节点的下一个索引值
     * @return
     */
    public short nextIdx() {
        short maxIdx = 0;
        if(nodeMap.isEmpty()) {
            maxIdx = 0;
        } else {
            for (ClusterNode node : nodeMap.values()) {
                if(node.getIdx() > maxIdx) {
                    maxIdx = node.getIdx();
                }
            }
        }
        maxIdx += 1;
        return maxIdx;
    }

    /**
     * 添加一个hash槽所在的节点索引值
     * @param slot
     * @param idx
     */
    public void addSlot(Integer slot, Short idx) {
        slotMap.put(slot, idx);
    }

    /**
     * 获取某个hash槽所在的节点索引值
     * @param slot
     */
    public void getSlotNodeIdx(Integer slot) {
        slotMap.get(slot);
    }

    /**
     * 获取键的hash槽
     * @param key
     * @return
     */
    public Integer getKeySlot(String key) {
        if(key == null || "".equals(key.trim())) {
            return null;
        }

        return SlotHash.getSlot(key);
    }

    /**
     * 获取键的节点
     * @param key
     * @return
     */
    public Short getKeyNode(String key) {
        Integer slot = getKeySlot(key);
        return slot != null ? slotMap.get(slot) : null;
    }
}

上面分析时已经阐述了要通过一个定时任务不断监听集群的当前状态,通过比较历史状态是否一致来判断是否发生变化:

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.xingo.common.JacksonUtils;
import org.xingo.entity.ClusterNode;
import org.xingo.front.service.impl.LoadDataService;
import org.xingo.front.service.impl.RedisNodeSlotService;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;

/**
 * @Author xingo
 * @Date 2024/9/18
 */
@Slf4j
@Component
@EnableScheduling
public class CheckRedisNodeSlotJob {

    @Autowired
    private RedisNodeSlotService redisNodeSlotService;
    @Autowired
    private LoadDataService loadDataService;
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Scheduled(cron="0/5 * * * * ?")
    public void run() {
//        JedisConnectionFactory
        LettuceConnectionFactory factory = (LettuceConnectionFactory) redisTemplate.getConnectionFactory();

        if(factory.getClusterConfiguration() != null) {     // 集群模式
            RedisClusterConnection conn = factory.getClusterConnection();
            Iterable<RedisClusterNode> nodes = conn.clusterGetNodes();
            RedisConnectionUtils.releaseConnection(conn, factory);
            Iterator<RedisClusterNode> iterator = nodes.iterator();
            boolean changeStatus = false;
            while (iterator.hasNext()) {
                RedisClusterNode next = iterator.next();
                String hostAndPort = next.getHost() + ":" + next.getPort();

                // redis节点集合
                if (redisNodeSlotService.getNode(hostAndPort) == null) {
                    ClusterNode clusterNode = ClusterNode.builder().master(next.isMaster()).hostAndPort(hostAndPort).idx(redisNodeSlotService.nextIdx()).build();
                    redisNodeSlotService.addNode(hostAndPort, clusterNode);
                }
                ClusterNode node = redisNodeSlotService.getNode(hostAndPort);
                // 从节点升级为主节点
                if(next.isMaster() && !node.isMaster()) {
                    log.error("redis集群发生主从切换|{}|{}|{}", hostAndPort, node.isMaster(), next.isMaster());
                    changeStatus = true;
                }
            }
            if(changeStatus) {
                // redis集群状态发生变化通知所有应用变更本地缓存的redis节点状态并且检查最近一段时间的数据
                this.init();
                loadDataService.checkCache();
            }
        } else if(factory.getSentinelConfiguration() != null) {     // 哨兵模式
            RedisSentinelConnection conn = factory.getSentinelConnection();
            Collection<RedisServer> masters = conn.masters();
            try {
                conn.close();
            } catch (IOException e) {
                log.error("关闭连接异常", e);
            }
            for (RedisServer master : masters) {
                String hostAndPort = master.getHost() + ":" + master.getPort();
                if(redisNodeSlotService.getNode(hostAndPort) == null) {
                    // redis主节点发生变化通知所有应用变更本地缓存的redis节点状态并且检查最近一段时间的数据
                    log.error("redis哨兵主节点切换|{}|{}", JacksonUtils.toJSONString(redisNodeSlotService.allNodes()), hostAndPort);
                    this.init();
                    loadDataService.checkCache();
                }
            }
        } else {    // 单机模式
            System.out.println("======== 单机模式 ========");
        }
    }

    /**
     * 服务启动时初始化集群信息
     */
    @PostConstruct
    public void init() {
        LettuceConnectionFactory factory = (LettuceConnectionFactory) redisTemplate.getConnectionFactory();

        if(factory.getClusterConfiguration() != null) {     // 集群模式
            RedisClusterConnection conn = factory.getClusterConnection();
            Iterable<RedisClusterNode> nodes = conn.clusterGetNodes();
            RedisConnectionUtils.releaseConnection(conn, factory);
            Iterator<RedisClusterNode> iterator = nodes.iterator();
            while (iterator.hasNext()) {
                RedisClusterNode next = iterator.next();
                String hostAndPort = next.getHost() + ":" + next.getPort();

                // redis节点集合
                if (redisNodeSlotService.getNode(hostAndPort) == null) {
                    ClusterNode clusterNode = ClusterNode.builder().master(next.isMaster()).hostAndPort(hostAndPort).idx(redisNodeSlotService.nextIdx()).build();
                    redisNodeSlotService.addNode(hostAndPort, clusterNode);
                } else {
                    redisNodeSlotService.getNode(hostAndPort).setMaster(next.isMaster());
                }
                short idx = redisNodeSlotService.getNode(hostAndPort).getIdx();

                // redis槽集合
                RedisClusterNode.SlotRange slotRange = next.getSlotRange();
                Set<Integer> slots = slotRange.getSlots();
                if (!slots.isEmpty()) {
                    for (Integer slot : slots) {
                        redisNodeSlotService.addSlot(slot, idx);
                    }
                }
            }
        } else if(factory.getSentinelConfiguration() != null) {     // 哨兵模式
            redisNodeSlotService.clearNodes();
            RedisSentinelConnection conn = factory.getSentinelConnection();
            Collection<RedisServer> masters = conn.masters();
            try {
                conn.close();
            } catch (IOException e) {
                log.error("关闭连接异常", e);
            }
            for (RedisServer master : masters) {
                String hostAndPort = master.getHost() + ":" + master.getPort();
                ClusterNode clusterNode = ClusterNode.builder().master(master.isMaster()).hostAndPort(hostAndPort).idx(redisNodeSlotService.nextIdx()).build();
                redisNodeSlotService.addNode(hostAndPort, clusterNode);
            }
        } else {    // 单机模式
            System.out.println("======== 单机模式 ========");
        }
    }
}

截止到当前,监听redis集群状态的代码已经全部完成,接下来就是当监听到集群状态变化时的处理逻辑,我们这里模拟的是当集群状态改变时重新加载本地数据到redis确保数据不丢失。业务数据采用的是日志先行WAL(Write-Ahead Logging)方式确保数据不丢失,所以也可以通过这个日志恢复数据到redis。还是用前面的扣费逻辑数据举例:

2024-09-19 10:30:52.542|9|1836593772545314816|100|1726713052295

规定使用“|”分隔日志内容,第一列是时间戳、第二列是用户ID、第三列是订单ID、第四列是扣费金额、第五列是扣费时间戳,日志文件名是deduct.log,通过解析这个日志文件拆分字段,比较订单ID是否存在,把那些不在redis中的数据重新加载到redis:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.xingo.common.RedisKeyUtils;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

/**
 * 加载最近时间失败数据
 *
 * @Author xingo
 * @Date 2024/9/19
 */
@Slf4j
@Service
public class LoadDataService {

    /**
     * 日志文件路径
     */
    @Value("${log-path}")
    private String logPath;
    @Autowired
    private StringRedisTemplate redisTemplate;
    /**
     * 一行数据可能的最大长度
     */
    private int lineSize = 88;

    /**
     * 检查缓存数据是否有差异
     */
    @Async
    public void checkCache() {
        log.info("开始检查数据差异|{}", new Timestamp(System.currentTimeMillis()));

        // 查找数据:二分法查找
        // 将查找到的订单与缓存中的订单进行比较,如果订单没有在缓存中,表示当前的订单在redis集群主从切换时还未同步到从节点
        // 但是这个订单已经存入数据库;这样就导致了缓存数据与数据库数据无法达到最终一致性,需要把这部分数据再次加载到缓存中保持数据的一致性
        File file = null;
        if(logPath.endsWith(File.separator)) {
            file = new File(logPath + "deduct.log");
        } else {
            file = new File(logPath + File.separator + "deduct.log");
        }
        RandomAccessFile raf = null;
        try {
            raf = new RandomAccessFile(file, "rw");
            long start = 0L;
            long end = file.length();
            long point = 0L;
            String line = null;
            // 2024-09-19 10:30:52.542|9|1836593772545314816|100|1726713052295
            boolean lackKey = false;
            while (true) {
                if(end - start < 60) {
                    break;
                }
                point = (start + end) / 2;
                raf.seek(Math.max(0, point - lineSize));
                line = raf.readLine();
                if (line == null || line.trim().isEmpty()) {
                    break;
                }
                String[] arr = line.split("\\|");
                if (arr.length != 5) {
                    line = raf.readLine();
                    if (line == null || line.trim().isEmpty()) {
                        break;
                    }
                    arr = line.split("\\|");
                }

                int userId = Integer.parseInt(arr[1]);
                long orderId = Long.parseLong(arr[2]);
                String orderKey = RedisKeyUtils.userOrderKey(userId, orderId);
                Boolean hasKey = redisTemplate.hasKey(orderKey);
                if (hasKey != null && hasKey) {
                    start = point;
                } else {
                    lackKey = true;
                    break;
                }
            }

            if (lackKey) {
                raf.seek(start);
                line = raf.readLine();
                String[] arr = line.split("\\|");
                if (arr.length != 5) {
                    line = raf.readLine();
                }
                while (line != null && !line.trim().isEmpty()) {
                    arr = line.split("\\|");
                    int userId = Integer.parseInt(arr[1]);
                    long orderId = Long.parseLong(arr[2]);
                    String orderKey = RedisKeyUtils.userOrderKey(userId, orderId);
                    Boolean hasKey = redisTemplate.hasKey(orderKey);
                    if(hasKey == null || !hasKey) {
                        String key = RedisKeyUtils.userBalanceKey(userId);
                        String key2 = RedisKeyUtils.userOrderZsetKey(userId);
                        List<String> keys = Arrays.asList(key, orderKey, key2);
                        String execute = redisTemplate.execute(LuaScriptUtils.ASYNC_DEDUCT_SCRIPT, keys,
                                new BigDecimal(arr[3]).multiply(BigDecimal.valueOf(100)).intValue() + "", arr[4], arr[2]);
                        log.info("redis丢失数据重新载入|{}|{}|{}|{}|{}", userId, orderId, arr[3], arr[4], execute);
                    }
                    line = raf.readLine();
                }
            }
        } catch (Exception e) {
            log.error("读取文件异常", e);
        } finally {
            try {
                if(raf != null) {
                    raf.close();
                }
            } catch (IOException e) {
                log.error("关闭文件异常", e);
            }
        }
        log.info("检查数据差异完成|{}", new Timestamp(System.currentTimeMillis()));
    }
}

上面的讲解就是通过业务代码确保数据不丢失的逻辑,这种补偿数据方式可以确保大部分情况下redis数据不丢失,除非一种情况发生:redis发生了主从切换、并且刚好在这个时候应用服务宕机且日志文件丢失无法加载,但是这种情况发生的概率非常低,基本上可以忽略,所以通过业务系统可以保证redis数据一致性。
但是还有一种情况会导致数据处理异常,比如某个订单在还未恢复到redis时又重新发起了扣费请求,这种情况就有可能导致重复扣费,这种情况也好处理,我们在恢复数据时比较一下扣费时间,如果在redis中存在了相同订单ID的扣费时间比恢复数据的时间要晚,则表示这个订单在恢复数据的过程中又发起了请求,相当于两次扣费,只需要把其中一笔扣费回滚即可。

版权声明:如无特殊标注,文章均来自网络,本站编辑整理,转载时请以链接形式注明文章出处,请自行分辨。

本文链接:https://www.shbk5.com/dnsj/72079.html