This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ed75066c119 PipeConsensus: Avoid pipe task being restarted frequently
by Pipe framework (#12931)
ed75066c119 is described below
commit ed75066c119926ea9cee0e126328af00fea8808a
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jul 15 07:39:33 2024 -0500
PipeConsensus: Avoid pipe task being restarted frequently by Pipe framework
(#12931)
---
.../protocol/pipeconsensus/PipeConsensusAsyncConnector.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 6088ed8498b..a61156a98ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
@@ -56,7 +57,6 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,7 +233,8 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
boolean enqueueResult = addEvent2Buffer((EnrichedEvent)
tabletInsertionEvent);
if (!enqueueResult) {
- throw new PipeException(ENQUEUE_EXCEPTION_MSG);
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+ ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
}
// batch transfer tablets.
if (isTabletBatchModeEnabled) {
@@ -328,7 +329,8 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
boolean enqueueResult = addEvent2Buffer((EnrichedEvent)
tsFileInsertionEvent);
if (!enqueueResult) {
- throw new PipeException(ENQUEUE_EXCEPTION_MSG);
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+ ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
}
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;