Background:
In a knowledge-graph visualization system, data has to be imported into both a MySQL database and a Neo4j database. A single import contains 70,000–80,000 records, and new data is uploaded every day.
All code below is a simplified version of the project code, offered as one possible solution.
Upload code:
public boolean uploadGraphData(String jsonString) {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    GraphDataVo graphData;
    try {
        graphData = objectMapper.readValue(jsonString, GraphDataVo.class);
    } catch (JsonProcessingException e) {
        logger.error("Failed to parse JSON", e);
        return false; // JSON parsing failed
    }
    JsonNode jsonNode;
    JsonNode nodesArray;
    try {
        jsonNode = objectMapper.readTree(jsonString);
        nodesArray = jsonNode.get("nodes");
    } catch (IOException e) {
        logger.error("Failed to parse JSON", e);
        return false; // JSON parsing failed
    }
    Set<String> processedTypes = new HashSet<>();
    try (Session session = driver.session()) {
        for (int i = 0; i < graphData.getNodes().size(); i++) {
            NodeVo node = graphData.getNodes().get(i);
            JsonNode nodeJson = nodesArray.get(i);
            try {
                // Insert the node into Neo4j
                String nodeCypher = String.format("MERGE (n:%s {id: $id}) ON CREATE SET n.name = $name, n.attributes = $attributes", node.getType());
                Result nodeResult = session.writeTransaction(tx ->
                        tx.run(nodeCypher, parameters("id", node.getId(), "name", node.getName(), "attributes", node.getAttributes())));
                if (nodeResult.consume().counters().nodesCreated() == 0) {
                    logger.error("Failed to create new node");
                    return false; // node creation failed
                }
                // Check for and register any new node type
                String nodeType = node.getType();
                if (!processedTypes.contains(nodeType)) {
                    if (!nodeTypesService.exists(nodeType)) {
                        nodeTypesService.add(new NodeTypes(nodeType));
                    }
                    processedTypes.add(nodeType);
                }
                // Insert the node into SQL
                Nodes newNode = new Nodes();
                newNode.setId(node.getId());
                newNode.setName(node.getName());
                newNode.setAttributes(node.getAttributes());
                newNode.setType(node.getType());
                // Extract imgUrl from the raw JSON
                String imgUrl = null;
                if (nodeJson.has("imgUrl")) {
                    imgUrl = nodeJson.get("imgUrl").asText();
                }
                newNode.setImgurl(imgUrl);
                nodesService.add(newNode);
            } catch (Exception e) {
                logger.error("Failed to insert node", e);
                return false; // node insertion failed
            }
        }
        for (RelationshipVo relationship : graphData.getRelationships()) {
            try {
                // Insert the relationship into Neo4j
                Result relationshipResult = session.writeTransaction(tx ->
                        tx.run("MATCH (a), (b) WHERE a.id = $sourceId AND b.id = $targetId " +
                                        "MERGE (a)-[r:" + relationship.getType() + " {attributes: $attributes}]->(b)",
                                parameters("sourceId", relationship.getSource(), "targetId", relationship.getTarget(), "attributes", relationship.getAttributes())));
                if (relationshipResult.consume().counters().relationshipsCreated() == 0) {
                    logger.error("Failed to create new relationship");
                    return false; // relationship creation failed
                }
                // Insert the relationship into SQL
                Relationships newRelationship = new Relationships();
                newRelationship.setSource(relationship.getSource());
                newRelationship.setTarget(relationship.getTarget());
                newRelationship.setType(relationship.getType());
                newRelationship.setAttributes(relationship.getAttributes());
                relationshipsService.add(newRelationship);
            } catch (Exception e) {
                logger.error("Failed to insert relationship", e);
                return false; // relationship insertion failed
            }
        }
        return true; // data imported successfully
    } catch (Exception e) {
        logger.error("Database session error", e);
        return false; // database session error
    }
}
As the code shows, everything runs synchronously: a single thread handles the uploads to Neo4j and MySQL one after the other, with no asynchronous programming model or concurrency mechanism. With the data volume described in the background, this quickly runs into performance bottlenecks and related problems.
Optimization: RabbitMQ
A "message queue" is a container that holds messages while they are in transit. A message queue typically involves two roles: producers and consumers. A producer only sends data to the queue; it does not care who takes the data out and processes it. A consumer only takes data from the queue and processes it; it does not care who sent it.
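The decoupling described above can be sketched with an in-memory java.util.concurrent.BlockingQueue standing in for the broker. This is only an illustration of the producer/consumer roles, not RabbitMQ itself; the class and message names are invented for the demo:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // In-memory stand-in for a message broker: producers only put,
        // consumers only take; neither knows about the other.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    queue.put("msg" + i); // producer: send and forget
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    // consumer: take and process, FIFO order
                    System.out.println("processed " + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```

A real broker like RabbitMQ adds persistence, acknowledgements, and cross-process delivery on top of this same put/take contract.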
Producer code:
public class DataUploadProducer {
    // Queue name used to identify the queue in RabbitMQ
    private static final String QUEUE_NAME = "graph_data_queue";
    private final Logger logger = LoggerFactory.getLogger(DataUploadProducer.class);
    @Value("${spring.rabbitmq.host}")
    private String url;

    /**
     * Uploads graph data to the RabbitMQ queue.
     * @param jsonString the graph data to upload, as a JSON string.
     * @return true if the data was successfully sent to the queue.
     */
    public boolean uploadGraphData(String jsonString) {
        // Create a connection factory and set the RabbitMQ server address
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(url); // host injected from the configuration file
        try (// Establish a connection to the RabbitMQ server
             Connection connection = factory.newConnection();
             // Create a new channel
             Channel channel = connection.createChannel()) {
            // Declare the queue; RabbitMQ creates it automatically if it does not exist
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Publish the graph data to the queue
            channel.basicPublish("", QUEUE_NAME, null, jsonString.getBytes());
            // Log that the message was sent successfully
            logger.info("Graph data sent to queue.");
            return true; // data sent successfully
        } catch (Exception e) {
            throw new RuntimeException("Failed to send graph data to queue", e);
        }
    }
}
Consumer code:
public abstract class DataImportConsumer implements Runnable {
    private static final String QUEUE_NAME = "graph_data_queue";
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Value("${spring.rabbitmq.host}")
    private String url;

    @Override
    public void run() {
        try {
            // Create a connection factory and configure the RabbitMQ server address
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(url);
            // Establish a connection to the RabbitMQ server
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue to make sure it exists
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            logger.info("Waiting for messages...");
            // Define the callback invoked for each delivered message
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    // Abstract method; subclasses implement the actual handling
                    processMessage(message);
                } catch (Exception e) {
                    logger.error("Failed to process message", e);
                }
            };
            // Start consuming messages from the queue (auto-ack: each message is
            // acknowledged on delivery, so it is lost if processing fails)
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
        } catch (Exception e) {
            logger.error("Failed to start consumer", e);
        }
    }

    // Abstract method implemented by subclasses with the concrete handling logic
    protected abstract void processMessage(String message) throws Exception;
}
Since the data has to be uploaded to both Neo4j and MySQL, the consumer is defined as an abstract class, and two worker threads are started, one per target database.
Import logic:
public class MySQLDataImportConsumer extends DataImportConsumer {
    private final Logger logger = LoggerFactory.getLogger(MySQLDataImportConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final NodesService nodesService;
    private final RelationshipsService relationshipsService;

    public MySQLDataImportConsumer(NodesService nodesService, RelationshipsService relationshipsService) {
        this.nodesService = nodesService;
        this.relationshipsService = relationshipsService;
    }

    @Override
    protected void processMessage(String message) {
        try {
            GraphDataVo graphData = objectMapper.readValue(message, GraphDataVo.class);
            JsonNode jsonNode = objectMapper.readTree(message);
            JsonNode nodesArray = jsonNode.get("nodes");
            for (int i = 0; i < graphData.getNodes().size(); i++) {
                NodeVo node = graphData.getNodes().get(i);
                JsonNode nodeJson = nodesArray.get(i);
                Nodes newNode = new Nodes();
                newNode.setId(node.getId());
                newNode.setName(node.getName());
                newNode.setAttributes(node.getAttributes());
                newNode.setType(node.getType());
                String imgUrl = null;
                if (nodeJson.has("imgUrl")) {
                    imgUrl = nodeJson.get("imgUrl").asText();
                }
                newNode.setImgurl(imgUrl);
                nodesService.add(newNode);
            }
            for (RelationshipVo relationship : graphData.getRelationships()) {
                Relationships newRelationship = new Relationships();
                newRelationship.setSource(relationship.getSource());
                newRelationship.setTarget(relationship.getTarget());
                newRelationship.setType(relationship.getType());
                newRelationship.setAttributes(relationship.getAttributes());
                relationshipsService.add(newRelationship);
            }
        } catch (Exception e) {
            logger.error("Failed to import data into MySQL", e);
        }
    }
}
public class Neo4jDataImportConsumer extends DataImportConsumer {
    private final Logger logger = LoggerFactory.getLogger(Neo4jDataImportConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Driver driver; // assumes a Neo4j driver instance is already available

    public Neo4jDataImportConsumer(Driver driver) {
        this.driver = driver;
    }

    @Override
    protected void processMessage(String message) {
        try (Session session = driver.session()) {
            GraphDataVo graphData = objectMapper.readValue(message, GraphDataVo.class);
            for (NodeVo node : graphData.getNodes()) {
                String nodeCypher = String.format("MERGE (n:%s {id: $id}) ON CREATE SET n.name = $name, n.attributes = $attributes", node.getType());
                session.writeTransaction(tx -> tx.run(nodeCypher, parameters("id", node.getId(), "name", node.getName(), "attributes", node.getAttributes())));
            }
            for (RelationshipVo relationship : graphData.getRelationships()) {
                session.writeTransaction(tx -> tx.run("MATCH (a), (b) WHERE a.id = $sourceId AND b.id = $targetId MERGE (a)-[r:" + relationship.getType() + " {attributes: $attributes}]->(b)", parameters("sourceId", relationship.getSource(), "targetId", relationship.getTarget(), "attributes", relationship.getAttributes())));
            }
        } catch (Exception e) {
            logger.error("Failed to import data into Neo4j", e);
        }
    }
}
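Since each consumer implements Runnable, the two of them can be started on their own threads, for example via a fixed-size thread pool. The sketch below uses CountDownLatch stand-ins for the real consumers (which need a live RabbitMQ broker); in the project, the two Runnables passed to start() would be MySQLDataImportConsumer and Neo4jDataImportConsumer:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerBootstrap {
    // Starts each consumer on its own thread; in the real project the two
    // Runnables would be MySQLDataImportConsumer and Neo4jDataImportConsumer.
    public static ExecutorService start(Runnable mysqlConsumer, Runnable neo4jConsumer) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(mysqlConsumer);
        pool.submit(neo4jConsumer);
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for the real consumers, which require a running broker.
        CountDownLatch latch = new CountDownLatch(2);
        ExecutorService pool = start(latch::countDown, latch::countDown);
        boolean bothRan = latch.await(1, TimeUnit.SECONDS);
        System.out.println(bothRan);
        pool.shutdown();
    }
}
```

Because each consumer subscribes to the same queue, RabbitMQ would deliver each message to only one of them; in this design each database gets its own consumer, so in practice the project would use two separate queues (or a fanout exchange) so that both consumers receive every upload.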
Summary:
Uploading data asynchronously through RabbitMQ removes the performance bottleneck of large uploads and provides a useful pattern. There is still room for further optimization: if the data volume on a given day is especially large, the data can first be placed on the queue, and the consumers can then write it to the databases in batches.
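The batching idea can be sketched with a small buffer that collects incoming messages and hands them off in fixed-size batches. BatchBuffer is a hypothetical helper, not project code; the flushAction callback stands in for whatever bulk write the consumer performs (e.g. one Neo4j transaction per batch, or a JDBC batch insert):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Accumulates messages and hands them to a flush action in fixed-size batches. */
public class BatchBuffer {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> flushAction;

    public BatchBuffer(int batchSize, Consumer<List<String>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    /** Called from the DeliverCallback for each incoming message. */
    public synchronized void add(String message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes the pending batch, e.g. one transaction or one batch insert. */
    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer buf = new BatchBuffer(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            buf.add("msg" + i);
        }
        buf.flush(); // drain the remainder
        System.out.println(batchSizes);
    }
}
```

With 70,000–80,000 records per upload, writing one batch per transaction instead of one record per transaction cuts the number of round trips to each database by the batch-size factor; in production a time-based flush (flush after N messages or T seconds, whichever comes first) would also bound latency.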