This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2471938c505 Pipe: Fixed the semantic of reporting interval && Trimmed
the "toString" of InsertMultiTabletsStatement (#17044)
2471938c505 is described below
commit 2471938c5056a2ef2ba1743679935a4520134050
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 20 16:35:00 2026 +0800
Pipe: Fixed the semantic of reporting interval && Trimmed the "toString" of
InsertMultiTabletsStatement (#17044)
* log
* bz
* partial
* ex-t
* fix
* del
* refactor
* suppress
* sonar
---
.../pipe/agent/task/PipeConfigNodeSubtask.java | 27 +----------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 6 ++-
.../subtask/processor/PipeProcessorSubtask.java | 8 ++--
.../agent/task/subtask/sink/PipeSinkSubtask.java | 40 +++--------------
.../protocol/thrift/IoTDBDataNodeReceiver.java | 2 +-
.../sink/protocol/writeback/WriteBackSink.java | 2 +-
...istoricalDataRegionTsFileAndDeletionSource.java | 3 +-
.../queryengine/execution/QueryStateMachine.java | 2 +-
.../execution/executor/RegionReadExecutor.java | 4 +-
.../fragment/FragmentInstanceContext.java | 2 +-
.../execution/schedule/AbstractDriverThread.java | 4 +-
.../queryengine/plan/execution/QueryExecution.java | 2 +-
.../crud/InsertMultiTabletsStatement.java | 6 ++-
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 14 ++----
.../iotdb/commons/auth/entity/TablePrivilege.java | 3 ++
...imeSinkNonReportTimeConfigurableException.java} | 40 +++++++++++++----
...RuntimeSinkRetryTimesConfigurableException.java | 27 ++++++++++-
.../task/subtask/PipeAbstractSinkSubtask.java | 52 ++++++++++++++++++++++
.../pipe/config/constant/SystemConstant.java | 2 +-
.../pipe/receiver/PipeReceiverStatusHandler.java | 36 +++++----------
.../ErrorHandlingCommonUtils.java} | 16 ++++---
21 files changed, 172 insertions(+), 126 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index e922c23321a..a79da86968f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager.pipe.agent.task;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
@@ -41,7 +40,6 @@ import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,30 +189,8 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
- } catch (final PipeNonReportException e) {
- sleep4NonReportException();
- } catch (final PipeException e) {
- setLastExceptionEvent(event);
- if (!isClosed.get()) {
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because pipe is dropped.",
- e.getClass().getSimpleName(),
- e);
- clearReferenceCountAndReleaseLastEvent(event);
- }
} catch (final Exception e) {
- setLastExceptionEvent(event);
- if (!isClosed.get()) {
- throw new PipeException(
- String.format(
- "Exception in pipe transfer, subtask: %s, last event: %s", taskID, lastEvent),
- e);
- } else {
- LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e);
- clearReferenceCountAndReleaseLastEvent(event);
- }
+ handleException(event, e);
}
return true;
@@ -259,6 +235,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException
exception) {
+ lastExceptionTime = Long.MAX_VALUE;
PipeConfigNodeAgent.runtime().report(event, exception);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 438dfa3f233..44f02bbd4bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -778,7 +778,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source is not history, we do not need to allocate memory
boolean isExtractorHistory =
sourceParameters.getBooleanOrDefault(
- SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+ SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -862,7 +863,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source is history enable, we need to transfer tsfile
boolean needTransferTsFile =
sourceParameters.getBooleanOrDefault(
- SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+ SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index b481117c5e0..6f4dba7a68a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -39,7 +40,6 @@ import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -256,7 +256,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
lastEvent instanceof EnrichedEvent
? ((EnrichedEvent) lastEvent).coreReportMessage()
: lastEvent,
- ErrorHandlingUtils.getRootCause(e).getMessage()),
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info(
@@ -293,7 +293,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
LOGGER.info(
"Exception occurred when closing pipe processor subtask {}, root cause: {}",
taskID,
- ErrorHandlingUtils.getRootCause(e).getMessage(),
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
// should be called after pipeProcessor.close()
@@ -337,7 +337,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
@Override
protected String getRootCause(final Throwable throwable) {
- return ErrorHandlingUtils.getRootCause(throwable).getMessage();
+ return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index dba2269b281..96c519ce636 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -35,14 +35,12 @@ import
org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,37 +132,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
- } catch (final PipeNonReportException e) {
- sleep4NonReportException();
- } catch (final PipeException e) {
- if (!isClosed.get()) {
- setLastExceptionEvent(event);
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because the connector subtask is dropped.{}",
- e.getClass().getSimpleName(),
- e.getMessage() != null ? " Message: " + e.getMessage() : "");
- clearReferenceCountAndReleaseLastEvent(event);
- }
} catch (final Exception e) {
- if (!isClosed.get()) {
- setLastExceptionEvent(event);
- throw new PipeException(
- String.format(
- "Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s",
- taskID,
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event,
- ErrorHandlingUtils.getRootCause(e).getMessage()),
- e);
- } else {
- LOGGER.info(
- "Exception in pipe transfer, ignored because the sink subtask is dropped.{}",
- e.getMessage() != null ? " Message: " + e.getMessage() : "");
- clearReferenceCountAndReleaseLastEvent(event);
- }
+ handleException(event, e);
}
return true;
@@ -216,7 +185,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
LOGGER.info(
"Exception occurred when closing pipe connector subtask {}, root cause: {}",
taskID,
- ErrorHandlingUtils.getRootCause(e).getMessage(),
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
inputPendingQueue.discardAllEvents();
@@ -377,11 +346,12 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
@Override
protected String getRootCause(final Throwable throwable) {
- return ErrorHandlingUtils.getRootCause(throwable).getMessage();
+ return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
}
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException
exception) {
+ lastExceptionTime = Long.MAX_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index b0122323799..f0796a3b1a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -136,8 +136,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static
org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 767d84a7da6..c47e00ff92e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -97,8 +97,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_SKIP_IF_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_USE_EVENT_USER_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.WRITE_BACK_CONNECTOR_SKIP_IF_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static
org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
@TreeModel
@TableModel
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index e71e4ed3819..ab74d094740 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -257,7 +257,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
// enabling the historical data extraction, which may affect the realtime
data extraction.
isHistoricalSourceEnabled =
parameters.getBooleanOrDefault(
- SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+ SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index c7c16f20baf..146359bc215 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static org.apache.iotdb.db.queryengine.execution.QueryState.ABORTED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.CANCELED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.DISPATCHING;
@@ -42,7 +43,6 @@ import static
org.apache.iotdb.db.queryengine.execution.QueryState.PENDING_RETRY
import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
/**
* State machine for a {@link QueryExecution}. It stores the states for the
{@link QueryExecution}.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 2e63db2e2e7..69eb5e4af2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
@@ -33,7 +34,6 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -116,7 +116,7 @@ public class RegionReadExecutor {
RegionExecutionResult resp =
RegionExecutionResult.create(
false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null);
- Throwable t = ErrorHandlingUtils.getRootCause(e);
+ Throwable t = ErrorHandlingCommonUtils.getRootCause(e);
if (t instanceof ReadException
|| t instanceof ReadIndexException
|| t instanceof NotLeaderException
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 681e77518b7..1a1b426006a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -80,11 +80,11 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
import static
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE;
import static
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
import static org.apache.iotdb.rpc.TSStatusCode.DATE_OUT_OF_RANGE;
public class FragmentInstanceContext extends QueryContext {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
index dc8511582b6..96e6b4e6328 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.queryengine.execution.schedule;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import
org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
@@ -82,7 +82,7 @@ public abstract class AbstractDriverThread extends Thread
implements Closeable {
// reset the thread name here
try (SetThreadName driverTaskName =
new
SetThreadName(next.getDriver().getDriverTaskId().getFullId())) {
- Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
if (rootCause instanceof IoTDBRuntimeException) {
next.setAbortCause(rootCause);
} else if (rootCause instanceof IoTDBException) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 4734db5850a..581f4ee6c56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -71,9 +71,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
+import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
import static org.apache.iotdb.rpc.TSStatusCode.DATE_OUT_OF_RANGE;
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index 68ad6995384..efccf2719ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -209,9 +210,12 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
@Override
public String toString() {
+ final int size =
CommonDescriptor.getInstance().getConfig().getPathLogMaxSize();
return "InsertMultiTabletsStatement{"
+ "insertTabletStatementList="
- + insertTabletStatementList
+ + (Objects.nonNull(insertTabletStatementList) &&
insertTabletStatementList.size() > size
+ ? "(Partial) " + insertTabletStatementList.subList(0, size)
+ : insertTabletStatementList)
+ '}';
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 46544fc9834..ded717f5a49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
@@ -70,7 +71,7 @@ public class ErrorHandlingUtils {
LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
}
if (e instanceof SemanticException) {
- Throwable rootCause = getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
if (e.getCause() instanceof IoTDBException) {
return RpcUtils.getStatus(
((IoTDBException) e.getCause()).getErrorCode(),
rootCause.getMessage());
@@ -87,13 +88,6 @@ public class ErrorHandlingUtils {
return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
}
- public static Throwable getRootCause(Throwable e) {
- while (e.getCause() != null) {
- e = e.getCause();
- }
- return e;
- }
-
public static TSStatus onQueryException(Exception e, String operation,
TSStatusCode statusCode) {
TSStatus status = tryCatchQueryException(e);
if (status != null) {
@@ -143,7 +137,7 @@ public class ErrorHandlingUtils {
}
private static TSStatus tryCatchQueryException(Exception e) {
- Throwable rootCause = getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
// ignore logging sg not ready exception
if (rootCause instanceof StorageGroupNotReadyException) {
return RpcUtils.getStatus(TSStatusCode.STORAGE_ENGINE_NOT_READY,
rootCause.getMessage());
@@ -220,7 +214,7 @@ public class ErrorHandlingUtils {
LOGGER.warn(message, e);
return
RpcUtils.getStatus(Arrays.asList(batchException.getFailingStatus()));
} else if (e instanceof IoTDBException) {
- Throwable rootCause = getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
// ignore logging sg not ready exception
if (!(rootCause instanceof StorageGroupNotReadyException)) {
LOGGER.warn(message, e);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/TablePrivilege.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/TablePrivilege.java
index 606b2fbbe2e..5f3488a7596 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/TablePrivilege.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/TablePrivilege.java
@@ -117,6 +117,7 @@ public class TablePrivilege {
return privilege;
}
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -130,10 +131,12 @@ public class TablePrivilege {
&& grantOption.equals(that.grantOption);
}
+ @Override
public int hashCode() {
return Objects.hash(tableName, privileges, grantOption);
}
+ @Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(this.tableName).append("(");
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
similarity index 53%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
index 62b54334a93..eafa4f45a7f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
@@ -19,24 +19,48 @@
package org.apache.iotdb.commons.exception.pipe;
-public class PipeRuntimeSinkRetryTimesConfigurableException
+import java.util.Objects;
+
+public class PipeRuntimeSinkNonReportTimeConfigurableException
extends PipeRuntimeSinkCriticalException {
- private final int retryTimes;
+ private final long interval;
- public PipeRuntimeSinkRetryTimesConfigurableException(
- final String message, final int retryTimes) {
+ public PipeRuntimeSinkNonReportTimeConfigurableException(
+ final String message, final long interval) {
super(message);
- this.retryTimes = retryTimes;
+ this.interval = interval;
}
- public int getRetryTimes() {
- return retryTimes;
+ public long getInterval() {
+ return interval;
}
// We do not record the timestamp here for logger reduction detection
@Override
public String toString() {
- return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" + getMessage() + "}";
+ return "PipeRuntimeSinkNonReportTimeConfigurableException{"
+ + "message='"
+ + getMessage()
+ + "', interval='"
+ + interval
+ + "'}";
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PipeRuntimeSinkNonReportTimeConfigurableException that =
+ (PipeRuntimeSinkNonReportTimeConfigurableException) o;
+ return super.equals(that) && interval == that.interval;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), interval);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
index 62b54334a93..aa64e533528 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.exception.pipe;
+import java.util.Objects;
+
public class PipeRuntimeSinkRetryTimesConfigurableException
extends PipeRuntimeSinkCriticalException {
@@ -37,6 +39,29 @@ public class PipeRuntimeSinkRetryTimesConfigurableException
// We do not record the timestamp here for logger reduction detection
@Override
public String toString() {
- return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" +
getMessage() + "}";
+ // Rendered as: PipeRuntimeSinkRetryTimesConfigurableException{message='..', retryTimes='..'}
+ final StringBuilder text =
+ new StringBuilder("PipeRuntimeSinkRetryTimesConfigurableException{");
+ text.append("message='").append(getMessage());
+ text.append("', retryTimes='").append(retryTimes);
+ return text.append("'}").toString();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ // Same reference is trivially equal.
+ if (o == this) {
+ return true;
+ }
+ // Only instances of exactly this runtime class can be equal.
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final PipeRuntimeSinkRetryTimesConfigurableException other =
+ (PipeRuntimeSinkRetryTimesConfigurableException) o;
+ // Superclass comparison first, then this class's own field.
+ if (!super.equals(other)) {
+ return false;
+ }
+ return other.retryTimes == retryTimes;
+ }
+
+ @Override
+ public int hashCode() {
+ // Mix the superclass hash with retryTimes, the same inputs equals() compares.
+ final int parentHash = super.hashCode();
+ return Objects.hash(parentHash, retryTimes);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 09083d45bca..af75e6424f7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.commons.pipe.agent.task.subtask;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -54,6 +57,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
protected volatile Event lastExceptionEvent;
protected long sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+ // Wall-clock time (ms) stored by handleException when it meets a
+ // PipeRuntimeSinkNonReportTimeConfigurableException while this field is still Long.MAX_VALUE;
+ // Long.MAX_VALUE therefore means "no such exception recorded yet".
+ protected long lastExceptionTime = Long.MAX_VALUE;
protected PipeAbstractSinkSubtask(
final String taskID, final long creationTime, final PipeConnector
outputPipeSink) {
@@ -261,4 +265,52 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * Decides what to do with an exception raised while transferring {@code event}.
+ *
+ * <ul>
+ *   <li>{@link PipeRuntimeSinkNonReportTimeConfigurableException}: the time of the first such
+ *       exception is stored in {@code lastExceptionTime}. While less than {@code getInterval()}
+ *       milliseconds have elapsed since then, the method calls {@code
+ *       sleep4NonReportException()} and returns without throwing; afterwards it delegates to
+ *       {@link #handlePipeException}.
+ *   <li>Any other {@link PipeException}: delegated to {@link #handlePipeException}.
+ *   <li>Anything else: if the subtask is not closed, the event is recorded via {@code
+ *       setLastExceptionEvent} and the exception is rethrown wrapped in a {@link PipeException}
+ *       whose message carries the task id, the event and the root-cause message; if the subtask
+ *       is closed, the exception is logged at info level and {@code
+ *       clearReferenceCountAndReleaseLastEvent} is called.
+ * </ul>
+ *
+ * <p>NOTE(review): {@code lastExceptionTime} is never reset inside this method; presumably it is
+ * reset elsewhere once a transfer succeeds. Confirm.
+ *
+ * @param event the event whose transfer failed
+ * @param e the exception raised by the transfer
+ * @throws PipeException when the exception has to be reported and the subtask is not closed
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ protected void handleException(final Event event, final Exception e) {
+ if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+ // Start the clock when the sentinel shows that nothing has been recorded yet.
+ if (lastExceptionTime == Long.MAX_VALUE) {
+ lastExceptionTime = System.currentTimeMillis();
+ }
+ // Still inside the non-report window: back off and return without throwing.
+ // With an interval of Long.MAX_VALUE this branch is always taken.
+ if (System.currentTimeMillis() - lastExceptionTime
+ < ((PipeRuntimeSinkNonReportTimeConfigurableException)
e).getInterval()) {
+ sleep4NonReportException();
+ return;
+ }
+ // Window exhausted: treat it like any other PipeException.
+ handlePipeException(event, (PipeException) e);
+ } else if (e instanceof PipeException) {
+ handlePipeException(event, (PipeException) e);
+ } else {
+ if (!isClosed.get()) {
+ setLastExceptionEvent(event);
+ throw new PipeException(
+ String.format(
+ "Exception in pipe transfer, subtask: %s, last event: %s, root
cause: %s",
+ taskID,
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event,
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
+ e);
+ } else {
+ LOGGER.info(
+ "Exception in pipe transfer, ignored because the sink subtask is
dropped.{}",
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
+ clearReferenceCountAndReleaseLastEvent(event);
+ }
+ }
+ }
+
+ /**
+ * Handles a {@link PipeException} raised while transferring {@code event}.
+ *
+ * <p>If the subtask is not closed, the event is recorded via {@code setLastExceptionEvent} and
+ * {@code e} is rethrown unchanged. If the subtask is closed, the exception is only logged at
+ * info level (simple class name plus the message when present) and {@code
+ * clearReferenceCountAndReleaseLastEvent} is called for the event.
+ *
+ * @param event the event whose transfer failed
+ * @param e the exception to rethrow or to ignore
+ * @throws PipeException {@code e} itself, when the subtask is not closed
+ */
+ protected void handlePipeException(final Event event, final PipeException e)
{
+ if (!isClosed.get()) {
+ setLastExceptionEvent(event);
+ throw e;
+ } else {
+ LOGGER.info(
+ "{} in pipe transfer, ignored because the connector subtask is
dropped.{}",
+ e.getClass().getSimpleName(),
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
+ clearReferenceCountAndReleaseLastEvent(event);
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index de0d6050167..30475bf9688 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -36,7 +36,7 @@ public class SystemConstant {
// This can be arbitrarily changed since it's only a memory key and not
stored
public static final String RESTART_OR_NEWLY_ADDED_KEY =
"__system.restart_or_newly_added";
- public static final boolean RESTART_DEFAULT_VALUE = false;
+ public static final boolean RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE = false;
public static final String SQL_DIALECT_KEY = "__system.sql-dialect";
public static final String SQL_DIALECT_TREE_VALUE = "tree";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index fb997d0b14a..9343e6fbf1d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -21,15 +21,12 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
-import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
-import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -52,8 +49,6 @@ public class PipeReceiverStatusHandler {
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified
exception";
private static final String NO_PERMISSION_STR = "No permissions for this
operation";
- private static final int CONFLICT_RETRY_MAX_TIMES = 100;
-
private final boolean isRetryAllowedWhenConflictOccurs;
private final long retryMaxMillisWhenConflictOccurs;
private final boolean shouldRecordIgnoredDataWhenConflictOccurs;
@@ -99,7 +94,7 @@ public class PipeReceiverStatusHandler {
* exception if retry the {@link Event}. Upper class must ensure that the
method is invoked only
* by a single thread.
*
- * @throws PipeException to retry the current {@link Event}
+ * @throws PipeRuntimeSinkNonReportTimeConfigurableException to retry the
current {@link Event}
* @param status the {@link TSStatus} to judge
* @param exceptionMessage The exception message to throw
* @param recordMessage The message to record an ignored {@link Event}, the
caller should assure
@@ -142,7 +137,8 @@ public class PipeReceiverStatusHandler {
LOGGER::info,
"Temporary unavailable exception: will retry forever. status:
%s",
status);
- throw new PipeNonReportException(exceptionMessage);
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, Long.MAX_VALUE);
}
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
@@ -181,16 +177,12 @@ public class PipeReceiverStatusHandler {
+ " seconds",
status);
exceptionEventHasBeenRetried.set(true);
- throw status.getCode() == 1815
- &&
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
- ? new PipeNonReportException(exceptionMessage)
- : new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(
- CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenConflictOccurs * 1.1)));
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage,
+ status.getCode() == 1815
+ &&
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+ ? Long.MAX_VALUE
+ : retryMaxMillisWhenConflictOccurs);
}
case 803: // NO_PERMISSION
@@ -266,12 +258,8 @@ public class PipeReceiverStatusHandler {
}
exceptionEventHasBeenRetried.set(true);
- throw new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, retryMaxMillisWhenOtherExceptionsOccur);
}
private static String getNoPermission(final boolean noPermission) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
similarity index 70%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
index 572a7764518..01b8b64442b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,11 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.commons.exception.pipe;
+package org.apache.iotdb.commons.utils;
-public class PipeNonReportException extends PipeRuntimeNonCriticalException {
+/** Static helpers for inspecting exceptions. */
+public class ErrorHandlingCommonUtils {
+ /**
+ * Walks the cause chain of {@code e} and returns the innermost {@link Throwable}.
+ *
+ * <p>The walk is guarded against cyclic cause chains (for example {@code a -> b -> a}), which
+ * would otherwise never terminate: a second reference trails the cursor at half speed and the
+ * walk stops as soon as the two meet.
+ *
+ * @param e the throwable to inspect, must not be {@code null}
+ * @return the last throwable of the cause chain, {@code e} itself when it has no cause, or the
+ *     throwable at which a cycle was detected
+ * @throws NullPointerException if {@code e} is {@code null}
+ */
+ public static Throwable getRootCause(Throwable e) {
+ Throwable slowPointer = e;
+ boolean advanceSlowPointer = false;
+ Throwable cause;
+ while ((cause = e.getCause()) != null) {
+ e = cause;
+ if (e == slowPointer) {
+ // The chain loops back on itself: stop instead of spinning forever.
+ return e;
+ }
+ // Move the trailing reference on every second step only.
+ if (advanceSlowPointer) {
+ slowPointer = slowPointer.getCause();
+ }
+ advanceSlowPointer = !advanceSlowPointer;
+ }
+ return e;
+ }
- public PipeNonReportException(final String message) {
- super(message);
+ private ErrorHandlingCommonUtils() {
+ // Utility class
}
}