This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e631aa7c5e1 Fixed the updateAndGet side effect in
TableDeviceCacheEntry and PipeRemainingOperator
e631aa7c5e1 is described below
commit e631aa7c5e1eab268398e6a826356e8f54ac5353
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 15:00:04 2026 +0800
Fixed the updateAndGet side effect in TableDeviceCacheEntry and
PipeRemainingOperator
---
.../PipeConfigNodeRemainingTimeOperator.java | 24 +++-----
.../PipeDataNodeRemainingEventAndTimeOperator.java | 48 ++++++---------
.../fetcher/cache/TableDeviceCacheEntry.java | 70 +++++++++-------------
3 files changed, 56 insertions(+), 86 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
index c27e4cd898b..ff9a7ea2648 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
@@ -66,14 +66,11 @@ class PipeConfigNodeRemainingTimeOperator extends
PipeRemainingOperator {
.reduce(Long::sum)
.orElse(0L);
- configRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- lastConfigRegionCommitSmoothingValue =
- pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
- }
- return meter;
- });
+ final Meter configRegionMeter = configRegionCommitMeter.get();
+ if (Objects.nonNull(configRegionMeter)) {
+ lastConfigRegionCommitSmoothingValue =
+
pipeRemainingTimeCommitRateAverageTime.getMeterRate(configRegionMeter);
+ }
final double configRegionRemainingTime;
if (totalConfigRegionWriteEventCount <= 0) {
@@ -101,13 +98,10 @@ class PipeConfigNodeRemainingTimeOperator extends
PipeRemainingOperator {
//////////////////////////// Rate ////////////////////////////
void markConfigRegionCommit() {
- configRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- meter.mark();
- }
- return meter;
- });
+ final Meter meter = configRegionCommitMeter.get();
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
}
//////////////////////////// Switch ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 38cfb579622..6ff3de4d949 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -138,14 +138,11 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
+ rawTabletEventCount.get()
+ insertNodeEventCount.get();
- dataRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- lastDataRegionCommitSmoothingValue =
- pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
- }
- return meter;
- });
+ final Meter dataRegionMeter = dataRegionCommitMeter.get();
+ if (Objects.nonNull(dataRegionMeter)) {
+ lastDataRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(dataRegionMeter);
+ }
final double dataRegionRemainingTime;
if (totalDataRegionWriteEventCount <= 0) {
dataRegionRemainingTime = 0;
@@ -162,14 +159,11 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
.reduce(Long::sum)
.orElse(0L);
- schemaRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- lastSchemaRegionCommitSmoothingValue =
- pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
- }
- return meter;
- });
+ final Meter schemaRegionMeter = schemaRegionCommitMeter.get();
+ if (Objects.nonNull(schemaRegionMeter)) {
+ lastSchemaRegionCommitSmoothingValue =
+
pipeRemainingTimeCommitRateAverageTime.getMeterRate(schemaRegionMeter);
+ }
final double schemaRegionRemainingTime;
if (totalSchemaRegionWriteEventCount <= 0) {
schemaRegionRemainingTime = 0;
@@ -199,23 +193,17 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
//////////////////////////// Rate ////////////////////////////
void markDataRegionCommit() {
- dataRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- meter.mark();
- }
- return meter;
- });
+ final Meter meter = dataRegionCommitMeter.get();
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
}
void markSchemaRegionCommit() {
- schemaRegionCommitMeter.updateAndGet(
- meter -> {
- if (Objects.nonNull(meter)) {
- meter.mark();
- }
- return meter;
- });
+ final Meter meter = schemaRegionCommitMeter.get();
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
}
void markTsFileCollectInvocationCount(final long collectInvocationCount) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
index 7af9c25af6d..4f151d15eae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java
@@ -38,7 +38,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
@@ -80,29 +79,21 @@ public class TableDeviceCacheEntry {
}
int invalidateAttribute() {
- final AtomicInteger size = new AtomicInteger(0);
- deviceSchema.updateAndGet(
- schema -> {
- if (schema instanceof TableAttributeSchema) {
- size.set(schema.estimateSize());
- return null;
- }
- return schema;
- });
- return size.get();
+ IDeviceSchema schema;
+ do {
+ schema = deviceSchema.get();
+ if (!(schema instanceof TableAttributeSchema)) {
+ return 0;
+ }
+ } while (!deviceSchema.compareAndSet(schema, null));
+ return schema.estimateSize();
}
int invalidateAttributeColumn(final String attribute) {
- final AtomicInteger size = new AtomicInteger(0);
- deviceSchema.updateAndGet(
- schema -> {
- if (schema instanceof TableAttributeSchema) {
- size.set(((TableAttributeSchema)
schema).removeAttribute(attribute));
- return schema;
- }
- return schema;
- });
- return size.get();
+ final IDeviceSchema schema = deviceSchema.get();
+ return schema instanceof TableAttributeSchema
+ ? ((TableAttributeSchema) schema).removeAttribute(attribute)
+ : 0;
}
Map<String, Binary> getAttributeMap() {
@@ -162,17 +153,15 @@ public class TableDeviceCacheEntry {
}
int invalidateTreeSchema() {
- final AtomicInteger size = new AtomicInteger(0);
- deviceSchema.updateAndGet(
- schema -> {
- if (schema instanceof TreeDeviceNormalSchema
- || schema instanceof TreeDeviceTemplateSchema) {
- size.set(schema.estimateSize());
- return null;
- }
- return schema;
- });
- return size.get();
+ IDeviceSchema schema;
+ do {
+ schema = deviceSchema.get();
+ if (!(schema instanceof TreeDeviceNormalSchema)
+ && !(schema instanceof TreeDeviceTemplateSchema)) {
+ return 0;
+ }
+ } while (!deviceSchema.compareAndSet(schema, null));
+ return schema.estimateSize();
}
/////////////////////////////// Last Cache ///////////////////////////////
@@ -240,15 +229,14 @@ public class TableDeviceCacheEntry {
}
int invalidateLastCache() {
- final AtomicInteger size = new AtomicInteger(0);
- lastCache.updateAndGet(
- cacheEntry -> {
- if (Objects.nonNull(cacheEntry)) {
- size.set(cacheEntry.estimateSize());
- }
- return null;
- });
- return size.get();
+ TableDeviceLastCache cacheEntry;
+ do {
+ cacheEntry = lastCache.get();
+ if (Objects.isNull(cacheEntry)) {
+ return 0;
+ }
+ } while (!lastCache.compareAndSet(cacheEntry, null));
+ return cacheEntry.estimateSize();
}
/////////////////////////////// Management ///////////////////////////////