This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8c694e434f6dc53787058dad603be773a34f4a1c Author: Caideyipi <[email protected]> AuthorDate: Thu Aug 7 09:28:36 2025 +0800 [To dev/1.3] Pipe: Do not use the fork join pool in TerminateEvent #16113 (#16114) * fix * optimize --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +- .../event/common/terminate/PipeTerminateEvent.java | 37 ++++++++++++++++++---- .../PipeHistoricalDataRegionTsFileExtractor.java | 21 ++++++++++-- .../iotdb/commons/concurrent/ThreadName.java | 1 + 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index f800c15c3f8..56e4ef9159c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -132,7 +132,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>( - IoTDBDescriptor.getInstance().getConfig().getSchemaThreadCount()), + IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()), new IoTThreadFactory(ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName()), ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName(), new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java index 91d38cf3361..3e0475a3d60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -19,16 +19,22 @@ package org.apache.iotdb.db.pipe.event.common.terminate; +import 
org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe, @@ -40,13 +46,29 @@ public class PipeTerminateEvent extends EnrichedEvent { private final int dataRegionId; + private final boolean shouldMark; + + // Do not use call run policy to avoid deadlock + private static final ExecutorService terminateExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()), + new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()), + ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()); + public PipeTerminateEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final int dataRegionId) { + final int dataRegionId, + final boolean shouldMark) { super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; + 
this.shouldMark = shouldMark; } @Override @@ -74,7 +96,7 @@ public class PipeTerminateEvent extends EnrichedEvent { final long endTime) { // Should record PipeTaskMeta, for the terminateEvent shall report progress to // notify the pipeTask it's completed. - return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId); + return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId, shouldMark); } @Override @@ -95,13 +117,16 @@ public class PipeTerminateEvent extends EnrichedEvent { @Override public void reportProgress() { // To avoid deadlock - CompletableFuture.runAsync( - () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId)); + if (shouldMark) { + terminateExecutor.submit( + () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId)); + } } @Override public String toString() { - return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId) + return String.format( + "PipeTerminateEvent{dataRegionId=%s, shouldMark=%s}", dataRegionId, shouldMark) + " - " + super.toString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 25759654a5b..efd03a3709c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -77,6 +77,10 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; @@ -86,11 +90,11 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class); @@ -117,6 +121,7 @@ public class 
PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private boolean shouldExtractInsertion; private boolean shouldTransferModFile; // Whether to transfer mods + private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed; private boolean isTerminateSignalSent = false; private boolean isForwardingPipeRequests; @@ -294,6 +299,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa || // Should extract deletion listeningOptionPair.getRight()); + final String extractorModeValue = + parameters.getStringOrDefault( + Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); + shouldTerminatePipeOnAllHistoricalEventsConsumed = + extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) + || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + isForwardingPipeRequests = parameters.getBooleanOrDefault( Arrays.asList( @@ -554,7 +566,12 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa if (resource == null) { final PipeTerminateEvent terminateEvent = - new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId); + new PipeTerminateEvent( + pipeName, + creationTime, + pipeTaskMeta, + dataRegionId, + shouldTerminatePipeOnAllHistoricalEventsConsumed); if (!terminateEvent.increaseReferenceCount( PipeHistoricalDataRegionTsFileExtractor.class.getName())) { LOGGER.warn( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 72a0e4ebb0c..6f81aaba419 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -145,6 +145,7 @@ public enum ThreadName { PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"), PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"), 
PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"), + PIPE_TERMINATE_EXECUTION_POOL("Pipe-Terminate-Execution-Pool"), LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"), SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"), SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
