Rabbitmq优化数据上传

背景:

在知识图谱可视化的系统中需要将数据同时导入到MySQL数据库和Neo4j数据库中,一次的数据就会有7-8万条,并且每天都会有新的数据上传。

下面的所有代码均为项目中代码的简化版本,提供一个解决方案。

上传代码

public boolean uploadGraphData(String jsonString) {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        GraphDataVo graphData;

        try {
            graphData = objectMapper.readValue(jsonString, GraphDataVo.class);
        } catch (JsonProcessingException e) {
            logger.error("JSON 解析失败", e);
            return false; // JSON解析失败
        }

        JsonNode jsonNode;
        JsonNode nodesArray;
        try {
            jsonNode = objectMapper.readTree(jsonString);
            nodesArray = jsonNode.get("nodes");
        } catch (IOException e) {
            logger.error("JSON 解析失败", e);
            return false; // JSON解析失败
        }

        Set<String> processedTypes = new HashSet<>();

        try (Session session = driver.session()) {
            for (int i = 0; i < graphData.getNodes().size(); i++) {
                NodeVo node = graphData.getNodes().get(i);
                JsonNode nodeJson = nodesArray.get(i);
                try {
                    // 插入节点到 Neo4j
                    String nodeCypher = String.format("MERGE (n:%s {id: $id}) ON CREATE SET n.name = $name, n.attributes = $attributes", node.getType());
                    Result nodeResult = session.writeTransaction(tx ->
                            tx.run(nodeCypher, parameters("id", node.getId(), "name", node.getName(), "attributes", node.getAttributes())));
                    if (nodeResult.consume().counters().nodesCreated() == 0) {
                        logger.error("创建新节点失败");
                        return false; // 创建新节点失败
                    }

                    // 检查并插入新的 node type
                    String nodeType = node.getType();
                    if (!processedTypes.contains(nodeType)) {
                        if (!nodeTypesService.exists(nodeType)) {
                            nodeTypesService.add(new NodeTypes(nodeType));
                        }
                        processedTypes.add(nodeType);
                    }

                    // 插入节点到 SQL
                    Nodes newNode = new Nodes();
                    newNode.setId(node.getId());
                    newNode.setName(node.getName());
                    newNode.setAttributes(node.getAttributes());
                    newNode.setType(node.getType());

                    // 从 JSON 提取 imgUrl
                    String imgUrl = null;
                    if (nodeJson.has("imgUrl")) {
                        imgUrl = nodeJson.get("imgUrl").asText();
                    }
                    newNode.setImgurl(imgUrl);

                    nodesService.add(newNode);

                } catch (Exception e) {
                    logger.error("插入节点失败", e);
                    return false; // 插入节点失败
                }
            }

            for (RelationshipVo relationship : graphData.getRelationships()) {
                try {
                    // 插入关系到 Neo4j
                    Result relationshipResult = session.writeTransaction(tx ->
                            tx.run("MATCH (a), (b) WHERE a.id = $sourceId AND b.id = $targetId " +
                                            "MERGE (a)-[r:" + relationship.getType() + " {attributes: $attributes}]->(b)",
                                    parameters("sourceId", relationship.getSource(), "targetId", relationship.getTarget(), "attributes", relationship.getAttributes())));
                    if (relationshipResult.consume().counters().relationshipsCreated() == 0) {
                        logger.error("创建新关系失败");
                        return false; // 创建新关系失败
                    }

                    // 插入关系到 SQL
                    Relationships newRelationship = new Relationships();
                    newRelationship.setSource(relationship.getSource());
                    newRelationship.setTarget(relationship.getTarget());
                    newRelationship.setType(relationship.getType());
                    newRelationship.setAttributes(relationship.getAttributes());
                    relationshipsService.add(newRelationship);

                } catch (Exception e) {
                    logger.error("插入关系失败", e);
                    return false; // 插入关系失败
                }
            }

            return true; // 数据导入成功
        } catch (Exception e) {
            logger.error("数据库会话出错", e);
            return false; // 数据库会话出错
        }
    }

可以看出这段代码是同步执行的,并且在一个单一的线程中依次处理上传到Neo4j和MySQL数据库的逻辑,没有使用任何异步编程模式或并发执行的机制,像背景中的数据量就很容易会遇到性能瓶颈等一系列问题。


优化-RabbitMQ

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

生产者代码:

public class DataUploadProducer {
    // 队列名称,用于在RabbitMQ中标识队列
    private static final String QUEUE_NAME = "graph_data_queue";

    private final Logger logger = LoggerFactory.getLogger(DataUploadProducer.class);

    @Value("${spring.rabbitmq.host}")
    private String url;

    /**
     * 将图形数据上传到RabbitMQ队列的方法。
     * @param jsonString 需要上传的图形数据,以JSON字符串格式传入。
     * @return 返回一个布尔值,表示数据是否成功发送到队列。
     */
    public boolean uploadGraphData(String jsonString) {
        // 创建连接工厂,设置RabbitMQ服务器的地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(url); // 使用从配置文件注入的URL配置连接工厂

        try (// 尝试与RabbitMQ服务器建立连接
             Connection connection = factory.newConnection();
             // 创建一个新的通道
             Channel channel = connection.createChannel()) {
            // 声明一个队列,如果队列不存在,则RabbitMQ会自动创建
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 将图形数据作为消息发送到队列
            channel.basicPublish("", QUEUE_NAME, null, jsonString.getBytes());
            // 记录日志,表示消息已成功发送到队列
            logger.info("Graph data sent to queue.");
            return true; // 返回true表示数据成功发送
        } catch (Exception e) {

            throw new RuntimeException("Failed to send graph data to queue", e);
        }
    }
}

