This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 73f4e7b2494 PipeConsensus: always execute flush for historical data
extraction of consensus pipe to reduce data sync delay (#14132) (#14154)
73f4e7b2494 is described below
commit 73f4e7b249422fa7fcace29c2b698beb1c559451
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Nov 21 16:50:50 2024 +0800
PipeConsensus: always execute flush for historical data extraction of
consensus pipe to reduce data sync delay (#14132) (#14154)
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 51 ++++++++++++++--------
1 file changed, 34 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 71f62e98871..80d43a320b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -395,23 +396,39 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final long startHistoricalExtractionTime = System.currentTimeMillis();
try {
LOGGER.info("Pipe {}@{}: start to flush data region", pipeName,
dataRegionId);
- synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
- final long lastFlushedByPipeTime =
- DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
- if (System.currentTimeMillis() - lastFlushedByPipeTime >=
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
- dataRegion.syncCloseAllWorkingTsFileProcessors();
- DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId,
System.currentTimeMillis());
- LOGGER.info(
- "Pipe {}@{}: finish to flush data region, took {} ms",
- pipeName,
- dataRegionId,
- System.currentTimeMillis() - startHistoricalExtractionTime);
- } else {
- LOGGER.info(
- "Pipe {}@{}: skip to flush data region, last flushed time {} ms
ago",
- pipeName,
- dataRegionId,
- System.currentTimeMillis() - lastFlushedByPipeTime);
+
+ // Consider the scenario: a consensus pipe comes to a region and is
+ // **immediately** followed by another pipe on the same region; the latter
+ // pipe will skip the flush operation.
+ // Because a large number of consensus pipes are not created at the same
+ // time, there is no serious waiting for locks. Therefore, the flush
+ // operation is always performed for the consensus pipe, and the
+ // lastFlushed timestamp is not updated here.
+ if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ LOGGER.info(
+ "Pipe {}@{}: finish to flush data region, took {} ms",
+ pipeName,
+ dataRegionId,
+ System.currentTimeMillis() - startHistoricalExtractionTime);
+ } else {
+ synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+ final long lastFlushedByPipeTime =
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+ if (System.currentTimeMillis() - lastFlushedByPipeTime >=
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(
+ dataRegionId, System.currentTimeMillis());
+ LOGGER.info(
+ "Pipe {}@{}: finish to flush data region, took {} ms",
+ pipeName,
+ dataRegionId,
+ System.currentTimeMillis() - startHistoricalExtractionTime);
+ } else {
+ LOGGER.info(
+ "Pipe {}@{}: skip to flush data region, last flushed time {}
ms ago",
+ pipeName,
+ dataRegionId,
+ System.currentTimeMillis() - lastFlushedByPipeTime);
+ }
}
}