This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-precise-hybrid in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 019749426104366fe73c444619ad84fbcd47df15 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Nov 4 03:45:58 2023 +0800 Pipe: add connector's pending tsfile event count to help isTsFileEventCountInQueueExceededLimit judgement when hybrid mode is enabled --- .../pipe/event/common/heartbeat/PipeHeartbeatEvent.java | 7 ++++++- .../realtime/PipeRealtimeDataRegionHybridExtractor.java | 15 +++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index b5e92f69c55..2c54d93401a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -185,7 +185,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) { ((PipeRealtimeDataRegionHybridExtractor) extractor) - .informEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount()); + .informProcessorEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount()); } } @@ -195,6 +195,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent { connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount(); connectorQueueSize = pendingQueue.size(); } + + if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) { + ((PipeRealtimeDataRegionHybridExtractor) extractor) + .informConnectorInputPendingQueueTsFileSize(pendingQueue.getTsFileInsertionEventCount()); + } } /////////////////////////////// For Hybrid extractor /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 7b6e5e42dcf..3ba5a4c496f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -42,7 +42,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class); private volatile boolean isStartedToSupply = false; - private final AtomicInteger eventCollectorQueueTsFileSize = new AtomicInteger(0); + private final AtomicInteger processorEventCollectorQueueTsFileSize = new AtomicInteger(0); + private final AtomicInteger connectorInputPendingQueueTsFileSize = new AtomicInteger(0); @Override protected void doExtract(PipeRealtimeEvent event) { @@ -228,12 +229,18 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } private boolean isTsFileEventCountInQueueExceededLimit() { - return pendingQueue.getTsFileInsertionEventCount() + eventCollectorQueueTsFileSize.get() + return pendingQueue.getTsFileInsertionEventCount() + + processorEventCollectorQueueTsFileSize.get() + + connectorInputPendingQueueTsFileSize.get() >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); } - public void informEventCollectorQueueTsFileSize(int queueSize) { - eventCollectorQueueTsFileSize.set(queueSize); + public void informProcessorEventCollectorQueueTsFileSize(int queueSize) { + processorEventCollectorQueueTsFileSize.set(queueSize); + } + + public void informConnectorInputPendingQueueTsFileSize(int queueSize) { + connectorInputPendingQueueTsFileSize.set(queueSize); } @Override
