This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 044d712ef06 PipeConsensus: Avoid replicate block && Avoid transfer
error when connector is closed. (#13146) (#13169)
044d712ef06 is described below
commit 044d712ef0686277e2d7a7fa7af3140a5abb768c
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Aug 14 18:24:19 2024 +0800
PipeConsensus: Avoid replicate block && Avoid transfer error when connector
is closed. (#13146) (#13169)
---
.../pipeconsensus/PipeConsensusAsyncConnector.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 7520f217df3..85ba921e24d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -167,6 +167,11 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
event.getCommitId(),
event);
}
+ // Special judge to avoid transfer stuck when re-transfer events that will not be put in
+ // retryQueue.
+ if (transferBuffer.contains(event)) {
+ return true;
+ }
long currentTime = System.nanoTime();
boolean result =
transferBuffer.offer(
@@ -203,6 +208,13 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
transferBuffer.size(),
IOTDB_CONFIG.getPipeConsensusPipelineSize());
}
+ if (transferBuffer.isEmpty()) {
+ LOGGER.info(
+ "PipeConsensus-ConsensusGroup-{}: try to remove event-{} after pipeConsensusAsyncConnector being closed. Ignore it.",
+ consensusGroupId,
+ event);
+ return;
+ }
Iterator<EnrichedEvent> iterator = transferBuffer.iterator();
EnrichedEvent current = iterator.next();
while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) {
@@ -458,7 +470,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
polledEvent);
}
}
- if (polledEvent != null && LOGGER.isDebugEnabled()) {
+ if (polledEvent != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Polled event {} from retry queue.", polledEvent);
}