This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 81247b48e44 [To dev/1.3] Fixed the updateAndGet side effect in
DeviceCacheEntry and PipeRemainingOperator (#17584) (#17602)
81247b48e44 is described below
commit 81247b48e44b268b0d7091a6ff728f001b546931
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 16:48:20 2026 +0800
[To dev/1.3] Fixed the updateAndGet side effect in DeviceCacheEntry and
PipeRemainingOperator (#17584) (#17602)
---
.../PipeConfigNodeRemainingTimeOperator.java | 24 ++++-------
.../PipeDataNodeRemainingEventAndTimeOperator.java | 48 ++++++++--------------
.../analyze/cache/schema/DeviceCacheEntry.java | 35 ++++++++--------
3 files changed, 43 insertions(+), 64 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
index c27e4cd898b..ff9a7ea2648 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
@@ -66,14 +66,11 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
.reduce(Long::sum)
.orElse(0L);
- configRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- lastConfigRegionCommitSmoothingValue =
- pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
- }
- return meter;
- });
+ final Meter configRegionMeter = configRegionCommitMeter.get();
+ if (Objects.nonNull(configRegionMeter)) {
+ lastConfigRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(configRegionMeter);
+ }
final double configRegionRemainingTime;
if (totalConfigRegionWriteEventCount <= 0) {
@@ -101,13 +98,10 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
//////////////////////////// Rate ////////////////////////////
void markConfigRegionCommit() {
- configRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- meter.mark();
- }
- return meter;
- });
+ final Meter meter = configRegionCommitMeter.get();
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
}
//////////////////////////// Switch ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 38cfb579622..6ff3de4d949 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -138,14 +138,11 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
+ rawTabletEventCount.get()
+ insertNodeEventCount.get();
- dataRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- lastDataRegionCommitSmoothingValue =
- pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
- }
- return meter;
- });
+ final Meter dataRegionMeter = dataRegionCommitMeter.get();
+ if (Objects.nonNull(dataRegionMeter)) {
+ lastDataRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(dataRegionMeter);
+ }
final double dataRegionRemainingTime;
if (totalDataRegionWriteEventCount <= 0) {
dataRegionRemainingTime = 0;
@@ -162,14 +159,11 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
.reduce(Long::sum)
.orElse(0L);
- schemaRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- lastSchemaRegionCommitSmoothingValue =
- pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
- }
- return meter;
- });
+ final Meter schemaRegionMeter = schemaRegionCommitMeter.get();
+ if (Objects.nonNull(schemaRegionMeter)) {
+ lastSchemaRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(schemaRegionMeter);
+ }
final double schemaRegionRemainingTime;
if (totalSchemaRegionWriteEventCount <= 0) {
schemaRegionRemainingTime = 0;
@@ -199,23 +193,17 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
//////////////////////////// Rate ////////////////////////////
void markDataRegionCommit() {
- dataRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- meter.mark();
- }
- return meter;
- });
+ final Meter meter = dataRegionCommitMeter.get();
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
}
void markSchemaRegionCommit() {
- schemaRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- meter.mark();
- }
- return meter;
- });
+ final Meter meter = schemaRegionCommitMeter.get();
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
}
void markTsFileCollectInvocationCount(final long collectInvocationCount) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
index 3337e2f1420..ecd4c297e57 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
@@ -30,7 +30,6 @@ import javax.annotation.concurrent.ThreadSafe;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
@@ -94,15 +93,14 @@ public class DeviceCacheEntry {
}
int invalidateSchema() {
- final AtomicInteger size = new AtomicInteger(0);
- deviceSchema.updateAndGet(
- schema -> {
- if (Objects.nonNull(schema)) {
- size.set(schema.estimateSize());
- }
- return null;
- });
- return size.get();
+ IDeviceSchema schema;
+ do {
+ schema = deviceSchema.get();
+ if (Objects.isNull(schema)) {
+ return 0;
+ }
+ } while (!deviceSchema.compareAndSet(schema, null));
+ return schema.estimateSize();
}
/////////////////////////////// Last Cache ///////////////////////////////
@@ -150,15 +148,14 @@ public class DeviceCacheEntry {
}
int invalidateLastCache() {
- final AtomicInteger size = new AtomicInteger(0);
- lastCache.updateAndGet(
- cacheEntry -> {
- if (Objects.nonNull(cacheEntry)) {
- size.set(cacheEntry.estimateSize());
- }
- return null;
- });
- return size.get();
+ DeviceLastCache cacheEntry;
+ do {
+ cacheEntry = lastCache.get();
+ if (Objects.isNull(cacheEntry)) {
+ return 0;
+ }
+ } while (!lastCache.compareAndSet(cacheEntry, null));
+ return cacheEntry.estimateSize();
}
/////////////////////////////// Management ///////////////////////////////