This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-leak-1.3.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8cc34270ab87233dd6e258700ce9a12e318f346 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Aug 30 00:55:16 2024 +0800 Pipe: Fix reference count leak when tasks restart (#13250) (cherry picked from commit b7369ee2e17fd90e9b03457506461299be4d64ab) --- .../protocol/IoTDBConfigRegionAirGapConnector.java | 20 ++-- .../protocol/IoTDBConfigRegionConnector.java | 20 ++-- .../pipe/execution/PipeConfigNodeSubtask.java | 14 ++- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 97 ++++++---------- .../evolvable/batch/PipeTabletEventBatch.java | 7 +- .../airgap/IoTDBDataNodeAirGapConnector.java | 10 +- .../airgap/IoTDBDataRegionAirGapConnector.java | 30 ++--- .../airgap/IoTDBSchemaRegionAirGapConnector.java | 10 +- .../protocol/legacy/IoTDBLegacyPipeConnector.java | 30 ++--- .../connector/protocol/opcua/OpcUaConnector.java | 18 +-- .../pipeconsensus/PipeConsensusAsyncConnector.java | 4 - .../pipeconsensus/PipeConsensusSyncConnector.java | 10 +- .../async/IoTDBDataRegionAsyncConnector.java | 6 - .../thrift/sync/IoTDBDataNodeSyncConnector.java | 10 +- .../thrift/sync/IoTDBDataRegionSyncConnector.java | 30 ++--- .../thrift/sync/IoTDBSchemaRegionConnector.java | 10 +- .../protocol/websocket/WebSocketConnector.java | 9 +- .../protocol/writeback/WriteBackConnector.java | 18 +-- .../PipeHistoricalDataRegionTsFileExtractor.java | 42 ++++--- .../realtime/assigner/PipeDataRegionAssigner.java | 25 +++- .../db/pipe/metric/PipeDataRegionEventCounter.java | 12 +- .../pipe/task/connection/PipeEventCollector.java | 17 ++- .../subtask/connector/PipeConnectorSubtask.java | 34 ++---- .../PipeRealtimePriorityBlockingQueue.java | 34 +++++- .../subtask/processor/PipeProcessorSubtask.java | 9 +- .../apache/iotdb/commons/conf/CommonConfig.java | 10 ++ .../iotdb/commons/conf/CommonDescriptor.java | 5 + .../iotdb/commons/pipe/config/PipeConfig.java | 7 ++ .../iotdb/commons/pipe/event/EnrichedEvent.java | 128 +++++++++++++-------- .../commons/pipe/metric/PipeEventCounter.java | 6 +- .../pipe/task/connection/BlockingPendingQueue.java | 32 +++++- .../task/subtask/PipeAbstractConnectorSubtask.java | 5 +- .../pipe/task/subtask/PipeReportableSubtask.java | 2 +- .../commons/pipe/task/subtask/PipeSubtask.java | 31 ++++- 34 files changed, 447 insertions(+), 305 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index 0d466605d8f..e615616e2ba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -137,12 +137,12 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { final AirGapSocket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount( + IoTDBConfigRegionAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount( - IoTDBConfigRegionAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeConfigRegionWritePlanEvent); } finally { pipeConfigRegionWritePlanEvent.decreaseReferenceCount( @@ -178,12 +178,12 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { private void doTransferWrapper( final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount( + IoTDBConfigRegionAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount( - IoTDBConfigRegionAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeConfigRegionSnapshotEvent); } finally { pipeConfigRegionSnapshotEvent.decreaseReferenceCount( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 1e2f3c19e5a..3e4ca8ac9c5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -114,12 +114,12 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { private void doTransferWrapper( final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount( + IoTDBConfigRegionConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount( - IoTDBConfigRegionConnector.class.getName())) { - return; - } doTransfer(pipeConfigRegionWritePlanEvent); } finally { pipeConfigRegionWritePlanEvent.decreaseReferenceCount( @@ -175,12 +175,12 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount( + IoTDBConfigRegionConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount( - IoTDBConfigRegionConnector.class.getName())) { - return; - } doTransfer(pipeConfigRegionSnapshotEvent); } finally { pipeConfigRegionSnapshotEvent.decreaseReferenceCount( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java index 02cb5373359..cfd16480f62 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; @@ -181,10 +182,11 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { return false; } - outputPipeConnector.transfer(event); - decreaseReferenceCountAndReleaseLastEvent(true); - - PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID); + if (!(event instanceof ProgressReportEvent)) { + outputPipeConnector.transfer(event); + PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID); + } + decreaseReferenceCountAndReleaseLastEvent(event, true); } catch (final PipeException e) { setLastExceptionEvent(event); if (!isClosed.get()) { @@ -194,7 +196,7 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { "{} in pipe transfer, ignored because pipe is dropped.", e.getClass().getSimpleName(), e); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(event); } } catch (final Exception e) { setLastExceptionEvent(event); @@ -205,7 +207,7 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { e); } else { LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(event); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 888f7c50c96..8a73a19684f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -83,6 +83,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -92,6 +93,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final AtomicLong LAST_FORCED_RESTART_TIME = + new AtomicLong(System.currentTimeMillis()); + ////////////////////////// Pipe Task Management Entry ////////////////////////// @Override @@ -475,18 +479,32 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { if (!tryWriteLockWithTimeOut(5)) { return; } + + final Set<PipeMeta> stuckPipes; try { - restartAllStuckPipesInternal(); + stuckPipes = findAllStuckPipes(); } finally { releaseWriteLock(); } + + // Restart all stuck pipes + stuckPipes.parallelStream().forEach(this::restartStuckPipe); } - private void restartAllStuckPipesInternal() { + private Set<PipeMeta> findAllStuckPipes() { + final Set<PipeMeta> stuckPipes = new HashSet<>(); + + if (System.currentTimeMillis() - LAST_FORCED_RESTART_TIME.get() + > PipeConfig.getInstance().getPipeSubtaskExecutorForcedRestartIntervalMs()) { + LAST_FORCED_RESTART_TIME.set(System.currentTimeMillis()); + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + stuckPipes.add(pipeMeta); + } + return stuckPipes; + } + final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap = PipeDataRegionExtractorMetrics.getInstance().getExtractorMap(); - - final Set<PipeMeta> stuckPipes = new HashSet<>(); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { final String pipeName = pipeMeta.getStaticMeta().getPipeName(); final List<IoTDBDataRegionExtractor> extractors = @@ -525,8 +543,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } } - // Restart all stuck pipes - stuckPipes.parallelStream().forEach(this::restartStuckPipe); + return stuckPipes; } private boolean mayDeletedTsFileSizeReachDangerousThreshold() { @@ -565,59 +582,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { private void restartStuckPipe(final PipeMeta pipeMeta) { LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta()); - final long startTime = System.currentTimeMillis(); - changePipeStatusBeforeRestart(pipeMeta.getStaticMeta().getPipeName()); - handleSinglePipeMetaChangesInternal(pipeMeta); - LOGGER.warn( - "Pipe {} was restarted because of stuck, time cost: {} ms.", - pipeMeta.getStaticMeta(), - System.currentTimeMillis() - startTime); - } - - private void changePipeStatusBeforeRestart(final String pipeName) { - final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - final Map<Integer, PipeTask> pipeTasks = pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); - final Set<Integer> taskRegionIds = new HashSet<>(pipeTasks.keySet()); - final Set<Integer> dataRegionIds = - StorageEngine.getInstance().getAllDataRegionIds().stream() - .map(DataRegionId::getId) - .collect(Collectors.toSet()); - final Set<PipeTask> dataRegionPipeTasks = - taskRegionIds.stream() - .filter(dataRegionIds::contains) - .map(regionId -> pipeTaskManager.removePipeTask(pipeMeta.getStaticMeta(), regionId)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - // Drop data region tasks - dataRegionPipeTasks.parallelStream().forEach(PipeTask::drop); - - // Stop schema region tasks - pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()).values().parallelStream() - .forEach(PipeTask::stop); - - // Re-create data region tasks - dataRegionPipeTasks.parallelStream() - .forEach( - pipeTask -> { - final PipeTask newPipeTask = - new PipeDataNodeTaskBuilder( - pipeMeta.getStaticMeta(), - ((PipeDataNodeTask) pipeTask).getRegionId(), - pipeMeta - .getRuntimeMeta() - .getConsensusGroupId2TaskMetaMap() - .get(((PipeDataNodeTask) pipeTask).getRegionId())) - .build(); - newPipeTask.create(); - pipeTaskManager.addPipeTask( - pipeMeta.getStaticMeta(), - ((PipeDataNodeTask) pipeTask).getRegionId(), - newPipeTask); - }); - - // Set pipe meta status to STOPPED - pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); + acquireWriteLock(); + try { + final long startTime = System.currentTimeMillis(); + final PipeMeta originalPipeMeta = pipeMeta.deepCopy(); + handleDropPipe(pipeMeta.getStaticMeta().getPipeName()); + handleSinglePipeMetaChanges(originalPipeMeta); + LOGGER.warn( + "Pipe {} was restarted because of stuck, time cost: {} ms.", + originalPipeMeta.getStaticMeta(), + System.currentTimeMillis() - startTime); + } catch (final Exception e) { + LOGGER.warn("Failed to restart stuck pipe {}.", pipeMeta.getStaticMeta(), e); + } finally { + releaseWriteLock(); + } } ///////////////////////// Terminate Logic ///////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java index bae98a9efa2..75d4b2f9e3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java @@ -26,6 +26,8 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.exception.write.WriteProcessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -34,6 +36,8 @@ import java.util.Objects; public abstract class PipeTabletEventBatch implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class); + protected final List<EnrichedEvent> events = new ArrayList<>(); private final int maxDelayInMs; @@ -74,8 +78,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { firstEventProcessingTime = System.currentTimeMillis(); } } else { - ((EnrichedEvent) event) - .decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false); + LOGGER.warn("Cannot increase reference count for event: {}, ignore it in batch.", event); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 221fa516448..33be8e002f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -113,12 +113,12 @@ public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount( + IoTDBDataNodeAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount( - IoTDBDataNodeAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeSchemaRegionWritePlanEvent); } finally { pipeSchemaRegionWritePlanEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java index 9874741c7f0..7b6c4376d0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java @@ -150,12 +150,12 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector final AirGapSocket socket, final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( + IoTDBDataRegionAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( - IoTDBDataRegionAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeInsertNodeTabletInsertionEvent); } finally { pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( @@ -195,12 +195,12 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector private void doTransferWrapper( final AirGapSocket socket, final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeRawTabletInsertionEvent.increaseReferenceCount( + IoTDBDataRegionAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeRawTabletInsertionEvent.increaseReferenceCount( - IoTDBDataRegionAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeRawTabletInsertionEvent); } finally { pipeRawTabletInsertionEvent.decreaseReferenceCount( @@ -233,12 +233,12 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector private void doTransferWrapper( final AirGapSocket socket, final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeTsFileInsertionEvent.increaseReferenceCount( + IoTDBDataRegionAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeTsFileInsertionEvent.increaseReferenceCount( - IoTDBDataRegionAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeTsFileInsertionEvent); } finally { pipeTsFileInsertionEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java index e53423779d0..342807a5dfb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java @@ -86,12 +86,12 @@ public class IoTDBSchemaRegionAirGapConnector extends IoTDBDataNodeAirGapConnect private void doTransferWrapper( final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount( + IoTDBSchemaRegionAirGapConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount( - IoTDBSchemaRegionAirGapConnector.class.getName())) { - return; - } doTransfer(socket, pipeSchemaRegionSnapshotEvent); } finally { pipeSchemaRegionSnapshotEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java index 5edf485adfb..62a1f040624 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java @@ -313,12 +313,12 @@ public class IoTDBLegacyPipeConnector implements PipeConnector { private void doTransferWrapper( final PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent) throws IoTDBConnectionException, StatementExecutionException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeInsertNodeInsertionEvent.increaseReferenceCount( + IoTDBLegacyPipeConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeInsertNodeInsertionEvent.increaseReferenceCount( - IoTDBLegacyPipeConnector.class.getName())) { - return; - } doTransfer(pipeInsertNodeInsertionEvent); } finally { pipeInsertNodeInsertionEvent.decreaseReferenceCount( @@ -344,12 +344,12 @@ public class IoTDBLegacyPipeConnector implements PipeConnector { private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IoTDBConnectionException, StatementExecutionException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeRawTabletInsertionEvent.increaseReferenceCount( + IoTDBLegacyPipeConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeRawTabletInsertionEvent.increaseReferenceCount( - IoTDBLegacyPipeConnector.class.getName())) { - return; - } doTransfer(pipeRawTabletInsertionEvent); } finally { pipeRawTabletInsertionEvent.decreaseReferenceCount( @@ -369,12 +369,12 @@ public class IoTDBLegacyPipeConnector implements PipeConnector { private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, TException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeTsFileInsertionEvent.increaseReferenceCount( + IoTDBLegacyPipeConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeTsFileInsertionEvent.increaseReferenceCount( - IoTDBLegacyPipeConnector.class.getName())) { - return; - } doTransfer(pipeTsFileInsertionEvent); } finally { pipeTsFileInsertionEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java index 390b2fe73bb..1885f181677 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java @@ -204,12 +204,12 @@ public class OpcUaConnector implements PipeConnector { private void transferTabletWrapper( final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws UaException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( + OpcUaConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( - OpcUaConnector.class.getName())) { - return; - } for (final Tablet tablet : pipeInsertNodeTabletInsertionEvent.convertToTablets()) { nameSpace.transfer(tablet); } @@ -221,11 +221,11 @@ public class OpcUaConnector implements PipeConnector { private void transferTabletWrapper(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws UaException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName())) { - return; - } nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet()); } finally { pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(), false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index 85ba921e24d..e4489799652 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -274,8 +274,6 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse // We increase the reference count for this event to determine if the event may be released. if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( PipeConsensusAsyncConnector.class.getName())) { - pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( - PipeConsensusAsyncConnector.class.getName(), false); return; } @@ -354,8 +352,6 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse // We increase the reference count for this event to determine if the event may be released. if (!pipeTsFileInsertionEvent.increaseReferenceCount( PipeConsensusAsyncConnector.class.getName())) { - pipeTsFileInsertionEvent.decreaseReferenceCount( - PipeConsensusAsyncConnector.class.getName(), false); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java index ad7ae47b302..3d274812091 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java @@ -224,12 +224,12 @@ public class PipeConsensusSyncConnector extends IoTDBConnector { private void doTransferWrapper( final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( + PipeConsensusSyncConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( - PipeConsensusSyncConnector.class.getName())) { - return; - } doTransfer(pipeInsertNodeTabletInsertionEvent); } finally { pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 083f12485ba..5e13c4e0243 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -214,8 +214,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { // We increase the reference count for this event to determine if the event may be released. if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( IoTDBDataRegionAsyncConnector.class.getName())) { - pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( - IoTDBDataRegionAsyncConnector.class.getName(), false); return; } @@ -240,8 +238,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { // We increase the reference count for this event to determine if the event may be released. if (!pipeRawTabletInsertionEvent.increaseReferenceCount( IoTDBDataRegionAsyncConnector.class.getName())) { - pipeRawTabletInsertionEvent.decreaseReferenceCount( - IoTDBDataRegionAsyncConnector.class.getName(), false); return; } @@ -318,8 +314,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { // We increase the reference count for this event to determine if the event may be released. if (!pipeTsFileInsertionEvent.increaseReferenceCount( IoTDBDataRegionAsyncConnector.class.getName())) { - pipeTsFileInsertionEvent.decreaseReferenceCount( - IoTDBDataRegionAsyncConnector.class.getName(), false); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 9580346d25b..294e69a769a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -104,12 +104,12 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { protected void doTransferWrapper( final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount( + IoTDBDataNodeSyncConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount( - IoTDBDataNodeSyncConnector.class.getName())) { - return; - } doTransfer(pipeSchemaRegionWritePlanEvent); } finally { pipeSchemaRegionWritePlanEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index a92790c5e80..772a55b0e17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -268,12 +268,12 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { private void doTransferWrapper( final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( + IoTDBDataRegionSyncConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( - IoTDBDataRegionSyncConnector.class.getName())) { - return; - } doTransfer(pipeInsertNodeTabletInsertionEvent); } finally { pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( @@ -336,12 +336,12 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeRawTabletInsertionEvent.increaseReferenceCount( + IoTDBDataRegionSyncConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeRawTabletInsertionEvent.increaseReferenceCount( - IoTDBDataRegionSyncConnector.class.getName())) { - return; - } doTransfer(pipeRawTabletInsertionEvent); } finally { pipeRawTabletInsertionEvent.decreaseReferenceCount( @@ -395,12 +395,12 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeTsFileInsertionEvent.increaseReferenceCount( + IoTDBDataRegionSyncConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeTsFileInsertionEvent.increaseReferenceCount( - IoTDBDataRegionSyncConnector.class.getName())) { - return; - } doTransfer( Collections.singletonMap( new Pair<>( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java index 878430e027b..f70e18c0651 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java @@ -74,12 +74,12 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector { private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) throws PipeException, IOException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount( + IoTDBSchemaRegionConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount( - IoTDBSchemaRegionConnector.class.getName())) { - return; - } doTransfer(pipeSchemaRegionSnapshotEvent); } finally { pipeSchemaRegionSnapshotEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index e1e5a5dd44c..cf258452346 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -118,8 +118,13 @@ public class WebSocketConnector implements PipeConnector { return; } - ((EnrichedEvent) tabletInsertionEvent) - .increaseReferenceCount(WebSocketConnector.class.getName()); + if (!((EnrichedEvent) tabletInsertionEvent) + .increaseReferenceCount(WebSocketConnector.class.getName())) { + LOGGER.warn( + "WebsocketConnector failed to increase the reference count of the event. Ignore it. Current event: {}.", + tabletInsertionEvent); + return; + } server.addEvent(tabletInsertionEvent, this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java index 68789bd3ef5..7889b778d86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java @@ -111,12 +111,12 @@ public class WriteBackConnector implements PipeConnector { private void doTransferWrapper( final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( + WriteBackConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount( - WriteBackConnector.class.getName())) { - return; - } doTransfer(pipeInsertNodeTabletInsertionEvent); } finally { pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( @@ -155,11 +155,11 @@ public class WriteBackConnector implements PipeConnector { private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName())) { + return; + } try { - // We increase the reference count for this event to determine if the event may be released. - if (!pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName())) { - return; - } doTransfer(pipeRawTabletInsertionEvent); } finally { pipeRawTabletInsertionEvent.decreaseReferenceCount(WriteBackConnector.class.getName(), false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index ca1888b0200..16f6c6b7d91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -574,11 +574,17 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa final TsFileResource resource = pendingQueue.poll(); if (resource == null) { - isTerminateSignalSent = true; final PipeTerminateEvent terminateEvent = new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId); - terminateEvent.increaseReferenceCount( - PipeHistoricalDataRegionTsFileExtractor.class.getName()); + if (!terminateEvent.increaseReferenceCount( + PipeHistoricalDataRegionTsFileExtractor.class.getName())) { + LOGGER.warn( + "Pipe {}@{}: failed to increase reference count for terminate event, will resend it", + pipeName, + dataRegionId); + return null; + } + isTerminateSignalSent = true; return terminateEvent; } @@ -603,18 +609,28 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa event.skipParsingTime(); } - event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()); try { - PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource); - } catch (final IOException e) { - LOGGER.warn( - "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", - pipeName, - dataRegionId, - resource.getTsFilePath()); + final boolean isReferenceCountIncreased = + event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()); + if (!isReferenceCountIncreased) { + LOGGER.warn( + "Pipe {}@{}: failed to increase reference count for historical event {}, will discard it", + pipeName, + dataRegionId, + event); + } + return isReferenceCountIncreased ? event : null; + } finally { + try { + PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource); + } catch (final IOException e) { + LOGGER.warn( + "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", + pipeName, + dataRegionId, + resource.getTsFilePath()); + } } - - return event; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 762577d2f84..c36a8877dff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -32,10 +32,15 @@ import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher; import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; public class PipeDataRegionAssigner implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class); + private static final int nonForwardingEventsProgressReportInterval = PipeConfig.getInstance().getPipeNonForwardingEventsProgressReportInterval(); @@ -64,7 +69,11 @@ public class PipeDataRegionAssigner implements Closeable { } public void publishToAssign(final PipeRealtimeEvent event) { - event.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); + if (!event.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { + LOGGER.warn( + "The reference count of the realtime event {} cannot be increased, skipping it.", event); + return; + } disruptor.publish(event); @@ -99,7 +108,12 @@ public class PipeDataRegionAssigner implements Closeable { extractor.getRealtimeDataExtractionStartTime(), extractor.getRealtimeDataExtractionEndTime()); reportEvent.bindProgressIndex(event.getProgressIndex()); - reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); + if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { + LOGGER.warn( + "The reference count of the event {} cannot be increased, skipping it.", + reportEvent); + return; + } extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); return; } @@ -118,7 +132,12 @@ public class PipeDataRegionAssigner implements Closeable { .disableMod4NonTransferPipes(extractor.isShouldTransferModFile()); } - copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); + if (!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { + LOGGER.warn( + "The reference count of the event {} cannot be increased, skipping it.", + copiedEvent); + return; + } extractor.extract(copiedEvent); if (innerEvent instanceof PipeHeartbeatEvent) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java index 0aed61801ef..b1405628f37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java @@ -36,17 +36,17 @@ public class PipeDataRegionEventCounter extends PipeEventCounter { private final AtomicInteger pipeHeartbeatEventCount = new AtomicInteger(0); @Override - public Integer getTsFileInsertionEventCount() { + public int getTsFileInsertionEventCount() { return tsFileInsertionEventCount.get(); } @Override - public Integer getTabletInsertionEventCount() { + public int getTabletInsertionEventCount() { return tabletInsertionEventCount.get(); } @Override - public Integer getPipeHeartbeatEventCount() { + public int getPipeHeartbeatEventCount() { return pipeHeartbeatEventCount.get(); } @@ -76,11 +76,11 @@ public class PipeDataRegionEventCounter extends PipeEventCounter { return; } if (event instanceof PipeHeartbeatEvent) { - pipeHeartbeatEventCount.decrementAndGet(); + pipeHeartbeatEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 0); } else if (event instanceof TabletInsertionEvent) { - tabletInsertionEventCount.decrementAndGet(); + tabletInsertionEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 0); } else if (event instanceof TsFileInsertionEvent) { - tsFileInsertionEventCount.decrementAndGet(); + tsFileInsertionEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 0); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java index c4c91f80f58..de0ede99623 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java @@ -56,6 +56,7 @@ public class PipeEventCollector implements EventCollector { private final AtomicInteger collectInvocationCount = new AtomicInteger(0); private boolean hasNoGeneratedEvent = true; + private boolean isFailedToIncreaseReferenceCount = false; public PipeEventCollector( final UnboundedBlockingPendingQueue<Event> pendingQueue, @@ -162,10 +163,12 @@ public class PipeEventCollector implements EventCollector { } private void collectEvent(final Event event) { - collectInvocationCount.incrementAndGet(); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName()); + if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) { + LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event); + isFailedToIncreaseReferenceCount = true; + return; + } // Assign a commit id for this event in order to report progress in order. PipeEventCommitManager.getInstance() @@ -180,11 +183,13 @@ public class PipeEventCollector implements EventCollector { } pendingQueue.directOffer(event); + collectInvocationCount.incrementAndGet(); } - public void resetCollectInvocationCountAndGenerateFlag() { + public void resetFlags() { collectInvocationCount.set(0); hasNoGeneratedEvent = true; + isFailedToIncreaseReferenceCount = false; } public long getCollectInvocationCount() { @@ -198,4 +203,8 @@ public class PipeEventCollector implements EventCollector { public boolean hasNoGeneratedEvent() { return hasNoGeneratedEvent; } + + public boolean isFailedToIncreaseReferenceCount() { + return isFailedToIncreaseReferenceCount; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index a832d43f73e..d55f18f9838 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; import org.apache.iotdb.db.pipe.metric.PipeDataRegionConnectorMetrics; import org.apache.iotdb.db.pipe.metric.PipeSchemaRegionConnectorMetrics; -import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.pipe.api.PipeConnector; @@ -100,6 +99,10 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { : UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()); // Record this event for retrying on connection failure or other exceptions setLastEvent(event); + if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) { + lastEvent = null; + return true; + } try { if (event == null) { @@ -133,7 +136,7 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { : event); } - decreaseReferenceCountAndReleaseLastEvent(true); + decreaseReferenceCountAndReleaseLastEvent(event, true); } catch (final PipeException e) { if (!isClosed.get()) { setLastExceptionEvent(event); @@ -143,7 +146,7 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { "{} in pipe transfer, ignored because the connector subtask is dropped.", e.getClass().getSimpleName(), e); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(event); } } catch (final Exception e) { if (!isClosed.get()) { @@ -160,7 +163,7 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { } else { LOGGER.info( "Exception in pipe transfer, ignored because the connector subtask is dropped.", e); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(event); } } @@ -204,13 +207,7 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { ErrorHandlingUtils.getRootCause(e).getMessage(), e); } finally { - inputPendingQueue.forEach( - event -> { - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event).clearReferenceCount(PipeEventCollector.class.getName()); - } - }); - inputPendingQueue.clear(); + inputPendingQueue.discardAllEvents(); // Should be called after outputPipeConnector.close() super.close(); @@ -223,18 +220,9 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { */ public void discardEventsOfPipe(final String pipeNameToDrop) { // Try to remove the events as much as possible - inputPendingQueue.removeIf( - event -> { - if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) { - ((EnrichedEvent) event) - .clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName()); - return true; - } - return false; - }); - - // synchronized to use the lastEvent and lastExceptionEvent + inputPendingQueue.discardEventsOfPipe(pipeNameToDrop); + + // synchronized to use the lastEvent & lastExceptionEvent synchronized (this) { // Here we discard the last event, and re-submit the pipe task to avoid that the pipe task has // stopped submission but will not be stopped by critical exceptions, because when it acquires diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java index a169cb6a338..6b33c248bfb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.task.connection.BlockingPendingQueue; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter; @@ -33,7 +34,6 @@ import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.function.Predicate; public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQueue<Event> { @@ -153,9 +153,35 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ } @Override - public void removeIf(final Predicate<? super Event> filter) { - super.removeIf(filter); - pendingQueue.removeIf(filter); + public void discardAllEvents() { + super.discardAllEvents(); + tsfileInsertEventDeque.removeIf( + event -> { + if (event instanceof EnrichedEvent) { + if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { + eventCounter.decreaseEventCount(event); + } + } + return true; + }); + eventCounter.reset(); + } + + @Override + public void discardEventsOfPipe(final String pipeNameToDrop) { + super.discardEventsOfPipe(pipeNameToDrop); + tsfileInsertEventDeque.removeIf( + event -> { + if (event instanceof EnrichedEvent + && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) { + if (((EnrichedEvent) event) + .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { + eventCounter.decreaseEventCount(event); + } + return true; + } + return false; + }); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java index 812b47a978a..ff17a11d8b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java @@ -129,7 +129,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { return false; } - outputEventCollector.resetCollectInvocationCountAndGenerateFlag(); + outputEventCollector.resetFlags(); try { // event can be supplied after the subtask is closed, so we need to check isClosed here if (!isClosed.get()) { @@ -168,6 +168,9 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { // of the event must be zero in the processor stage, at this time, the progress of the // event needs to be reported. && outputEventCollector.hasNoGeneratedEvent() + // If the event's reference count cannot be increased, it means that the event has + // been released, and the progress of the event can not be reported. + && !outputEventCollector.isFailedToIncreaseReferenceCount() // Events generated from consensusPipe's transferred data should never be reported. && !(pipeProcessor instanceof PipeConsensusProcessor); if (shouldReport @@ -182,7 +185,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { PipeEventCommitManager.getInstance() .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId); } - decreaseReferenceCountAndReleaseLastEvent(shouldReport); + decreaseReferenceCountAndReleaseLastEvent(event, shouldReport); } catch (final PipeRuntimeOutOfMemoryCriticalException e) { LOGGER.info( "Temporarily out of memory in pipe event processing, will wait for the memory to release.", @@ -201,7 +204,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { e); } else { LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(event); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index d3dba643e26..ef8f013f7a1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -208,6 +208,7 @@ public class CommonConfig { private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L; private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000; private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20; + private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE; private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B @@ -883,6 +884,15 @@ public class CommonConfig { pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds; } + public long getPipeSubtaskExecutorForcedRestartIntervalMs() { + return pipeSubtaskExecutorForcedRestartIntervalMs; + } + + public void setPipeSubtaskExecutorForcedRestartIntervalMs( + long pipeSubtaskExecutorForcedRestartIntervalMs) { + this.pipeSubtaskExecutorForcedRestartIntervalMs = pipeSubtaskExecutorForcedRestartIntervalMs; + } + public int getPipeRealTimeQueuePollHistoryThreshold() { return pipeRealTimeQueuePollHistoryThreshold; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 5183ab1b776..76e0a853dc2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -318,6 +318,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_subtask_executor_cron_heartbeat_event_interval_seconds", String.valueOf(config.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds())))); + config.setPipeSubtaskExecutorForcedRestartIntervalMs( + Long.parseLong( + properties.getProperty( + "pipe_subtask_executor_forced_restart_interval_ms", + String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs())))); config.setPipeExtractorAssignerDisruptorRingBufferSize( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index fc9fb0911a0..449bd149e67 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -92,6 +92,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds(); } + public long getPipeSubtaskExecutorForcedRestartIntervalMs() { + return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs(); + } + /////////////////////////////// Extractor /////////////////////////////// public int getPipeExtractorAssignerDisruptorRingBufferSize() { @@ -341,6 +345,9 @@ public class PipeConfig { LOGGER.info( "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}", getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()); + LOGGER.info( + "PipeSubtaskExecutorForcedRestartIntervalMs: {}", + getPipeSubtaskExecutorForcedRestartIntervalMs()); LOGGER.info( "PipeExtractorAssignerDisruptorRingBufferSize: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index cbdbef1e4e9..57a2ee6a6eb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -67,7 +67,7 @@ public abstract class EnrichedEvent implements Event { protected boolean isPatternParsed; protected boolean isTimeParsed; - protected boolean shouldReportOnCommit = true; + protected volatile boolean shouldReportOnCommit = true; protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>(); protected EnrichedEvent( @@ -106,29 +106,32 @@ public abstract class EnrichedEvent implements Event { * {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be incremented * regardless of the circumstances */ - public boolean increaseReferenceCount(final String holderMessage) { + public synchronized boolean increaseReferenceCount(final String holderMessage) { boolean isSuccessful = true; - synchronized (this) { - if (isReleased.get()) { - LOGGER.warn( - "re-increase reference count to event that has already been released: {}, stack trace: {}", - coreReportMessage(), - Thread.currentThread().getStackTrace()); - isSuccessful = false; - // Here we still increase the reference count, to remain consistent with the behavior after - // internal increase failure. - referenceCount.incrementAndGet(); - } else { - if (referenceCount.get() == 0) { - // We assume that this function will not throw any exceptions. - isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage); - } - referenceCount.incrementAndGet(); - } + + if (isReleased.get()) { + LOGGER.warn( + "re-increase reference count to event that has already been released: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); + isSuccessful = false; + return isSuccessful; } - if (!isSuccessful) { - LOGGER.warn("increase reference count failed, EnrichedEvent: {}", coreReportMessage()); + + if (referenceCount.get() == 0) { + // We assume that this function will not throw any exceptions. + isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage); + } + + if (isSuccessful) { + referenceCount.incrementAndGet(); + } else { + LOGGER.warn( + "increase reference count failed, EnrichedEvent: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); } + return isSuccessful; } @@ -156,31 +159,53 @@ public abstract class EnrichedEvent implements Event { * {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be decremented * regardless of the circumstances */ - public boolean decreaseReferenceCount(final String holderMessage, final boolean shouldReport) { + public synchronized boolean decreaseReferenceCount( + final String holderMessage, final boolean shouldReport) { boolean isSuccessful = true; - synchronized (this) { - if (referenceCount.get() == 1 && !isReleased.get()) { - // We assume that this function will not throw any exceptions. - isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage); - if (!shouldReport) { - shouldReportOnCommit = false; - } - PipeEventCommitManager.getInstance().commit(this, committerKey); + + if (isReleased.get()) { + LOGGER.warn( + "decrease reference count to event that has already been released: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); + isSuccessful = false; + return isSuccessful; + } + + if (referenceCount.get() == 1) { + // We assume that this function will not throw any exceptions. + if (!internallyDecreaseResourceReferenceCount(holderMessage)) { + LOGGER.warn( + "resource reference count is decreased to 0, but failed to release the resource, EnrichedEvent: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); } - final int newReferenceCount = referenceCount.decrementAndGet(); - if (newReferenceCount == 0) { - isReleased.set(true); + if (!shouldReport) { + shouldReportOnCommit = false; } + PipeEventCommitManager.getInstance().commit(this, committerKey); + } + + // No matter whether the resource is released, we should decrease the reference count. + final int newReferenceCount = referenceCount.decrementAndGet(); + if (newReferenceCount <= 0) { + isReleased.set(true); + isSuccessful = newReferenceCount == 0; if (newReferenceCount < 0) { - LOGGER.debug( + LOGGER.warn( "reference count is decreased to {}, event: {}, stack trace: {}", newReferenceCount, coreReportMessage(), Thread.currentThread().getStackTrace()); + referenceCount.set(0); } } + if (!isSuccessful) { - LOGGER.warn("decrease reference count failed, EnrichedEvent: {}", coreReportMessage()); + LOGGER.warn( + "decrease reference count failed, EnrichedEvent: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); } return isSuccessful; } @@ -195,20 +220,29 @@ public abstract class EnrichedEvent implements Event { * {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be reset to zero * regardless of the circumstances */ - public boolean clearReferenceCount(final String holderMessage) { - boolean isSuccessful = true; - synchronized (this) { - if (referenceCount.get() >= 1 && !isReleased.get()) { - // We assume that this function will not throw any exceptions. - isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage); - isReleased.set(true); - } - referenceCount.set(0); + public synchronized boolean clearReferenceCount(final String holderMessage) { + if (isReleased.get()) { + LOGGER.warn( + "clear reference count to event that has already been released: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); + return false; } - if (!isSuccessful) { - LOGGER.warn("clear reference count failed, EnrichedEvent: {}", coreReportMessage()); + + if (referenceCount.get() >= 1) { + shouldReportOnCommit = false; + // We assume that this function will not throw any exceptions. + if (!internallyDecreaseResourceReferenceCount(holderMessage)) { + LOGGER.warn( + "resource reference count is decreased to 0, but failed to release the resource, EnrichedEvent: {}, stack trace: {}", + coreReportMessage(), + Thread.currentThread().getStackTrace()); + } } - return isSuccessful; + + referenceCount.set(0); + isReleased.set(true); + return true; } /** diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java index c62e1ef60ff..591ec500dae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java @@ -23,15 +23,15 @@ import org.apache.iotdb.pipe.api.event.Event; public abstract class PipeEventCounter { - public Integer getTsFileInsertionEventCount() { + public int getTsFileInsertionEventCount() { return 0; } - public Integer getTabletInsertionEventCount() { + public int getTabletInsertionEventCount() { return 0; } - public Integer getPipeHeartbeatEventCount() { + public int getPipeHeartbeatEventCount() { return 0; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java index 04983a984d9..c67e80181f9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.task.connection; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Predicate; public abstract class BlockingPendingQueue<E extends Event> { @@ -40,7 +40,7 @@ public abstract class BlockingPendingQueue<E extends Event> { protected final BlockingQueue<E> pendingQueue; - private final PipeEventCounter eventCounter; + protected final PipeEventCounter eventCounter; protected BlockingPendingQueue( final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) { @@ -106,12 +106,36 @@ public abstract class BlockingPendingQueue<E extends Event> { eventCounter.reset(); } + /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ public void forEach(final Consumer<? super E> action) { pendingQueue.forEach(action); } - public void removeIf(final Predicate<? super E> filter) { - pendingQueue.removeIf(filter); + public void discardAllEvents() { + pendingQueue.removeIf( + event -> { + if (event instanceof EnrichedEvent) { + if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { + eventCounter.decreaseEventCount(event); + } + } + return true; + }); + eventCounter.reset(); + } + + public void discardEventsOfPipe(final String pipeNameToDrop) { + pendingQueue.removeIf( + event -> { + if (event instanceof EnrichedEvent + && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) { + if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { + eventCounter.decreaseEventCount(event); + } + return true; + } + return false; + }); } public boolean isEmpty() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java index 25ddb438806..6e520de1b76 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java @@ -84,7 +84,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask LOGGER.info( "onFailure in pipe transfer, ignored because the connector subtask is dropped.", throwable); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(null); return; } @@ -227,7 +227,8 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask protected synchronized void clearReferenceCountAndReleaseLastExceptionEvent() { if (lastExceptionEvent != null) { - if (lastExceptionEvent instanceof EnrichedEvent) { + if (lastExceptionEvent instanceof EnrichedEvent + && !((EnrichedEvent) lastExceptionEvent).isReleased()) { ((EnrichedEvent) lastExceptionEvent).clearReferenceCount(PipeSubtask.class.getName()); } lastExceptionEvent = null; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java index c366b4d62e4..cff90b88481 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java @@ -39,7 +39,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask { public synchronized void onFailure(final Throwable throwable) { if (isClosed.get()) { LOGGER.info("onFailure in pipe subtask, ignored because pipe is dropped.", throwable); - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(null); return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java index a6f11bcae9b..493e272e799 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java @@ -148,26 +148,47 @@ public abstract class PipeSubtask @Override public void close() { - clearReferenceCountAndReleaseLastEvent(); + clearReferenceCountAndReleaseLastEvent(null); } protected synchronized void decreaseReferenceCountAndReleaseLastEvent( - final boolean shouldReport) { + final Event actualLastEvent, final boolean shouldReport) { + // lastEvent may be set to null due to PipeConnectorSubtask#discardEventsOfPipe if (lastEvent != null) { - if (lastEvent instanceof EnrichedEvent) { + if (lastEvent instanceof EnrichedEvent && !((EnrichedEvent) lastEvent).isReleased()) { ((EnrichedEvent) lastEvent) .decreaseReferenceCount(PipeSubtask.class.getName(), shouldReport); } lastEvent = null; + return; + } + + // If lastEvent is set to null due to PipeConnectorSubtask#discardEventsOfPipe (connector close) + // and finally exception occurs, we need to release the actual last event from the connector + // given by the parameter + if (actualLastEvent instanceof EnrichedEvent + && !((EnrichedEvent) actualLastEvent).isReleased()) { + ((EnrichedEvent) actualLastEvent) + .decreaseReferenceCount(PipeSubtask.class.getName(), shouldReport); } } - protected synchronized void clearReferenceCountAndReleaseLastEvent() { + protected synchronized void clearReferenceCountAndReleaseLastEvent(final Event actualLastEvent) { + // lastEvent may be set to null due to PipeConnectorSubtask#discardEventsOfPipe if (lastEvent != null) { - if (lastEvent instanceof EnrichedEvent) { + if (lastEvent instanceof EnrichedEvent && !((EnrichedEvent) lastEvent).isReleased()) { ((EnrichedEvent) lastEvent).clearReferenceCount(PipeSubtask.class.getName()); } lastEvent = null; + return; + } + + // If lastEvent is set to null due to PipeConnectorSubtask#discardEventsOfPipe (connector close) + // and finally exception occurs, we need to release the actual last event from the connector + // given by the parameter + if (actualLastEvent instanceof EnrichedEvent + && !((EnrichedEvent) actualLastEvent).isReleased()) { + ((EnrichedEvent) actualLastEvent).clearReferenceCount(PipeSubtask.class.getName()); } }
