This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 044d712ef06 PipeConsensus: Avoid replicate block && Avoid transfer 
error when connector is closed. (#13146) (#13169)
044d712ef06 is described below

commit 044d712ef0686277e2d7a7fa7af3140a5abb768c
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Aug 14 18:24:19 2024 +0800

    PipeConsensus: Avoid replicate block && Avoid transfer error when connector 
is closed. (#13146) (#13169)
---
 .../pipeconsensus/PipeConsensusAsyncConnector.java         | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 7520f217df3..85ba921e24d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -167,6 +167,11 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
             event.getCommitId(),
             event);
       }
+      // Special judge to avoid transfer stuck when re-transfer events that 
will not be put in
+      // retryQueue.
+      if (transferBuffer.contains(event)) {
+        return true;
+      }
       long currentTime = System.nanoTime();
       boolean result =
           transferBuffer.offer(
@@ -203,6 +208,13 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
           transferBuffer.size(),
           IOTDB_CONFIG.getPipeConsensusPipelineSize());
     }
+    if (transferBuffer.isEmpty()) {
+      LOGGER.info(
+          "PipeConsensus-ConsensusGroup-{}: try to remove event-{} after 
pipeConsensusAsyncConnector being closed. Ignore it.",
+          consensusGroupId,
+          event);
+      return;
+    }
     Iterator<EnrichedEvent> iterator = transferBuffer.iterator();
     EnrichedEvent current = iterator.next();
     while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) {
@@ -458,7 +470,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
                 polledEvent);
           }
         }
-        if (polledEvent != null && LOGGER.isDebugEnabled()) {
+        if (polledEvent != null) {
           if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Polled event {} from retry queue.", polledEvent);
           }

Reply via email to