This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ed75066c119 PipeConsensus: Avoid pipe task being restarted frequently by Pipe framework (#12931)
ed75066c119 is described below

commit ed75066c119926ea9cee0e126328af00fea8808a
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jul 15 07:39:33 2024 -0500

    PipeConsensus: Avoid pipe task being restarted frequently by Pipe framework (#12931)
---
 .../protocol/pipeconsensus/PipeConsensusAsyncConnector.java       | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 6088ed8498b..a61156a98ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
@@ -56,7 +57,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,7 +233,8 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
 
     boolean enqueueResult = addEvent2Buffer((EnrichedEvent) tabletInsertionEvent);
     if (!enqueueResult) {
-      throw new PipeException(ENQUEUE_EXCEPTION_MSG);
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+          ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
     }
     // batch transfer tablets.
     if (isTabletBatchModeEnabled) {
@@ -328,7 +329,8 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
 
     boolean enqueueResult = addEvent2Buffer((EnrichedEvent) tsFileInsertionEvent);
     if (!enqueueResult) {
-      throw new PipeException(ENQUEUE_EXCEPTION_MSG);
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+          ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
     }
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;

Reply via email to