rpuch commented on code in PR #7269: URL: https://github.com/apache/ignite-3/pull/7269#discussion_r2634905577
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. 
+ * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). + */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. + */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); Review Comment: Technically speaking, this source provides metrics about 'partition raft snapshots', not just 'raft snapshots' as CMG and MG also use raft snapshots. But I wonder if this is important in this case. Even if we keep the name, it makes sense to elaborate in the javadoc that it's about *partition* raft snapshots ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. + * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). 
+ */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. + */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); + } + + @Override + protected Holder createHolder() { + return new Holder( + totalIncomingSnapshotsCounter::get, + snapshotsLoadingMetaCounter::get, + snapshotsWaitingCatalogCounter::get, + snapshotsPreparingStoragesCounter::get, + snapshotsPreparingIndexForBuildCounter::get, + snapshotsLoadingMvDataCounter::get, + snapshotsLoadingTxMetaCounter::get, + totalOutgoingSnapshotsCounter::get + ); + } + + /** + * Marks the start of an incoming snapshot installation. + * Increments the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationStart() { + totalIncomingSnapshotsCounter.incrementAndGet(); + } + + /** + * Marks the end of an incoming snapshot installation. + * Decrements the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationEnd() { + totalIncomingSnapshotsCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the "load snapshot metadata" phase during incoming snapshot installation. + * Increments the {@code IncomingSnapshotsLoadingMeta} counter. 
+ */ + public void onLoadSnapshotPhaseStart() { Review Comment: Should the method name include `Meta` or `Metadata`? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. 
+ * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). + */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. + */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); + } + + @Override + protected Holder createHolder() { + return new Holder( + totalIncomingSnapshotsCounter::get, + snapshotsLoadingMetaCounter::get, + snapshotsWaitingCatalogCounter::get, + snapshotsPreparingStoragesCounter::get, + snapshotsPreparingIndexForBuildCounter::get, + snapshotsLoadingMvDataCounter::get, + snapshotsLoadingTxMetaCounter::get, + totalOutgoingSnapshotsCounter::get + ); + } + + /** + * Marks the start of an incoming snapshot installation. + * Increments the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationStart() { + totalIncomingSnapshotsCounter.incrementAndGet(); + } + + /** + * Marks the end of an incoming snapshot installation. + * Decrements the {@code TotalIncomingSnapshots} counter. 
+ */ + public void onSnapshotInstallationEnd() { + totalIncomingSnapshotsCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the "load snapshot metadata" phase during incoming snapshot installation. + * Increments the {@code IncomingSnapshotsLoadingMeta} counter. + */ + public void onLoadSnapshotPhaseStart() { + snapshotsLoadingMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the "load snapshot metadata" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsLoadingMeta} counter. + */ + public void onLoadSnapshotPhaseEnd() { + snapshotsLoadingMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where the node waits for catalog to be ready/apply updates + * for the incoming snapshot installation. + * Increments the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseStart() { + snapshotsWaitingCatalogCounter.incrementAndGet(); + } + + /** + * Marks the end of the "waiting for catalog" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseEnd() { + snapshotsWaitingCatalogCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where MV (multi-version) data is loaded from the snapshot. + * Increments the {@code IncomingSnapshotsLoadingMvData} counter. + */ + public void onLoadMvDataPhaseStart() { + snapshotsLoadingMvDataCounter.incrementAndGet(); + } + + /** + * Marks the end of the phase where MV (multi-version) data is loaded from the snapshot. + * Decrements the {@code IncomingSnapshotsLoadingMvData} counter. + */ + public void onLoadMvDataPhaseEnd() { + snapshotsLoadingMvDataCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where transaction metadata is loaded from the snapshot. + * Increments the {@code IncomingSnapshotsLoadingTxMeta} counter. 
+ */ + public void onLoadTxMetasPhaseStart() { + snapshotsLoadingTxMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the phase where transaction metadata is loaded from the snapshot. + * Decrements the {@code IncomingSnapshotsLoadingTxMeta} counter. + */ + public void onLoadTxMetasPhaseEnd() { + snapshotsLoadingTxMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of preparing indexes for build (for example, collecting row IDs) as part of + * incoming snapshot installation. + * Increments the {@code IncomingSnapshotsPreparingIndexForBuild} counter. + */ + public void onSetRowIdToBuildPhaseStart() { + snapshotsPreparingIndexForBuildCounter.incrementAndGet(); + } + + /** + * Marks the end of preparing indexes for build as part of incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsPreparingIndexForBuild} counter. + */ + public void onSetRowIdToBuildPhaseEnd() { + snapshotsPreparingIndexForBuildCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the storage preparation phase for incoming snapshot installation. + * Increments the {@code IncomingSnapshotsPreparingStorages} counter. + */ + public void onPreparingStoragePhaseStart() { Review Comment: Let's move these methods higher. The idea is that the order of methods (and fields) reflects (where possible) the order of events happening during a snapshot installation. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. + * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). 
+ */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. + */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); + } + + @Override + protected Holder createHolder() { + return new Holder( + totalIncomingSnapshotsCounter::get, + snapshotsLoadingMetaCounter::get, + snapshotsWaitingCatalogCounter::get, + snapshotsPreparingStoragesCounter::get, + snapshotsPreparingIndexForBuildCounter::get, + snapshotsLoadingMvDataCounter::get, + snapshotsLoadingTxMetaCounter::get, + totalOutgoingSnapshotsCounter::get + ); + } + + /** + * Marks the start of an incoming snapshot installation. + * Increments the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationStart() { + totalIncomingSnapshotsCounter.incrementAndGet(); + } + + /** + * Marks the end of an incoming snapshot installation. + * Decrements the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationEnd() { + totalIncomingSnapshotsCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the "load snapshot metadata" phase during incoming snapshot installation. + * Increments the {@code IncomingSnapshotsLoadingMeta} counter. 
+ */ + public void onLoadSnapshotPhaseStart() { + snapshotsLoadingMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the "load snapshot metadata" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsLoadingMeta} counter. + */ + public void onLoadSnapshotPhaseEnd() { + snapshotsLoadingMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where the node waits for catalog to be ready/apply updates + * for the incoming snapshot installation. + * Increments the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseStart() { + snapshotsWaitingCatalogCounter.incrementAndGet(); + } + + /** + * Marks the end of the "waiting for catalog" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseEnd() { + snapshotsWaitingCatalogCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where MV (multi-version) data is loaded from the snapshot. Review Comment: 'from the snapshot' sounds weird. It is a part of the snapshot. Maybe 'where snapshot MV data is loaded'? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. + * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). 
+ */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. + */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); + } + + @Override + protected Holder createHolder() { + return new Holder( + totalIncomingSnapshotsCounter::get, + snapshotsLoadingMetaCounter::get, + snapshotsWaitingCatalogCounter::get, + snapshotsPreparingStoragesCounter::get, + snapshotsPreparingIndexForBuildCounter::get, + snapshotsLoadingMvDataCounter::get, + snapshotsLoadingTxMetaCounter::get, + totalOutgoingSnapshotsCounter::get + ); + } + + /** + * Marks the start of an incoming snapshot installation. + * Increments the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationStart() { + totalIncomingSnapshotsCounter.incrementAndGet(); + } + + /** + * Marks the end of an incoming snapshot installation. + * Decrements the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationEnd() { + totalIncomingSnapshotsCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the "load snapshot metadata" phase during incoming snapshot installation. + * Increments the {@code IncomingSnapshotsLoadingMeta} counter. 
+ */ + public void onLoadSnapshotPhaseStart() { + snapshotsLoadingMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the "load snapshot metadata" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsLoadingMeta} counter. + */ + public void onLoadSnapshotPhaseEnd() { + snapshotsLoadingMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where the node waits for catalog to be ready/apply updates + * for the incoming snapshot installation. + * Increments the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseStart() { + snapshotsWaitingCatalogCounter.incrementAndGet(); + } + + /** + * Marks the end of the "waiting for catalog" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseEnd() { + snapshotsWaitingCatalogCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where MV (multi-version) data is loaded from the snapshot. + * Increments the {@code IncomingSnapshotsLoadingMvData} counter. + */ + public void onLoadMvDataPhaseStart() { + snapshotsLoadingMvDataCounter.incrementAndGet(); + } + + /** + * Marks the end of the phase where MV (multi-version) data is loaded from the snapshot. + * Decrements the {@code IncomingSnapshotsLoadingMvData} counter. + */ + public void onLoadMvDataPhaseEnd() { + snapshotsLoadingMvDataCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where transaction metadata is loaded from the snapshot. + * Increments the {@code IncomingSnapshotsLoadingTxMeta} counter. + */ + public void onLoadTxMetasPhaseStart() { + snapshotsLoadingTxMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the phase where transaction metadata is loaded from the snapshot. + * Decrements the {@code IncomingSnapshotsLoadingTxMeta} counter. 
+ */ + public void onLoadTxMetasPhaseEnd() { + snapshotsLoadingTxMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of preparing indexes for build (for example, collecting row IDs) as part of Review Comment: Nothing is collected here. We just set NextRowIdToBuildIndex for each buildable index of each table of the zone ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. 
It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. + * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). + */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. + */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); + } + + @Override + protected Holder createHolder() { + return new Holder( + totalIncomingSnapshotsCounter::get, + snapshotsLoadingMetaCounter::get, + snapshotsWaitingCatalogCounter::get, + snapshotsPreparingStoragesCounter::get, + snapshotsPreparingIndexForBuildCounter::get, + snapshotsLoadingMvDataCounter::get, + snapshotsLoadingTxMetaCounter::get, + totalOutgoingSnapshotsCounter::get + ); + } + + /** + * Marks the start of an incoming snapshot installation. + * Increments the {@code TotalIncomingSnapshots} counter. Review Comment: This counter is an internal detail, should it be mentioned? 
Same for other cases ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java: ########## @@ -93,6 +100,11 @@ public class ZoneResourcesManager implements ManuallyCloseable { this.failureProcessor = failureProcessor; this.partitionOperationsExecutor = partitionOperationsExecutor; this.replicaManager = replicaManager; + this.metricManager = metricManager; + + this.snapshotsMetricsSource = new RaftSnapshotsMetricsSource(); + metricManager.registerSource(snapshotsMetricsSource); + metricManager.enable(snapshotsMetricsSource); Review Comment: Should we do it on start? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java: ########## @@ -579,6 +594,7 @@ private CompletableFuture<Void> loadSnapshotTxData(InternalClusterNode snapshotS */ private CompletableFuture<Void> completeRebalance(SnapshotContext snapshotContext, @Nullable Throwable throwable) { snapshotStats.onSnapshotInstallationEnd(); + snapshotsMetricsSource.onSnapshotInstallationEnd(); Review Comment: This is not the end. After this, rebalance is either aborted or finished on storages which can also take time. 
It seems that this 'completion' stage is not covered ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java: ########## @@ -840,4 +950,46 @@ private static void assertThatTargetStoragesAreEmpty( private static UUID generateTxId() { return TransactionIds.transactionId(CLOCK.now(), 1); } + + private static Metric retrieveOutgoingSnapshotMetric(RaftSnapshotsMetricsSource snapshotsMetricsSource, String metricName) { + return stream(snapshotsMetricsSource.holder().metrics().spliterator(), false) + .filter(metric -> metricName.equals(metric.name())) + .findAny() + .get(); + } + + private static void assertThatMetricHasValue( + RaftSnapshotsMetricsSource snapshotsMetricsSource, + String metricName, + String expectedValue + ) { + Metric metric = retrieveOutgoingSnapshotMetric(snapshotsMetricsSource, metricName); + + Awaitility.await().until(metric::getValueAsString, is(expectedValue)); Review Comment: Is this wait bounded? With which duration? 
########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java: ########## @@ -840,4 +950,46 @@ private static void assertThatTargetStoragesAreEmpty( private static UUID generateTxId() { return TransactionIds.transactionId(CLOCK.now(), 1); } + + private static Metric retrieveOutgoingSnapshotMetric(RaftSnapshotsMetricsSource snapshotsMetricsSource, String metricName) { + return stream(snapshotsMetricsSource.holder().metrics().spliterator(), false) + .filter(metric -> metricName.equals(metric.name())) + .findAny() + .get(); + } + + private static void assertThatMetricHasValue( + RaftSnapshotsMetricsSource snapshotsMetricsSource, + String metricName, + String expectedValue + ) { + Metric metric = retrieveOutgoingSnapshotMetric(snapshotsMetricsSource, metricName); + + Awaitility.await().until(metric::getValueAsString, is(expectedValue)); + } + + private MessagingService messagingServiceForMetrics( + CompletableFuture<NetworkMessage> loadSnapshotFuture, Review Comment: ```suggestion CompletableFuture<NetworkMessage> loadSnapshotMetaFuture, ``` ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java: ########## @@ -817,6 +838,95 @@ void laggingSchemasPreventSnapshotInstallation() { assertThatTargetStoragesAreEmpty(incomingMvTableStorage, incomingTxStateStorage); } + @Test + void metricsCalculateCorrectly() throws InterruptedException { + incomingMvTableStorage.createMvPartition(PARTITION_ID); + incomingTxStateStorage.getOrCreatePartitionStorage(PARTITION_ID); + + PartitionSnapshotMeta meta = mock(PartitionSnapshotMeta.class); + + when(meta.requiredCatalogVersion()).thenReturn(1); + + SnapshotMetaResponse metaResponse = mock(SnapshotMetaResponse.class); + + when(metaResponse.meta()).thenReturn(meta); + + CompletableFuture<NetworkMessage> loadSnapshotFuture = new 
CompletableFuture<>(); + + CompletableFuture<NetworkMessage> loadMvDataFuture = new CompletableFuture<>(); + + CompletableFuture<NetworkMessage> loadTxMetaFuture = new CompletableFuture<>(); + + SnapshotMvDataResponse mvDataResponse = mock(SnapshotMvDataResponse.class); + when(mvDataResponse.finish()).thenReturn(true); + + SnapshotTxDataResponse txMetaResponse = mock(SnapshotTxDataResponse.class); + when(txMetaResponse.finish()).thenReturn(true); + + MessagingService messagingService = messagingServiceForMetrics(loadSnapshotFuture, loadMvDataFuture, loadTxMetaFuture); + + CompletableFuture<Void> catalogReadyFuture = new CompletableFuture<>(); + + CatalogService catalogService = mock(CatalogService.class); + + when(catalogService.catalogReadyFuture(anyInt())).thenReturn(catalogReadyFuture); + + PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage( + incomingMvTableStorage, + incomingTxStateStorage, + messagingService, + catalogService + ); + + var snapshotMetricSource = new RaftSnapshotsMetricsSource(); + + snapshotMetricSource.enable(); + + IncomingSnapshotCopier copier = new IncomingSnapshotCopier( + partitionSnapshotStorage, + SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId, NODE_NAME)), + executorService, + 1000, + snapshotMetricSource + ); + + assertThatMetricHasValue(snapshotMetricSource, "TotalIncomingSnapshots", "0"); + + assertThatMetricHasValue(snapshotMetricSource, "IncomingSnapshotsLoadingMeta", "0"); Review Comment: ```suggestion assertThatMetricHasValue(snapshotMetricSource, "TotalIncomingSnapshots", "0"); assertThatMetricHasValue(snapshotMetricSource, "IncomingSnapshotsLoadingMeta", "0"); ``` Same thing for following cases: we could block the related assertions together to make it easier to understand what the blocks are ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java: ########## @@ -817,6 +838,95 @@ 
void laggingSchemasPreventSnapshotInstallation() { assertThatTargetStoragesAreEmpty(incomingMvTableStorage, incomingTxStateStorage); } + @Test + void metricsCalculateCorrectly() throws InterruptedException { + incomingMvTableStorage.createMvPartition(PARTITION_ID); + incomingTxStateStorage.getOrCreatePartitionStorage(PARTITION_ID); + + PartitionSnapshotMeta meta = mock(PartitionSnapshotMeta.class); + + when(meta.requiredCatalogVersion()).thenReturn(1); + + SnapshotMetaResponse metaResponse = mock(SnapshotMetaResponse.class); + + when(metaResponse.meta()).thenReturn(meta); + + CompletableFuture<NetworkMessage> loadSnapshotFuture = new CompletableFuture<>(); Review Comment: ```suggestion CompletableFuture<NetworkMessage> loadSnapshotMetaFuture = new CompletableFuture<>(); ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.IntGauge; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder; + +/** + * Metric source that exposes counters related to Raft snapshots lifecycle for partition replicator. + * + * <p>The source is registered under the name {@code raft.snapshots}. It maintains the number of currently + * running incoming and outgoing snapshots and per-phase counters for the installation of incoming snapshots. + * These counters are intended to help understand where time is spent during snapshot installation and + * whether there are any bottlenecks (for example, waiting for catalog, loading multi-versioned data, etc.). + */ +public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> { + private final AtomicInteger totalIncomingSnapshotsCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMetaCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsWaitingCatalogCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingStoragesCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingMvDataCounter = new AtomicInteger(); + + private final AtomicInteger snapshotsLoadingTxMetaCounter = new AtomicInteger(); + + private final AtomicInteger totalOutgoingSnapshotsCounter = new AtomicInteger(); + + /** + * Creates a new metric source with the name {@code raft.snapshots}. 
+ */ + public RaftSnapshotsMetricsSource() { + super("raft.snapshots"); + } + + @Override + protected Holder createHolder() { + return new Holder( + totalIncomingSnapshotsCounter::get, + snapshotsLoadingMetaCounter::get, + snapshotsWaitingCatalogCounter::get, + snapshotsPreparingStoragesCounter::get, + snapshotsPreparingIndexForBuildCounter::get, + snapshotsLoadingMvDataCounter::get, + snapshotsLoadingTxMetaCounter::get, + totalOutgoingSnapshotsCounter::get + ); + } + + /** + * Marks the start of an incoming snapshot installation. + * Increments the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationStart() { + totalIncomingSnapshotsCounter.incrementAndGet(); + } + + /** + * Marks the end of an incoming snapshot installation. + * Decrements the {@code TotalIncomingSnapshots} counter. + */ + public void onSnapshotInstallationEnd() { + totalIncomingSnapshotsCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the "load snapshot metadata" phase during incoming snapshot installation. + * Increments the {@code IncomingSnapshotsLoadingMeta} counter. + */ + public void onLoadSnapshotPhaseStart() { + snapshotsLoadingMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the "load snapshot metadata" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsLoadingMeta} counter. + */ + public void onLoadSnapshotPhaseEnd() { + snapshotsLoadingMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where the node waits for catalog to be ready/apply updates + * for the incoming snapshot installation. + * Increments the {@code IncomingSnapshotsWaitingCatalog} counter. + */ + public void onWaitingCatalogPhaseStart() { + snapshotsWaitingCatalogCounter.incrementAndGet(); + } + + /** + * Marks the end of the "waiting for catalog" phase during incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsWaitingCatalog} counter. 
+ */ + public void onWaitingCatalogPhaseEnd() { + snapshotsWaitingCatalogCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where MV (multi-version) data is loaded from the snapshot. + * Increments the {@code IncomingSnapshotsLoadingMvData} counter. + */ + public void onLoadMvDataPhaseStart() { + snapshotsLoadingMvDataCounter.incrementAndGet(); + } + + /** + * Marks the end of the phase where MV (multi-version) data is loaded from the snapshot. + * Decrements the {@code IncomingSnapshotsLoadingMvData} counter. + */ + public void onLoadMvDataPhaseEnd() { + snapshotsLoadingMvDataCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the phase where transaction metadata is loaded from the snapshot. + * Increments the {@code IncomingSnapshotsLoadingTxMeta} counter. + */ + public void onLoadTxMetasPhaseStart() { + snapshotsLoadingTxMetaCounter.incrementAndGet(); + } + + /** + * Marks the end of the phase where transaction metadata is loaded from the snapshot. + * Decrements the {@code IncomingSnapshotsLoadingTxMeta} counter. + */ + public void onLoadTxMetasPhaseEnd() { + snapshotsLoadingTxMetaCounter.decrementAndGet(); + } + + /** + * Marks the beginning of preparing indexes for build (for example, collecting row IDs) as part of + * incoming snapshot installation. + * Increments the {@code IncomingSnapshotsPreparingIndexForBuild} counter. + */ + public void onSetRowIdToBuildPhaseStart() { + snapshotsPreparingIndexForBuildCounter.incrementAndGet(); + } + + /** + * Marks the end of preparing indexes for build as part of incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsPreparingIndexForBuild} counter. + */ + public void onSetRowIdToBuildPhaseEnd() { + snapshotsPreparingIndexForBuildCounter.decrementAndGet(); + } + + /** + * Marks the beginning of the storage preparation phase for incoming snapshot installation. + * Increments the {@code IncomingSnapshotsPreparingStorages} counter. 
+ */ + public void onPreparingStoragePhaseStart() { + snapshotsPreparingStoragesCounter.incrementAndGet(); + } + + /** + * Marks the end of the storage preparation phase for incoming snapshot installation. + * Decrements the {@code IncomingSnapshotsPreparingStorages} counter. + */ + public void onPreparingStoragePhaseEnd() { + snapshotsPreparingStoragesCounter.decrementAndGet(); + } + + /** + * Marks the start of an outgoing snapshot creation/streaming. + * Increments the {@code TotalOutgoingSnapshots} counter. + */ + public void onOutgoingSnapshotStart() { + totalOutgoingSnapshotsCounter.incrementAndGet(); + } + + /** + * Marks the end of an outgoing snapshot creation/streaming. + * Decrements the {@code TotalOutgoingSnapshots} counter. + */ + public void onOutgoingSnapshotEnd() { + totalOutgoingSnapshotsCounter.decrementAndGet(); + } + + /** + * Container of metrics exposed by {@link RaftSnapshotsMetricsSource}. + */ + public static class Holder implements AbstractMetricSource.Holder<Holder> { + private final IntGauge totalIncomingSnapshots; + + private final IntGauge snapshotsLoadingMeta; + + private final IntGauge snapshotsWaitingCatalog; + + private final IntGauge snapshotsPreparingStorages; + + private final IntGauge snapshotsPreparingIndexForBuild; + + private final IntGauge snapshotsLoadingMvData; + + private final IntGauge snapshotsLoadingTxMeta; + + private final IntGauge totalOutgoingSnapshots; + + private Holder( + IntSupplier totalIncomingSnapshotsSupplier, + IntSupplier snapshotsLoadingMetaSupplier, + IntSupplier snapshotsWaitingCatalogSupplier, + IntSupplier snapshotsPreparingStoragesSupplier, + IntSupplier snapshotsPreparingIndexForBuildSupplier, + IntSupplier snapshotsLoadingMvDataSupplier, + IntSupplier snapshotsLoadingTxMetaSupplier, + IntSupplier totalOutgoingSnapshotsSupplier + ) { + totalIncomingSnapshots = new IntGauge( + "TotalIncomingSnapshots", Review Comment: Why does this name contain 'Total' in it while others don't? 
########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java: ########## @@ -840,4 +950,46 @@ private static void assertThatTargetStoragesAreEmpty( private static UUID generateTxId() { return TransactionIds.transactionId(CLOCK.now(), 1); } + + private static Metric retrieveOutgoingSnapshotMetric(RaftSnapshotsMetricsSource snapshotsMetricsSource, String metricName) { + return stream(snapshotsMetricsSource.holder().metrics().spliterator(), false) + .filter(metric -> metricName.equals(metric.name())) + .findAny() + .get(); + } + + private static void assertThatMetricHasValue( Review Comment: ```suggestion private static void waitTillMetricHasValue( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
