This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 51da71f73bd Pipe: Fix epoch status metric (#16355)
51da71f73bd is described below

commit 51da71f73bd0ef2558b7c9c4ba151a75c29841c9
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Sep 8 16:09:38 2025 +0800

    Pipe: Fix epoch status metric (#16355)
    
    * fix
    
    * fix
---
 .../pipe/source/dataregion/realtime/epoch/TsFileEpoch.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index 1f9f7d178f0..940086bb8d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -43,9 +43,16 @@ public class TsFileEpoch {
   }
 
   public TsFileEpoch.State getState(final PipeRealtimeDataRegionSource extractor) {
-    return dataRegionExtractor2State
-        .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
-        .get();
+    AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+    if (stateRef == null) {
+      dataRegionExtractor2State.putIfAbsent(
+          extractor, stateRef = new AtomicReference<>(State.EMPTY));
+      extractor.increaseExtractEpochSize();
+      setExtractorsRecentProcessedTsFileEpochState();
+    }
+
+    return stateRef.get();
   }
 
   public void migrateState(

Reply via email to