This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch log-opt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7b60882550c5d2175aa7f3f8e15e55e8296f10ff Author: Caideyipi <[email protected]> AuthorDate: Tue May 19 11:39:13 2026 +0800 add --- .../runtime/heartbeat/PipeHeartbeatParser.java | 13 ++ .../agent/task/connection/PipeEventCollector.java | 9 ++ .../subtask/processor/PipeProcessorSubtask.java | 7 + .../event/common/terminate/PipeTerminateEvent.java | 179 +++++++++++++++++++++ ...istoricalDataRegionTsFileAndDeletionSource.java | 26 +++ .../pipe/receiver/PipeReceiverStatusHandler.java | 29 ++++ 6 files changed, 263 insertions(+) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 423353dd1fb..597120ffe1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -157,11 +157,24 @@ public class PipeHeartbeatParser { if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) { temporaryMeta.markDataNodeCompleted(nodeId); + LOGGER.info( + "Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}", + nodeId, + staticMeta.getPipeName(), + pipeHeartbeat.getRemainingEventCount(staticMeta), + pipeHeartbeat.getRemainingTime(staticMeta), + temporaryMeta.getCompletedDataNodeIds()); final Set<Integer> uncompletedDataNodeIds = configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds()); if (uncompletedDataNodeIds.isEmpty()) { + LOGGER.info( + "All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}", + staticMeta.getPipeName(), + temporaryMeta.getGlobalRemainingEvents(), + temporaryMeta.getGlobalRemainingTime(), + staticMeta); pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName()); LOGGER.info( ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index ad44b78042a..7e6b0aa7781 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource; import org.apache.iotdb.db.pipe.source.schemaregion.PipePlanTreePrivilegeParseVisitor; @@ -138,12 +139,20 @@ public class PipeEventCollector implements EventCollector { if (skipParsing || !forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) { collectEvent(sourceEvent); + if (sourceEvent.isGeneratedByHistoricalExtractor()) { + PipeTerminateEvent.markHistoricalTsFileUnsplit( + sourceEvent.getPipeName(), sourceEvent.getCreationTime(), regionId); + } return; } try { sourceEvent.consumeTabletInsertionEventsWithRetry( this::collectParsedRawTableEvent, "PipeEventCollector::parseAndCollectEvent"); + if (sourceEvent.isGeneratedByHistoricalExtractor()) { + PipeTerminateEvent.markHistoricalTsFileSplit( + sourceEvent.getPipeName(), sourceEvent.getCreationTime(), regionId); + } } finally { sourceEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index c40b37b31e5..fe9737b0d7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics; @@ -181,6 +182,12 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { } }, "PipeProcessorSubtask::executeOnce"); + if (tsFileInsertionEvent.isGeneratedByHistoricalExtractor()) { + PipeTerminateEvent.markHistoricalTsFileSplit( + tsFileInsertionEvent.getPipeName(), + tsFileInsertionEvent.getCreationTime(), + regionId); + } if (ex.get() != null) { throw ex.get(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java index 64209242cc4..12385c52b06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -33,9 +33,16 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe, @@ -45,6 +52,8 @@ import java.util.concurrent.TimeUnit; */ public class PipeTerminateEvent extends EnrichedEvent { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTerminateEvent.class); + private final int dataRegionId; private final boolean shouldMark; @@ -58,6 +67,9 @@ public class PipeTerminateEvent extends EnrichedEvent { // Do not use call run policy to avoid deadlock private static final ExecutorService terminateExecutor = createTerminateExecutor(); + private static final ConcurrentMap<HistoricalTransferKey, HistoricalTransferSummaryCounter> + HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>(); + private static ExecutorService createTerminateExecutor() { final WrappedThreadPoolExecutor executor = new WrappedThreadPoolExecutor( @@ -145,6 +157,18 @@ public class PipeTerminateEvent extends EnrichedEvent { } public void markCompleted() { + final HistoricalTransferSummary summary = + snapshotAndClearHistoricalTransferSummary(pipeName, creationTime, dataRegionId); + if (Objects.nonNull(summary)) { + LOGGER.info( + "Pipe {}@{}: terminate event committed for historical transfer. creationTime: {}, shouldMark: {}. {}", + pipeName, + dataRegionId, + creationTime, + shouldMark, + summary.toReportMessage()); + } + // To avoid deadlock if (shouldMark) { terminateExecutor.submit( @@ -159,4 +183,159 @@ public class PipeTerminateEvent extends EnrichedEvent { + " - " + super.toString(); } + + public static void initializeHistoricalTransferSummary( + final String pipeName, + final long creationTime, + final int dataRegionId, + final long extractedHistoricalTsFileCount, + final long extractedHistoricalDeletionCount) { + HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP + .computeIfAbsent( + new HistoricalTransferKey(pipeName, creationTime, dataRegionId), + ignored -> new HistoricalTransferSummaryCounter()) + .initialize(extractedHistoricalTsFileCount, extractedHistoricalDeletionCount); + } + + public static void markHistoricalTsFileSkipped( + final String pipeName, final long creationTime, final int dataRegionId) { + getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId) + .skippedHistoricalTsFileCount + .incrementAndGet(); + } + + public static void markHistoricalTsFileSplit( + final String pipeName, final long creationTime, final int dataRegionId) { + getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId) + .splitHistoricalTsFileCount + .incrementAndGet(); + } + + public static void markHistoricalTsFileUnsplit( + final String pipeName, final long creationTime, final int dataRegionId) { + getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId) + .unsplitHistoricalTsFileCount + .incrementAndGet(); + } + + public static HistoricalTransferSummary snapshotHistoricalTransferSummary( + final String pipeName, final long creationTime, final int dataRegionId) { + final HistoricalTransferSummaryCounter counter = + HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get( + new HistoricalTransferKey(pipeName, creationTime, dataRegionId)); + return Objects.nonNull(counter) ? counter.snapshot() : null; + } + + public static void clearHistoricalTransferSummary( + final String pipeName, final long creationTime, final int dataRegionId) { + HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove( + new HistoricalTransferKey(pipeName, creationTime, dataRegionId)); + } + + private static HistoricalTransferSummary snapshotAndClearHistoricalTransferSummary( + final String pipeName, final long creationTime, final int dataRegionId) { + final HistoricalTransferSummaryCounter counter = + HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove( + new HistoricalTransferKey(pipeName, creationTime, dataRegionId)); + return Objects.nonNull(counter) ? counter.snapshot() : null; + } + + private static HistoricalTransferSummaryCounter getOrCreateHistoricalTransferSummaryCounter( + final String pipeName, final long creationTime, final int dataRegionId) { + return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent( + new HistoricalTransferKey(pipeName, creationTime, dataRegionId), + ignored -> new HistoricalTransferSummaryCounter()); + } + + public static final class HistoricalTransferSummary { + + private final long extractedHistoricalTsFileCount; + private final long skippedHistoricalTsFileCount; + private final long splitHistoricalTsFileCount; + private final long unsplitHistoricalTsFileCount; + private final long extractedHistoricalDeletionCount; + + private HistoricalTransferSummary( + final long extractedHistoricalTsFileCount, + final long skippedHistoricalTsFileCount, + final long splitHistoricalTsFileCount, + final long unsplitHistoricalTsFileCount, + final long extractedHistoricalDeletionCount) { + this.extractedHistoricalTsFileCount = extractedHistoricalTsFileCount; + this.skippedHistoricalTsFileCount = skippedHistoricalTsFileCount; + this.splitHistoricalTsFileCount = splitHistoricalTsFileCount; + this.unsplitHistoricalTsFileCount = unsplitHistoricalTsFileCount; + this.extractedHistoricalDeletionCount = extractedHistoricalDeletionCount; + } + + public String toReportMessage() { + return String.format( + "historical summary: extractedTsFileCount=%s, skippedTsFileCount=%s, splitTsFileCount=%s, unsplitTsFileCount=%s, deletionCount=%s", + extractedHistoricalTsFileCount, + skippedHistoricalTsFileCount, + splitHistoricalTsFileCount, + unsplitHistoricalTsFileCount, + extractedHistoricalDeletionCount); + } + } + + private static final class HistoricalTransferSummaryCounter { + + private final AtomicLong extractedHistoricalTsFileCount = new AtomicLong(0); + private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong(0); + private final AtomicLong splitHistoricalTsFileCount = new AtomicLong(0); + private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong(0); + private final AtomicLong extractedHistoricalDeletionCount = new AtomicLong(0); + + private void initialize( + final long extractedHistoricalTsFileCount, final long extractedHistoricalDeletionCount) { + this.extractedHistoricalTsFileCount.set(extractedHistoricalTsFileCount); + this.skippedHistoricalTsFileCount.set(0); + this.splitHistoricalTsFileCount.set(0); + this.unsplitHistoricalTsFileCount.set(0); + this.extractedHistoricalDeletionCount.set(extractedHistoricalDeletionCount); + } + + private HistoricalTransferSummary snapshot() { + return new HistoricalTransferSummary( + extractedHistoricalTsFileCount.get(), + skippedHistoricalTsFileCount.get(), + splitHistoricalTsFileCount.get(), + unsplitHistoricalTsFileCount.get(), + extractedHistoricalDeletionCount.get()); + } + } + + private static final class HistoricalTransferKey { + + private final String pipeName; + private final long creationTime; + private final int dataRegionId; + + private HistoricalTransferKey( + final String pipeName, final long creationTime, final int dataRegionId) { + this.pipeName = pipeName; + this.creationTime = creationTime; + this.dataRegionId = dataRegionId; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof HistoricalTransferKey)) { + return false; + } + final HistoricalTransferKey that = (HistoricalTransferKey) obj; + return creationTime == that.creationTime + && dataRegionId == that.dataRegionId + && Objects.equals(pipeName, that.pipeName); + } + + @Override + public int hashCode() { + return Objects.hash(pipeName, creationTime, dataRegionId); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 053aac13887..e71d80a61a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -168,6 +168,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource new HashMap<>(); private final Map<PersistentResource, Long> pendingResource2ReplicateIndexForIoTV2 = new HashMap<>(); + private int extractedHistoricalTsFileCount = 0; + private int extractedHistoricalDeletionCount = 0; @Override public void validate(final PipeParameterValidator validator) { @@ -488,6 +490,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource return; } hasBeenStarted = true; + extractedHistoricalTsFileCount = 0; + extractedHistoricalDeletionCount = 0; final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); @@ -521,6 +525,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) : o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex())); pendingQueue = new ArrayDeque<>(originalResourceList); + PipeTerminateEvent.initializeHistoricalTransferSummary( + pipeName, + creationTime, + dataRegionId, + extractedHistoricalTsFileCount, + extractedHistoricalDeletionCount); LOGGER.info( DataNodePipeMessages.PIPE_FINISH_TO_SORT_ALL_EXTRACTED_RESOURCES, @@ -649,6 +659,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource return true; } }); + extractedHistoricalTsFileCount = filteredTsFileResources2TableNames.size(); LOGGER.info( "Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, " @@ -798,6 +809,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource }) .collect(Collectors.toList()); resourceList.addAll(allDeletionResources); + extractedHistoricalDeletionCount = allDeletionResources.size(); LOGGER.info( DataNodePipeMessages.PIPE_FINISH_TO_EXTRACT_DELETIONS_EXTRACT_DELETIONS, pipeName, @@ -841,6 +853,16 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource } private Event supplyTerminateEvent() { + final PipeTerminateEvent.HistoricalTransferSummary historicalTransferSummary = + PipeTerminateEvent.snapshotHistoricalTransferSummary(pipeName, creationTime, dataRegionId); + if (Objects.nonNull(historicalTransferSummary)) { + LOGGER.info( + "Pipe {}@{}: historical source has supplied all events, emitting terminate event. {}", + pipeName, + dataRegionId, + historicalTransferSummary.toReportMessage()); + } + final PipeTerminateEvent terminateEvent = new PipeTerminateEvent( pipeName, @@ -867,6 +889,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource } filteredTsFileResources2TableNames.remove(resource); + PipeTerminateEvent.markHistoricalTsFileSkipped(pipeName, creationTime, dataRegionId); LOGGER.info( DataNodePipeMessages.PIPE_SKIP_HISTORICAL_TSFILE_BECAUSE_REALTIME_SOURCE, pipeName, @@ -1074,6 +1097,9 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource @Override public synchronized void close() { + if (!isTerminateSignalSent) { + PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId); + } if (Objects.nonNull(pendingQueue)) { pendingQueue.forEach( resource -> { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 5cc9017df73..1a922448fc0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -49,6 +49,7 @@ public class PipeReceiverStatusHandler { private static final String NO_PERMISSION = "No permission"; private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception"; private static final String NO_PERMISSION_STR = "No permissions for this operation"; + private static final int MAX_RECORD_MESSAGE_LENGTH_IN_LOG = 2048; private final boolean isRetryAllowedWhenConflictOccurs; private final long retryMaxMillisWhenConflictOccurs; @@ -147,6 +148,7 @@ public class PipeReceiverStatusHandler { PipeMessages.USER_CONFLICT_NOT_ALLOWED, shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not recorded", status); + logDiscardedUserConflictData("retry is not allowed", recordMessage, status); return; } @@ -160,6 +162,7 @@ public class PipeReceiverStatusHandler { PipeMessages.USER_CONFLICT_RETRY_TIMEOUT, shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not recorded", status); + logDiscardedUserConflictData("retry timeout", recordMessage, status); resetExceptionStatus(); return; } @@ -266,6 +269,32 @@ public class PipeReceiverStatusHandler { return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION; } + private void logDiscardedUserConflictData( + final String reason, final String recordMessage, final TSStatus status) { + if (!LOGGER.isWarnEnabled()) { + return; + } + + LOGGER.warn( + "User conflict exception: discarded data info because {}. data: {}. receiver message: {}. status: {}", + reason, + summarizeRecordMessage(recordMessage), + status.getMessage(), + status); + } + + private String summarizeRecordMessage(final String recordMessage) { + if (Objects.isNull(recordMessage) || recordMessage.isEmpty()) { + return "<empty>"; + } + + final String normalizedRecordMessage = + recordMessage.replace('\r', ' ').replace('\n', ' ').trim(); + return normalizedRecordMessage.length() <= MAX_RECORD_MESSAGE_LENGTH_IN_LOG + ? normalizedRecordMessage + : normalizedRecordMessage.substring(0, MAX_RECORD_MESSAGE_LENGTH_IN_LOG) + "...(truncated)"; + } + private void recordExceptionStatusIfNecessary(final String message) { if (!Objects.equals(exceptionRecordedMessage.get(), message)) { exceptionFirstEncounteredTime.set(System.currentTimeMillis());