消费者:


public abstract class DataImportConsumer implements Runnable {
 
    private static final String QUEUE_NAME = "graph_data_queue";
   
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final ObjectMapper objectMapper = new ObjectMapper();


    @Value("${spring.rabbitmq.host}")
    private String url;

    @Override
    public void run() {
        try {
            // 创建连接工厂并配置RabbitMQ服务器地址
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(url);
            // 建立到RabbitMQ服务器的连接
            Connection connection = factory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();

            // 声明队列,确保队列存在
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            logger.info("Waiting for messages...");

            // 定义消息接收的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    // 抽象方法,子类需要实现具体的消息处理逻辑
                    processMessage(message);
                } catch (Exception e) {
                    logger.error("Failed to process message", e);
                }
            };
            // 开始消费队列中的消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
        } catch (Exception e) {
            logger.error("Failed to start consumer", e);
        }
    }

    // 抽象方法,用于在子类中实现具体的消息处理逻辑
    protected abstract void processMessage(String message) throws Exception;
}

由于需要上传数据到Neo4j和MySQL中,所以定义抽象类,并且开启两个线程。

上传逻辑:

public class MySQLDataImportConsumer extends DataImportConsumer {
    private final Logger logger = LoggerFactory.getLogger(MySQLDataImportConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final NodesService nodesService; 
    private final RelationshipsService relationshipsService;

    public MySQLDataImportConsumer(NodesService nodesService, RelationshipsService relationshipsService) {
        this.nodesService = nodesService;
        this.relationshipsService = relationshipsService;
    }

    @Override
    protected void processMessage(String message) {
        try {
            GraphDataVo graphData = objectMapper.readValue(message, GraphDataVo.class);
            JsonNode jsonNode = objectMapper.readTree(message);
            JsonNode nodesArray = jsonNode.get("nodes");

            for (int i = 0; i < graphData.getNodes().size(); i++) {
                NodeVo node = graphData.getNodes().get(i);
                JsonNode nodeJson = nodesArray.get(i);

                Nodes newNode = new Nodes();
                newNode.setId(node.getId());
                newNode.setName(node.getName());
                newNode.setAttributes(node.getAttributes());
                newNode.setType(node.getType());

                String imgUrl = null;
                if (nodeJson.has("imgUrl")) {
                    imgUrl = nodeJson.get("imgUrl").asText();
                }
                newNode.setImgurl(imgUrl);

                nodesService.add(newNode);
            }

            for (RelationshipVo relationship : graphData.getRelationships()) {
                Relationships newRelationship = new Relationships();
                newRelationship.setSource(relationship.getSource());
                newRelationship.setTarget(relationship.getTarget());
                newRelationship.setType(relationship.getType());
                newRelationship.setAttributes(relationship.getAttributes());
                relationshipsService.add(newRelationship);
            }
        } catch (Exception e) {
            logger.error("Failed to import data into MySQL", e);
        }
    }
}
public class Neo4jDataImportConsumer extends DataImportConsumer {
    private final Logger logger = LoggerFactory.getLogger(Neo4jDataImportConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Driver driver; // 假设你已经有了Neo4j驱动的实例

    public Neo4jDataImportConsumer(Driver driver) {
        this.driver = driver;
    }

    @Override
    protected void processMessage(String message) {
        try (Session session = driver.session()) {
            GraphDataVo graphData = objectMapper.readValue(message, GraphDataVo.class);
            for (NodeVo node : graphData.getNodes()) {
                String nodeCypher = String.format("MERGE (n:%s {id: $id}) ON CREATE SET n.name = $name, n.attributes = $attributes", node.getType());
                session.writeTransaction(tx -> tx.run(nodeCypher, parameters("id", node.getId(), "name", node.getName(), "attributes", node.getAttributes())));
            }

            for (RelationshipVo relationship : graphData.getRelationships()) {
                session.writeTransaction(tx -> tx.run("MATCH (a), (b) WHERE a.id = $sourceId AND b.id = $targetId MERGE (a)-[r:" + relationship.getType() + " {attributes: $attributes}]->(b)", parameters("sourceId", relationship.getSource(), "targetId", relationship.getTarget(), "attributes", relationship.getAttributes())));
            }
        } catch (Exception e) {
            logger.error("Failed to import data into Neo4j", e);
        }
    }
}

总结:

使用RabbitMQ异步的上传数据解决了大数据量上传的性能瓶颈等问题,提供了一个不错的思路,其实还可以接着优化,如果某一天的数据量特别的大,可以先将数据加入到队列中,消费者可以分批次的从队列中添加数据到库中。

消息盒子
# 您需要首次评论以获取消息 #
# 您需要首次评论以获取消息 #

只显示最新10条未读和已读信息