This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 534e8a65c04 [To dev/1.3] Pipe: Fix epoch status metric (#16355)
(#16366)
534e8a65c04 is described below
commit 534e8a65c0456b675b935bae41939319ffdbac18
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Sep 8 16:20:54 2025 +0800
[To dev/1.3] Pipe: Fix epoch status metric (#16355) (#16366)
* fix
* fix
---
.../pipe/source/dataregion/realtime/epoch/TsFileEpoch.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index 1f9f7d178f0..940086bb8d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -43,9 +43,16 @@ public class TsFileEpoch {
}
public TsFileEpoch.State getState(final PipeRealtimeDataRegionSource
extractor) {
- return dataRegionExtractor2State
- .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
- .get();
+ AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+ if (stateRef == null) {
+ dataRegionExtractor2State.putIfAbsent(
+ extractor, stateRef = new AtomicReference<>(State.EMPTY));
+ extractor.increaseExtractEpochSize();
+ setExtractorsRecentProcessedTsFileEpochState();
+ }
+
+ return stateRef.get();
}
public void migrateState(