This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 51da71f73bd Pipe: Fix epoch status metric (#16355)
51da71f73bd is described below
commit 51da71f73bd0ef2558b7c9c4ba151a75c29841c9
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Sep 8 16:09:38 2025 +0800
Pipe: Fix epoch status metric (#16355)
* fix
* fix
---
.../pipe/source/dataregion/realtime/epoch/TsFileEpoch.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index 1f9f7d178f0..940086bb8d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -43,9 +43,16 @@ public class TsFileEpoch {
}
public TsFileEpoch.State getState(final PipeRealtimeDataRegionSource
extractor) {
- return dataRegionExtractor2State
- .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
- .get();
+ AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+ if (stateRef == null) {
+ dataRegionExtractor2State.putIfAbsent(
+ extractor, stateRef = new AtomicReference<>(State.EMPTY));
+ extractor.increaseExtractEpochSize();
+ setExtractorsRecentProcessedTsFileEpochState();
+ }
+
+ return stateRef.get();
}
public void migrateState(