This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e9c43479c8 Pipe: Modify epoch status metric changes (#16272)
8e9c43479c8 is described below

commit 8e9c43479c8538af23b376f43023510d7ee6c525
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 27 16:31:38 2025 +0800

    Pipe: Modify epoch status metric changes (#16272)
    
    * Pipe: Modify epoch status metric changes
    
    * fix
---
 .../PipeRealtimeDataRegionHybridSource.java        |  1 +
 .../realtime/PipeRealtimeDataRegionLogSource.java  |  1 +
 .../realtime/PipeRealtimeDataRegionSource.java     | 15 ++++++++++++
 .../PipeRealtimeDataRegionTsFileSource.java        |  2 ++
 .../dataregion/realtime/epoch/TsFileEpoch.java     | 28 +++++++++++++++++++---
 .../realtime/epoch/TsFileEpochManager.java         |  3 ---
 6 files changed, 44 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 25256238794..7cf476a66cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -53,6 +53,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       extractTabletInsertion(event);
     } else if (eventToExtract instanceof TsFileInsertionEvent) {
       extractTsFileInsertion(event);
+      event.getTsFileEpoch().clearState(this);
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index def6649b943..35a2c7190c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -47,6 +47,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
       extractTabletInsertion(event);
     } else if (eventToExtract instanceof TsFileInsertionEvent) {
       extractTsFileInsertion(event);
+      event.getTsFileEpoch().clearState(this);
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 7c547e94407..cf29544744d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -121,6 +122,8 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
   private boolean sloppyPattern; // true to disable pattern filter after 
extraction
 
+  private AtomicLong extractEpochSize = new AtomicLong();
+
   // This queue is used to store pending events extracted by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
@@ -646,4 +649,16 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   public String getTaskID() {
     return taskID;
   }
+
+  public void increaseExtractEpochSize() {
+    extractEpochSize.incrementAndGet();
+  }
+
+  public void decreaseExtractEpochSize() {
+    extractEpochSize.decrementAndGet();
+  }
+
+  public boolean extractEpochSizeIsEmpty() {
+    return extractEpochSize.get() == 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index f666589dda3..8f74bc63fb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -74,6 +74,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       // Ignore the event.
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
     }
+
+    event.getTsFileEpoch().clearState(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index b1ba0d37084..1f9f7d178f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.pipe.metric.source.PipeDataRegionSourceMetrics;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -49,9 +50,30 @@ public class TsFileEpoch {
 
   public void migrateState(
       final PipeRealtimeDataRegionSource extractor, final 
TsFileEpochStateMigrator visitor) {
-    dataRegionExtractor2State
-        .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
-        .getAndUpdate(visitor::migrate);
+    AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+    if (stateRef == null) {
+      dataRegionExtractor2State.putIfAbsent(
+          extractor, stateRef = new AtomicReference<>(State.EMPTY));
+      extractor.increaseExtractEpochSize();
+      setExtractorsRecentProcessedTsFileEpochState();
+    }
+
+    State migratedState = visitor.migrate(stateRef.get());
+    if (!Objects.equals(stateRef.get(), migratedState)) {
+      stateRef.set(migratedState);
+      setExtractorsRecentProcessedTsFileEpochState();
+    }
+  }
+
+  public void clearState(final PipeRealtimeDataRegionSource extractor) {
+    if (dataRegionExtractor2State.containsKey(extractor)) {
+      extractor.decreaseExtractEpochSize();
+    }
+    if (extractor.extractEpochSizeIsEmpty()) {
+      PipeDataRegionSourceMetrics.getInstance()
+          .setRecentProcessedTsFileEpochState(extractor.getTaskID(), 
State.EMPTY);
+    }
   }
 
   public void setExtractorsRecentProcessedTsFileEpochState() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
index 10951b2db8f..887321add22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -61,9 +61,6 @@ public class TsFileEpochManager {
         });
 
     final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
-    // When all data corresponding to this TsFileEpoch have been extracted, 
update the state
-    // of the extractors processing this TsFileEpoch.
-    epoch.setExtractorsRecentProcessedTsFileEpochState();
 
     LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
     return new PipeRealtimeEvent(

Reply via email to