This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new dc870f0c757 [To dev/1.3] Pipe: Modify epoch status metric changes 
(#16272) (#16304)
dc870f0c757 is described below

commit dc870f0c757b59cf87bfe24742eae862faf32f38
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Aug 29 17:48:21 2025 +0800

    [To dev/1.3] Pipe: Modify epoch status metric changes (#16272) (#16304)
    
    * Pipe: Modify epoch status metric changes
    
    * fix
    
    (cherry picked from commit 8e9c43479c8538af23b376f43023510d7ee6c525)
---
 .../PipeRealtimeDataRegionHybridSource.java        |  1 +
 .../realtime/PipeRealtimeDataRegionLogSource.java  |  1 +
 .../realtime/PipeRealtimeDataRegionSource.java     | 15 ++++++++++++
 .../PipeRealtimeDataRegionTsFileSource.java        |  2 ++
 .../dataregion/realtime/epoch/TsFileEpoch.java     | 28 +++++++++++++++++++---
 .../realtime/epoch/TsFileEpochManager.java         |  3 ---
 6 files changed, 44 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 081f3652047..347c936cc8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -53,6 +53,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       extractTabletInsertion(event);
     } else if (eventToExtract instanceof TsFileInsertionEvent) {
       extractTsFileInsertion(event);
+      event.getTsFileEpoch().clearState(this);
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index ad40109d9fb..ac7883fd2b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -47,6 +47,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
       extractTabletInsertion(event);
     } else if (eventToExtract instanceof TsFileInsertionEvent) {
       extractTsFileInsertion(event);
+      event.getTsFileEpoch().clearState(this);
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 4f30452fb8b..b9bc57b977a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -113,6 +114,8 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
   private boolean sloppyPattern; // true to disable pattern filter after 
extraction
 
+  private AtomicLong extractEpochSize = new AtomicLong();
+
   // This queue is used to store pending events extracted by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
@@ -575,4 +578,16 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   public String getTaskID() {
     return taskID;
   }
+
+  public void increaseExtractEpochSize() {
+    extractEpochSize.incrementAndGet();
+  }
+
+  public void decreaseExtractEpochSize() {
+    extractEpochSize.decrementAndGet();
+  }
+
+  public boolean extractEpochSizeIsEmpty() {
+    return extractEpochSize.get() == 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index a4473bcfb27..94e7b549c3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -74,6 +74,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       // Ignore the event.
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
     }
+
+    event.getTsFileEpoch().clearState(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index b1ba0d37084..1f9f7d178f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.pipe.metric.source.PipeDataRegionSourceMetrics;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -49,9 +50,30 @@ public class TsFileEpoch {
 
   public void migrateState(
       final PipeRealtimeDataRegionSource extractor, final 
TsFileEpochStateMigrator visitor) {
-    dataRegionExtractor2State
-        .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
-        .getAndUpdate(visitor::migrate);
+    AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+    if (stateRef == null) {
+      dataRegionExtractor2State.putIfAbsent(
+          extractor, stateRef = new AtomicReference<>(State.EMPTY));
+      extractor.increaseExtractEpochSize();
+      setExtractorsRecentProcessedTsFileEpochState();
+    }
+
+    State migratedState = visitor.migrate(stateRef.get());
+    if (!Objects.equals(stateRef.get(), migratedState)) {
+      stateRef.set(migratedState);
+      setExtractorsRecentProcessedTsFileEpochState();
+    }
+  }
+
+  public void clearState(final PipeRealtimeDataRegionSource extractor) {
+    if (dataRegionExtractor2State.containsKey(extractor)) {
+      extractor.decreaseExtractEpochSize();
+    }
+    if (extractor.extractEpochSizeIsEmpty()) {
+      PipeDataRegionSourceMetrics.getInstance()
+          .setRecentProcessedTsFileEpochState(extractor.getTaskID(), 
State.EMPTY);
+    }
   }
 
   public void setExtractorsRecentProcessedTsFileEpochState() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
index 675a5b385de..003260ef1cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -60,9 +60,6 @@ public class TsFileEpochManager {
         });
 
     final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
-    // When all data corresponding to this TsFileEpoch have been extracted, 
update the state
-    // of the extractors processing this TsFileEpoch.
-    epoch.setExtractorsRecentProcessedTsFileEpochState();
 
     LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
     return new PipeRealtimeEvent(

Reply via email to