This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 534e8a65c04 [To dev/1.3] Pipe: Fix epoch status metric (#16355) 
(#16366)
534e8a65c04 is described below

commit 534e8a65c0456b675b935bae41939319ffdbac18
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Sep 8 16:20:54 2025 +0800

    [To dev/1.3] Pipe: Fix epoch status metric (#16355) (#16366)
    
    * fix
    
    * fix
---
 .../pipe/source/dataregion/realtime/epoch/TsFileEpoch.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index 1f9f7d178f0..940086bb8d6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -43,9 +43,16 @@ public class TsFileEpoch {
   }
 
   public TsFileEpoch.State getState(final PipeRealtimeDataRegionSource 
extractor) {
-    return dataRegionExtractor2State
-        .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
-        .get();
+    AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+    if (stateRef == null) {
+      dataRegionExtractor2State.putIfAbsent(
+          extractor, stateRef = new AtomicReference<>(State.EMPTY));
+      extractor.increaseExtractEpochSize();
+      setExtractorsRecentProcessedTsFileEpochState();
+    }
+
+    return stateRef.get();
   }
 
   public void migrateState(

Reply via email to