rpuch commented on code in PR #7219:
URL: https://github.com/apache/ignite-3/pull/7219#discussion_r2618720695
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java:
##########
@@ -120,7 +125,7 @@ public PartitionSnapshotStorage(
failureProcessor,
incomingSnapshotsExecutor,
DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS,
- logStorage
+ logStorage, metricManager
Review Comment:
Please put it on a separate line
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
Review Comment:
```suggestion
* <p>Note: not thread safe. Thread safety is guaranteed by operation's
order in {@link IncomingSnapshotCopier}.
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java:
##########
@@ -280,6 +292,8 @@ private void startSnapshotOperation(UUID snapshotId) {
private void completeSnapshotOperation(UUID snapshotId) {
synchronized (snapshotOperationLock) {
+ LOG.info("Finishing outgoing snapshot [partitionKey={},
snapshotId={}]", partitionKey, snapshotId);
Review Comment:
Let's move this log statement outside the synchronized block, as logging might cause I/O
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
+
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotDuration.onStart();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+
h.totalSnapshotInstallationTime.value(totalSnapshotDuration.duration(MILLISECONDS));
+
+ long totalBatches = h.totalBatches.value();
+
+ if (totalBatches > 0) {
+ h.avgBatchDuration.value(combinedBatchDuration / totalBatches);
+ }
+ }
+ }
+
+ void updateSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.lastAppliedIndex.value(lastAppliedIndex);
+ h.lastAppliedTerm.value(lastAppliedTerm);
+ h.catalogVersion.value(catalogVersion);
+
+ peers.addAll(config.peers());
+
+ learners.addAll(config.learners());
+ }
+ }
+
+ void onStartMvDataBatchProcessing() {
+ this.currentBatchDuration.onStart();
+ }
+
+ void onEndMvDataBatchProcessing() {
+ this.currentBatchDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+ h.totalBatches.increment();
+
+ long batchDuration = currentBatchDuration.duration(MILLISECONDS);
+
+ h.minBatchDuration.value(h.totalBatches.value() == 1
+ ? batchDuration
+ : Math.min(h.minBatchDuration.value(), batchDuration)
+ );
+
+ h.maxBatchDuration.value(Math.max(h.maxBatchDuration.value(),
batchDuration));
+
+ combinedBatchDuration += batchDuration;
+ }
+ }
+
+ void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.outOfOrderRowsSent.increment();
+
+ h.outOfOrderVersionsSent.add(rowVersions);
+ h.outOfOrderTotalBytesSent.add(totalBytes);
+ }
+ }
+
+ void onProcessRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.rowsSent.increment();
+
+ h.rowVersionsSent.add(rowVersions);
+ h.totalBytesSent.add(totalBytes);
+ }
+ }
+
+ void printSnapshotStats() {
+ Holder h = holder();
+
+ if (h != null && LOG.isInfoEnabled()) {
+ LOG.info("Outgoing snapshot installation completed
[partitionKey={}, snapshotId={}, rows={}, rowVersions={}, totalBytes={}, "
+ + " outOfOrderRows={}, outOfOrderVersions={},
outOfOrderTotalBytes={}, totalBatches={},"
+ + " avgBatchProcessingTime={}ms,
minBatchProcessingTime={}ms, maxBatchProcessingTime={}ms,"
+ + " totalSnapshotInstallationTime={}ms,
lastAppliedIndex={}, lastAppliedTerm={}, peers=[{}], learners=[{}],"
+ + " catalogVersion={}]",
+ partitionKey,
+ snapshotId,
+ h.rowsSent.value(),
+ h.rowVersionsSent.value(),
+ h.totalBytesSent.value(),
+ h.outOfOrderRowsSent.value(),
+ h.outOfOrderVersionsSent.value(),
+ h.outOfOrderTotalBytesSent.value(),
+ h.totalBatches.value(),
+ h.avgBatchDuration.value(),
+ h.minBatchDuration.value(),
+ h.maxBatchDuration.value(),
+ h.totalSnapshotInstallationTime.value(),
+ h.lastAppliedIndex.value(),
+ h.lastAppliedTerm.value(),
+ h.peerList.value(),
+ h.learnerList.value(),
+ h.catalogVersion.value()
+ );
+ }
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ class Holder implements AbstractMetricSource.Holder<Holder> {
+ final AtomicLongMetric rowsSent = new AtomicLongMetric("RowsSent",
"Total rows sent");
+
+ final AtomicLongMetric rowVersionsSent = new
AtomicLongMetric("RowVersionsSent", "Total row versions sent");
+
+ final AtomicLongMetric totalBytesSent = new
AtomicLongMetric("TotalBytesSent", "Total bytes sent");
+
+ final AtomicLongMetric outOfOrderRowsSent = new
AtomicLongMetric("OutOfOrderRowsSent", "Total out of order rows sent");
+
+ final AtomicLongMetric outOfOrderVersionsSent = new
AtomicLongMetric("OutOfOrderVersionsSent", "Total out of order versions sent");
+
+ final AtomicLongMetric outOfOrderTotalBytesSent = new
AtomicLongMetric("OutOfOrderTotalBytesSent", "Total out of order bytes sent");
+
+ final AtomicLongMetric totalBatches = new
AtomicLongMetric("TotalBatches", "Total batches");
+
+ final AtomicLongMetric avgBatchDuration = new
AtomicLongMetric("AvgBatchProcessingTime", "Average batch processing time");
+
+ final AtomicLongMetric minBatchDuration = new
AtomicLongMetric("MinBatchProcessingTime", "Minimum batch processing time");
+
+ final AtomicLongMetric maxBatchDuration = new
AtomicLongMetric("MaxBatchProcessingTime", "Maximum batch processing time");
+
+ final AtomicLongMetric totalSnapshotInstallationTime =
+ new AtomicLongMetric("TotalSnapshotInstallationTime", "Total
snapshot installation time");
+
+ final AtomicLongMetric lastAppliedIndex = new
AtomicLongMetric("LastAppliedIndex", "Last applied index");
+
+ final AtomicLongMetric lastAppliedTerm = new
AtomicLongMetric("LastAppliedTerm", "Last applied term");
Review Comment:
In the code, we call it 'last applied index', but it seems more natural to
talk about 'last included index' when we are talking about a Raft snapshot.
Would it make sense to call it this way in the metrics (and maybe later rename
the fields and methods in the code as well)?
##########
docs/_docs/administrators-guide/metrics/metrics-list.adoc:
##########
@@ -296,3 +296,28 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== snapshots.outgoing.{snapshotId}
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+| RowsSent | The number of rows sent during snapshot installation,
committed before rebalance.
+| RowVersionsSent | The number of row versions sent during snapshot
installation, committed before rebalance.
+| TotalBytesSent | The number of bytes sent during snapshot installation, for
rows committed before rebalance.
+| OutOfOrderRowsSent | The number of rows sent during snapshot
installation, from transactions running or committed during rebalance.
+| OutOfOrderVersionsSent | The number of row versions sent during
snapshot installation, from transactions running or committed during
+rebalance.
+| OutOfOrderTotalBytesSent | The number of bytes sent during snapshot
installation, from transactions running or committed during rebalance.
+| TotalBatches | The number of batches sent during snapshot installation.
+| AvgBatchProcessingTime | The average time for processing single batch during
snapshot installation.
+| MinBatchProcessingTime | The minimum time for processing single batch during
snapshot installation.
+| MaxBatchProcessingTime | The maximum time for processing single batch during
snapshot installation.
+| TotalSnapshotInstallationTime | The total time taken to install the snapshot.
+| LastAppliedIndex | The last applied index of raft group associated with
installed snapshot.
+| LastAppliedTerm | The last applied term of raft group associated with
installed snapshot.
Review Comment:
```suggestion
| LastIncludedIndex | The last included index of the Raft group associated
with the Raft snapshot.
| LastIncludedTerm | The term of the last included index of the Raft group
associated with the Raft snapshot.
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
Review Comment:
This class is about outgoing snapshot metrics, whereas the incoming snapshot
copier accepts that snapshot on another node. Is the reference in this line correct?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
+
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotDuration.onStart();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+
h.totalSnapshotInstallationTime.value(totalSnapshotDuration.duration(MILLISECONDS));
+
+ long totalBatches = h.totalBatches.value();
+
+ if (totalBatches > 0) {
+ h.avgBatchDuration.value(combinedBatchDuration / totalBatches);
+ }
+ }
+ }
+
+ void updateSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.lastAppliedIndex.value(lastAppliedIndex);
+ h.lastAppliedTerm.value(lastAppliedTerm);
+ h.catalogVersion.value(catalogVersion);
+
+ peers.addAll(config.peers());
+
+ learners.addAll(config.learners());
+ }
+ }
+
+ void onStartMvDataBatchProcessing() {
+ this.currentBatchDuration.onStart();
+ }
+
+ void onEndMvDataBatchProcessing() {
+ this.currentBatchDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+ h.totalBatches.increment();
+
+ long batchDuration = currentBatchDuration.duration(MILLISECONDS);
+
+ h.minBatchDuration.value(h.totalBatches.value() == 1
+ ? batchDuration
+ : Math.min(h.minBatchDuration.value(), batchDuration)
+ );
+
+ h.maxBatchDuration.value(Math.max(h.maxBatchDuration.value(),
batchDuration));
+
+ combinedBatchDuration += batchDuration;
+ }
+ }
+
+ void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.outOfOrderRowsSent.increment();
+
+ h.outOfOrderVersionsSent.add(rowVersions);
+ h.outOfOrderTotalBytesSent.add(totalBytes);
+ }
+ }
+
+ void onProcessRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.rowsSent.increment();
+
+ h.rowVersionsSent.add(rowVersions);
+ h.totalBytesSent.add(totalBytes);
+ }
+ }
+
+ void printSnapshotStats() {
+ Holder h = holder();
+
+ if (h != null && LOG.isInfoEnabled()) {
+ LOG.info("Outgoing snapshot installation completed
[partitionKey={}, snapshotId={}, rows={}, rowVersions={}, totalBytes={}, "
+ + " outOfOrderRows={}, outOfOrderVersions={},
outOfOrderTotalBytes={}, totalBatches={},"
+ + " avgBatchProcessingTime={}ms,
minBatchProcessingTime={}ms, maxBatchProcessingTime={}ms,"
+ + " totalSnapshotInstallationTime={}ms,
lastAppliedIndex={}, lastAppliedTerm={}, peers=[{}], learners=[{}],"
+ + " catalogVersion={}]",
+ partitionKey,
+ snapshotId,
+ h.rowsSent.value(),
+ h.rowVersionsSent.value(),
+ h.totalBytesSent.value(),
+ h.outOfOrderRowsSent.value(),
+ h.outOfOrderVersionsSent.value(),
+ h.outOfOrderTotalBytesSent.value(),
+ h.totalBatches.value(),
+ h.avgBatchDuration.value(),
+ h.minBatchDuration.value(),
+ h.maxBatchDuration.value(),
+ h.totalSnapshotInstallationTime.value(),
+ h.lastAppliedIndex.value(),
+ h.lastAppliedTerm.value(),
+ h.peerList.value(),
+ h.learnerList.value(),
+ h.catalogVersion.value()
+ );
+ }
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ class Holder implements AbstractMetricSource.Holder<Holder> {
+ final AtomicLongMetric rowsSent = new AtomicLongMetric("RowsSent",
"Total rows sent");
+
+ final AtomicLongMetric rowVersionsSent = new
AtomicLongMetric("RowVersionsSent", "Total row versions sent");
+
+ final AtomicLongMetric totalBytesSent = new
AtomicLongMetric("TotalBytesSent", "Total bytes sent");
+
+ final AtomicLongMetric outOfOrderRowsSent = new
AtomicLongMetric("OutOfOrderRowsSent", "Total out of order rows sent");
+
+ final AtomicLongMetric outOfOrderVersionsSent = new
AtomicLongMetric("OutOfOrderVersionsSent", "Total out of order versions sent");
+
+ final AtomicLongMetric outOfOrderTotalBytesSent = new
AtomicLongMetric("OutOfOrderTotalBytesSent", "Total out of order bytes sent");
+
+ final AtomicLongMetric totalBatches = new
AtomicLongMetric("TotalBatches", "Total batches");
+
+ final AtomicLongMetric avgBatchDuration = new
AtomicLongMetric("AvgBatchProcessingTime", "Average batch processing time");
+
+ final AtomicLongMetric minBatchDuration = new
AtomicLongMetric("MinBatchProcessingTime", "Minimum batch processing time");
+
+ final AtomicLongMetric maxBatchDuration = new
AtomicLongMetric("MaxBatchProcessingTime", "Maximum batch processing time");
Review Comment:
What are the units?
1. Should the units be encoded in the name of the metric?
2. They should definitely be mentioned in the description.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
+
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotDuration.onStart();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+
h.totalSnapshotInstallationTime.value(totalSnapshotDuration.duration(MILLISECONDS));
+
+ long totalBatches = h.totalBatches.value();
+
+ if (totalBatches > 0) {
+ h.avgBatchDuration.value(combinedBatchDuration / totalBatches);
+ }
+ }
+ }
+
+ void updateSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.lastAppliedIndex.value(lastAppliedIndex);
+ h.lastAppliedTerm.value(lastAppliedTerm);
+ h.catalogVersion.value(catalogVersion);
+
+ peers.addAll(config.peers());
+
+ learners.addAll(config.learners());
+ }
+ }
+
+ void onStartMvDataBatchProcessing() {
+ this.currentBatchDuration.onStart();
+ }
+
+ void onEndMvDataBatchProcessing() {
+ this.currentBatchDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+ h.totalBatches.increment();
+
+ long batchDuration = currentBatchDuration.duration(MILLISECONDS);
+
+ h.minBatchDuration.value(h.totalBatches.value() == 1
+ ? batchDuration
+ : Math.min(h.minBatchDuration.value(), batchDuration)
+ );
+
+ h.maxBatchDuration.value(Math.max(h.maxBatchDuration.value(),
batchDuration));
+
+ combinedBatchDuration += batchDuration;
+ }
+ }
+
+ void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.outOfOrderRowsSent.increment();
+
+ h.outOfOrderVersionsSent.add(rowVersions);
+ h.outOfOrderTotalBytesSent.add(totalBytes);
+ }
+ }
+
+ void onProcessRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.rowsSent.increment();
+
+ h.rowVersionsSent.add(rowVersions);
+ h.totalBytesSent.add(totalBytes);
+ }
+ }
+
+ void printSnapshotStats() {
+ Holder h = holder();
+
+ if (h != null && LOG.isInfoEnabled()) {
+ LOG.info("Outgoing snapshot installation completed
[partitionKey={}, snapshotId={}, rows={}, rowVersions={}, totalBytes={}, "
+ + " outOfOrderRows={}, outOfOrderVersions={},
outOfOrderTotalBytes={}, totalBatches={},"
+ + " avgBatchProcessingTime={}ms,
minBatchProcessingTime={}ms, maxBatchProcessingTime={}ms,"
+ + " totalSnapshotInstallationTime={}ms,
lastAppliedIndex={}, lastAppliedTerm={}, peers=[{}], learners=[{}],"
+ + " catalogVersion={}]",
+ partitionKey,
+ snapshotId,
+ h.rowsSent.value(),
+ h.rowVersionsSent.value(),
+ h.totalBytesSent.value(),
+ h.outOfOrderRowsSent.value(),
+ h.outOfOrderVersionsSent.value(),
+ h.outOfOrderTotalBytesSent.value(),
+ h.totalBatches.value(),
+ h.avgBatchDuration.value(),
+ h.minBatchDuration.value(),
+ h.maxBatchDuration.value(),
+ h.totalSnapshotInstallationTime.value(),
+ h.lastAppliedIndex.value(),
+ h.lastAppliedTerm.value(),
+ h.peerList.value(),
+ h.learnerList.value(),
+ h.catalogVersion.value()
+ );
+ }
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ class Holder implements AbstractMetricSource.Holder<Holder> {
+ final AtomicLongMetric rowsSent = new AtomicLongMetric("RowsSent",
"Total rows sent");
+
+ final AtomicLongMetric rowVersionsSent = new
AtomicLongMetric("RowVersionsSent", "Total row versions sent");
+
+ final AtomicLongMetric totalBytesSent = new
AtomicLongMetric("TotalBytesSent", "Total bytes sent");
+
+ final AtomicLongMetric outOfOrderRowsSent = new
AtomicLongMetric("OutOfOrderRowsSent", "Total out of order rows sent");
Review Comment:
It seems `rowsSent` does not include this one, but it's not obvious. Also,
if a user does not care about the difference and is only interested in the
combined amount, will they have to do the math themselves?
Would it make sense to report the combined values in `rowsSent` and only use
the 'out of order rows' as a category?
##########
modules/core/src/main/java/org/apache/ignite/internal/metrics/Duration.java:
##########
@@ -24,33 +24,33 @@
*
* <p>Not thread safe.</p>
*/
-class Duration {
+public class Duration {
Review Comment:
The current name clashes with `java.time.Duration`, which is commonly used.
Also, this class is about measuring durations. How about calling it
`DurationMeter` or simply `StopWatch`?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
Review Comment:
Let's specify that it's about Raft snapshots, to avoid confusion.
`raft.snapshots.outgoing`?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
+
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotDuration.onStart();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+
h.totalSnapshotInstallationTime.value(totalSnapshotDuration.duration(MILLISECONDS));
+
+ long totalBatches = h.totalBatches.value();
+
+ if (totalBatches > 0) {
+ h.avgBatchDuration.value(combinedBatchDuration / totalBatches);
+ }
+ }
+ }
+
+ void updateSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.lastAppliedIndex.value(lastAppliedIndex);
+ h.lastAppliedTerm.value(lastAppliedTerm);
+ h.catalogVersion.value(catalogVersion);
+
+ peers.addAll(config.peers());
+
+ learners.addAll(config.learners());
+ }
+ }
+
+ void onStartMvDataBatchProcessing() {
+ this.currentBatchDuration.onStart();
+ }
+
+ void onEndMvDataBatchProcessing() {
+ this.currentBatchDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+ h.totalBatches.increment();
+
+ long batchDuration = currentBatchDuration.duration(MILLISECONDS);
+
+ h.minBatchDuration.value(h.totalBatches.value() == 1
+ ? batchDuration
+ : Math.min(h.minBatchDuration.value(), batchDuration)
+ );
+
+ h.maxBatchDuration.value(Math.max(h.maxBatchDuration.value(),
batchDuration));
+
+ combinedBatchDuration += batchDuration;
+ }
+ }
+
+ void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.outOfOrderRowsSent.increment();
+
+ h.outOfOrderVersionsSent.add(rowVersions);
+ h.outOfOrderTotalBytesSent.add(totalBytes);
+ }
+ }
+
+ void onProcessRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.rowsSent.increment();
+
+ h.rowVersionsSent.add(rowVersions);
+ h.totalBytesSent.add(totalBytes);
+ }
+ }
+
+ void printSnapshotStats() {
+ Holder h = holder();
+
+ if (h != null && LOG.isInfoEnabled()) {
+ LOG.info("Outgoing snapshot installation completed
[partitionKey={}, snapshotId={}, rows={}, rowVersions={}, totalBytes={}, "
+ + " outOfOrderRows={}, outOfOrderVersions={},
outOfOrderTotalBytes={}, totalBatches={},"
+ + " avgBatchProcessingTime={}ms,
minBatchProcessingTime={}ms, maxBatchProcessingTime={}ms,"
+ + " totalSnapshotInstallationTime={}ms,
lastAppliedIndex={}, lastAppliedTerm={}, peers=[{}], learners=[{}],"
+ + " catalogVersion={}]",
+ partitionKey,
+ snapshotId,
+ h.rowsSent.value(),
+ h.rowVersionsSent.value(),
+ h.totalBytesSent.value(),
+ h.outOfOrderRowsSent.value(),
+ h.outOfOrderVersionsSent.value(),
+ h.outOfOrderTotalBytesSent.value(),
+ h.totalBatches.value(),
+ h.avgBatchDuration.value(),
+ h.minBatchDuration.value(),
+ h.maxBatchDuration.value(),
+ h.totalSnapshotInstallationTime.value(),
+ h.lastAppliedIndex.value(),
+ h.lastAppliedTerm.value(),
+ h.peerList.value(),
+ h.learnerList.value(),
+ h.catalogVersion.value()
+ );
+ }
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ class Holder implements AbstractMetricSource.Holder<Holder> {
+ final AtomicLongMetric rowsSent = new AtomicLongMetric("RowsSent",
"Total rows sent");
+
+ final AtomicLongMetric rowVersionsSent = new
AtomicLongMetric("RowVersionsSent", "Total row versions sent");
+
+ final AtomicLongMetric totalBytesSent = new
AtomicLongMetric("TotalBytesSent", "Total bytes sent");
+
+ final AtomicLongMetric outOfOrderRowsSent = new
AtomicLongMetric("OutOfOrderRowsSent", "Total out of order rows sent");
+
+ final AtomicLongMetric outOfOrderVersionsSent = new
AtomicLongMetric("OutOfOrderVersionsSent", "Total out of order versions sent");
+
+ final AtomicLongMetric outOfOrderTotalBytesSent = new
AtomicLongMetric("OutOfOrderTotalBytesSent", "Total out of order bytes sent");
+
+ final AtomicLongMetric totalBatches = new
AtomicLongMetric("TotalBatches", "Total batches");
+
+ final AtomicLongMetric avgBatchDuration = new
AtomicLongMetric("AvgBatchProcessingTime", "Average batch processing time");
+
+ final AtomicLongMetric minBatchDuration = new
AtomicLongMetric("MinBatchProcessingTime", "Minimum batch processing time");
+
+ final AtomicLongMetric maxBatchDuration = new
AtomicLongMetric("MaxBatchProcessingTime", "Maximum batch processing time");
+
+ final AtomicLongMetric totalSnapshotInstallationTime =
+ new AtomicLongMetric("TotalSnapshotInstallationTime", "Total
snapshot installation time");
+
+ final AtomicLongMetric lastAppliedIndex = new
AtomicLongMetric("LastAppliedIndex", "Last applied index");
+
+ final AtomicLongMetric lastAppliedTerm = new
AtomicLongMetric("LastAppliedTerm", "Last applied term");
+
+ final StringGauge peerList = new StringGauge("PeerList", "Raft group
peer list", () -> {
+ return String.join(", ", OutgoingSnapshotMetricsSource.this.peers);
+ });
+
+ final StringGauge learnerList = new StringGauge("LearnerList", "Raft
group learner list", () -> {
Review Comment:
Raft configuration also contains old peers and old learners lists (which are
optional)
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
+
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotDuration.onStart();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+
h.totalSnapshotInstallationTime.value(totalSnapshotDuration.duration(MILLISECONDS));
+
+ long totalBatches = h.totalBatches.value();
+
+ if (totalBatches > 0) {
+ h.avgBatchDuration.value(combinedBatchDuration / totalBatches);
+ }
+ }
+ }
+
+ void updateSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
Review Comment:
`update` suggests it can be updated more than once. How about `set`?
##########
docs/_docs/administrators-guide/metrics/metrics-list.adoc:
##########
@@ -296,3 +296,28 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== snapshots.outgoing.{snapshotId}
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+| RowsSent | The number of rows that sent during snapshot installation,
commited before rebalance.
+| RowVersionsSent | The number of row versions that sent during snapshot
installation, commited before rebalance.
+| TotalBytesSent | The number of bytes sent during snapshot installation, for
rows commited before rebalance.
+| OutOfOrderRowsSent | The number of rows that sent during snapshot
installation, from transactions running or commited during rebalance.
+| OutOfOrderVersionsSent | The number of row versions that sent during
snapshot installation, from transactions running or commited during
+rebalance.
+| OutOfOrderTotalBytesSent | number of bytes sent during snapshot
installation, from transactions running or commited during rebalance.
+| TotalBatches | The number of batches sent during snapshot installation.
+| AvgBatchProcessingTime | The average time for processing single batch during
snapshot installation.
+| MinBatchProcessingTime | The minimum time for processing single batch during
snapshot installation.
+| MaxBatchProcessingTime | The maximum time for processing single batch during
snapshot installation.
+| TotalSnapshotInstallationTime | The total time take to install the snapshot.
+| LastAppliedIndex | The last applied index of raft group associated with
installed snapshot.
+| LastAppliedTerm | The last applied term of raft group associated with
installed snapshot.
+| PeerList | The raft group peer list valid during snapshot installation
process.
+| LearnerList | The raft group leaner list valid during snapshot installation
process.
+| CatalogVersion | The catalog version associated with the installed snapshot.
Review Comment:
```suggestion
| CatalogVersion | The catalog version associated with the installed Raft
snapshot.
```
##########
docs/_docs/administrators-guide/metrics/metrics-list.adoc:
##########
@@ -296,3 +296,28 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== snapshots.outgoing.{snapshotId}
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+| RowsSent | The number of rows that sent during snapshot installation,
commited before rebalance.
Review Comment:
```suggestion
| RowsSent | The number of rows that are sent during installation of the
Raft snapshot with the given ID, committed before rebalance.
```
About the 'committed before rebalance' part. We are sending rows, each row
is represented by its versions, those versions can be both committed and not
(i.e. write intents). It seems that the intention was to say that here we
account for rows that are not sent out-of-order. First, I proposed to use
RowsSent for both types of rows; second (if we still decide to keep the
distinction this way), I would not go into technical details too much. There
are regular rows and out-of-order rows that are accumulated in RAM when writes
coincide with the remaining part of the Raft snapshot; but I'm not sure whether
we need to explain the distinction; for the user, it should suffice to
understand that the latter kind is stored in RAM.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java:
##########
@@ -257,7 +264,12 @@ public SnapshotReader startOutgoingSnapshot() {
startSnapshotOperation(snapshotId);
- return new OutgoingSnapshotReader(snapshotId, this) {
+ var metricSource = new OutgoingSnapshotMetricsSource(snapshotId,
partitionKey);
+
+ metricManager.registerSource(metricSource);
Review Comment:
Is it deregistered after the Raft snapshot installation is over?
##########
docs/_docs/administrators-guide/metrics/metrics-list.adoc:
##########
@@ -296,3 +296,28 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== snapshots.outgoing.{snapshotId}
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+| RowsSent | The number of rows that sent during snapshot installation,
commited before rebalance.
+| RowVersionsSent | The number of row versions that sent during snapshot
installation, commited before rebalance.
+| TotalBytesSent | The number of bytes sent during snapshot installation, for
rows commited before rebalance.
+| OutOfOrderRowsSent | The number of rows that sent during snapshot
installation, from transactions running or commited during rebalance.
+| OutOfOrderVersionsSent | The number of row versions that sent during
snapshot installation, from transactions running or commited during
+rebalance.
+| OutOfOrderTotalBytesSent | number of bytes sent during snapshot
installation, from transactions running or commited during rebalance.
+| TotalBatches | The number of batches sent during snapshot installation.
+| AvgBatchProcessingTime | The average time for processing single batch during
snapshot installation.
+| MinBatchProcessingTime | The minimum time for processing single batch during
snapshot installation.
+| MaxBatchProcessingTime | The maximum time for processing single batch during
snapshot installation.
+| TotalSnapshotInstallationTime | The total time take to install the snapshot.
Review Comment:
```suggestion
| TotalSnapshotInstallationTime | The total time it took to install the
snapshot.
```
##########
docs/_docs/administrators-guide/metrics/metrics-list.adoc:
##########
@@ -296,3 +296,28 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== snapshots.outgoing.{snapshotId}
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+| RowsSent | The number of rows that sent during snapshot installation,
commited before rebalance.
+| RowVersionsSent | The number of row versions that sent during snapshot
installation, commited before rebalance.
+| TotalBytesSent | The number of bytes sent during snapshot installation, for
rows commited before rebalance.
+| OutOfOrderRowsSent | The number of rows that sent during snapshot
installation, from transactions running or commited during rebalance.
+| OutOfOrderVersionsSent | The number of row versions that sent during
snapshot installation, from transactions running or commited during
+rebalance.
+| OutOfOrderTotalBytesSent | number of bytes sent during snapshot
installation, from transactions running or commited during rebalance.
+| TotalBatches | The number of batches sent during snapshot installation.
+| AvgBatchProcessingTime | The average time for processing single batch during
snapshot installation.
+| MinBatchProcessingTime | The minimum time for processing single batch during
snapshot installation.
+| MaxBatchProcessingTime | The maximum time for processing single batch during
snapshot installation.
+| TotalSnapshotInstallationTime | The total time take to install the snapshot.
Review Comment:
BTW, in the current form this gauge does not seem to be too useful as the
metric source should be removed as soon as the snapshot is finished, but the
gauge would only get its value set when it's finished. So no one will probably
observe this value, right?
Could we update this gauge each time we finish a batch?
##########
docs/_docs/administrators-guide/metrics/metrics-list.adoc:
##########
@@ -296,3 +296,28 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== snapshots.outgoing.{snapshotId}
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+| RowsSent | The number of rows that sent during snapshot installation,
commited before rebalance.
+| RowVersionsSent | The number of row versions that sent during snapshot
installation, commited before rebalance.
+| TotalBytesSent | The number of bytes sent during snapshot installation, for
rows commited before rebalance.
+| OutOfOrderRowsSent | The number of rows that sent during snapshot
installation, from transactions running or commited during rebalance.
+| OutOfOrderVersionsSent | The number of row versions that sent during
snapshot installation, from transactions running or commited during
+rebalance.
+| OutOfOrderTotalBytesSent | number of bytes sent during snapshot
installation, from transactions running or commited during rebalance.
+| TotalBatches | The number of batches sent during snapshot installation.
+| AvgBatchProcessingTime | The average time for processing single batch during
snapshot installation.
+| MinBatchProcessingTime | The minimum time for processing single batch during
snapshot installation.
+| MaxBatchProcessingTime | The maximum time for processing single batch during
snapshot installation.
+| TotalSnapshotInstallationTime | The total time take to install the snapshot.
+| LastAppliedIndex | The last applied index of raft group associated with
installed snapshot.
+| LastAppliedTerm | The last applied term of raft group associated with
installed snapshot.
+| PeerList | The raft group peer list valid during snapshot installation
process.
+| LearnerList | The raft group leaner list valid during snapshot installation
process.
Review Comment:
```suggestion
| PeerList | The Raft group peer list of the Raft configuration included in
the Raft snapshot.
| LearnerList | The Raft group learner list of the Raft configuration
included in the Raft snapshot.
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMetricsSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Duration;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.StringGauge;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotMetricsSource.Holder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safe guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+public class OutgoingSnapshotMetricsSource extends
AbstractMetricSource<Holder> {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotMetricsSource.class);
+
+ private final Duration currentBatchDuration = new Duration();
+
+ private long combinedBatchDuration;
+
+ private final Duration totalSnapshotDuration = new Duration();
+
+ private final List<String> peers = new ArrayList<>();
+
+ private final List<String> learners = new ArrayList<>();
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ public OutgoingSnapshotMetricsSource(UUID snapshotId, PartitionKey
partitionKey) {
+ super("snapshots.outgoing." + snapshotId);
+
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotDuration.onStart();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+
h.totalSnapshotInstallationTime.value(totalSnapshotDuration.duration(MILLISECONDS));
+
+ long totalBatches = h.totalBatches.value();
+
+ if (totalBatches > 0) {
+ h.avgBatchDuration.value(combinedBatchDuration / totalBatches);
+ }
+ }
+ }
+
+ void updateSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.lastAppliedIndex.value(lastAppliedIndex);
+ h.lastAppliedTerm.value(lastAppliedTerm);
+ h.catalogVersion.value(catalogVersion);
+
+ peers.addAll(config.peers());
+
+ learners.addAll(config.learners());
+ }
+ }
+
+ void onStartMvDataBatchProcessing() {
+ this.currentBatchDuration.onStart();
+ }
+
+ void onEndMvDataBatchProcessing() {
+ this.currentBatchDuration.onEnd();
+
+ Holder h = holder();
+
+ if (h != null) {
+ h.totalBatches.increment();
+
+ long batchDuration = currentBatchDuration.duration(MILLISECONDS);
+
+ h.minBatchDuration.value(h.totalBatches.value() == 1
+ ? batchDuration
+ : Math.min(h.minBatchDuration.value(), batchDuration)
+ );
+
+ h.maxBatchDuration.value(Math.max(h.maxBatchDuration.value(),
batchDuration));
+
+ combinedBatchDuration += batchDuration;
+ }
+ }
+
+ void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.outOfOrderRowsSent.increment();
+
+ h.outOfOrderVersionsSent.add(rowVersions);
+ h.outOfOrderTotalBytesSent.add(totalBytes);
+ }
+ }
+
+ void onProcessRow(long rowVersions, long totalBytes) {
+ Holder h = holder();
+ if (h != null) {
+ h.rowsSent.increment();
+
+ h.rowVersionsSent.add(rowVersions);
+ h.totalBytesSent.add(totalBytes);
+ }
+ }
+
+ void printSnapshotStats() {
Review Comment:
```suggestion
void logSnapshotStats() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]