This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch mf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 565766ce165fc49a435e1862c3e450b9f15cd77f Author: Caideyipi <[email protected]> AuthorDate: Thu Apr 30 11:31:47 2026 +0800 mb --- .../PipeConfigNodeRemainingTimeOperator.java | 24 +++----- .../PipeDataNodeRemainingEventAndTimeOperator.java | 48 ++++++--------- .../fetcher/cache/TableDeviceCacheEntry.java | 70 +++++++++------------- 3 files changed, 56 insertions(+), 86 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java index c27e4cd898b..ff9a7ea2648 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java @@ -66,14 +66,11 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator { .reduce(Long::sum) .orElse(0L); - configRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - lastConfigRegionCommitSmoothingValue = - pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); - } - return meter; - }); + final Meter configRegionMeter = configRegionCommitMeter.get(); + if (Objects.nonNull(configRegionMeter)) { + lastConfigRegionCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(configRegionMeter); + } final double configRegionRemainingTime; if (totalConfigRegionWriteEventCount <= 0) { @@ -101,13 +98,10 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator { //////////////////////////// Rate //////////////////////////// void markConfigRegionCommit() { - configRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + final Meter meter = configRegionCommitMeter.get(); + if (Objects.nonNull(meter)) { + meter.mark(); + } } //////////////////////////// Switch //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index 38cfb579622..6ff3de4d949 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -138,14 +138,11 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper + rawTabletEventCount.get() + insertNodeEventCount.get(); - dataRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - lastDataRegionCommitSmoothingValue = - pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); - } - return meter; - }); + final Meter dataRegionMeter = dataRegionCommitMeter.get(); + if (Objects.nonNull(dataRegionMeter)) { + lastDataRegionCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(dataRegionMeter); + } final double dataRegionRemainingTime; if (totalDataRegionWriteEventCount <= 0) { dataRegionRemainingTime = 0; @@ -162,14 +159,11 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper .reduce(Long::sum) .orElse(0L); - schemaRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - lastSchemaRegionCommitSmoothingValue = - pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); - } - return meter; - }); + final Meter schemaRegionMeter = schemaRegionCommitMeter.get(); + if (Objects.nonNull(schemaRegionMeter)) { + lastSchemaRegionCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(schemaRegionMeter); + } final double schemaRegionRemainingTime; if (totalSchemaRegionWriteEventCount <= 0) { schemaRegionRemainingTime = 0; @@ -199,23 +193,17 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper //////////////////////////// Rate //////////////////////////// void markDataRegionCommit() { - dataRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + final Meter meter = dataRegionCommitMeter.get(); + if (Objects.nonNull(meter)) { + meter.mark(); + } } void markSchemaRegionCommit() { - schemaRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + final Meter meter = schemaRegionCommitMeter.get(); + if (Objects.nonNull(meter)) { + meter.mark(); + } } void markTsFileCollectInvocationCount(final long collectInvocationCount) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java index 7af9c25af6d..4f151d15eae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE; @@ -80,29 +79,21 @@ public class TableDeviceCacheEntry { } int invalidateAttribute() { - final AtomicInteger size = new AtomicInteger(0); - deviceSchema.updateAndGet( - schema -> { - if (schema instanceof TableAttributeSchema) { - size.set(schema.estimateSize()); - return null; - } - return schema; - }); - return size.get(); + IDeviceSchema schema; + do { + schema = deviceSchema.get(); + if (!(schema instanceof TableAttributeSchema)) { + return 0; + } + } while (!deviceSchema.compareAndSet(schema, null)); + return schema.estimateSize(); } int invalidateAttributeColumn(final String attribute) { - final AtomicInteger size = new AtomicInteger(0); - deviceSchema.updateAndGet( - schema -> { - if (schema instanceof TableAttributeSchema) { - size.set(((TableAttributeSchema) schema).removeAttribute(attribute)); - return schema; - } - return schema; - }); - return size.get(); + final IDeviceSchema schema = deviceSchema.get(); + return schema instanceof TableAttributeSchema + ? ((TableAttributeSchema) schema).removeAttribute(attribute) + : 0; } Map<String, Binary> getAttributeMap() { @@ -162,17 +153,15 @@ public class TableDeviceCacheEntry { } int invalidateTreeSchema() { - final AtomicInteger size = new AtomicInteger(0); - deviceSchema.updateAndGet( - schema -> { - if (schema instanceof TreeDeviceNormalSchema - || schema instanceof TreeDeviceTemplateSchema) { - size.set(schema.estimateSize()); - return null; - } - return schema; - }); - return size.get(); + IDeviceSchema schema; + do { + schema = deviceSchema.get(); + if (!(schema instanceof TreeDeviceNormalSchema) + && !(schema instanceof TreeDeviceTemplateSchema)) { + return 0; + } + } while (!deviceSchema.compareAndSet(schema, null)); + return schema.estimateSize(); } /////////////////////////////// Last Cache /////////////////////////////// @@ -240,15 +229,14 @@ public class TableDeviceCacheEntry { } int invalidateLastCache() { - final AtomicInteger size = new AtomicInteger(0); - lastCache.updateAndGet( - cacheEntry -> { - if (Objects.nonNull(cacheEntry)) { - size.set(cacheEntry.estimateSize()); - } - return null; - }); - return size.get(); + TableDeviceLastCache cacheEntry; + do { + cacheEntry = lastCache.get(); + if (Objects.isNull(cacheEntry)) { + return 0; + } + } while (!lastCache.compareAndSet(cacheEntry, null)); + return cacheEntry.estimateSize(); } /////////////////////////////// Management ///////////////////////////////
