This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c8a03bc7f5 IGNITE-22410 Implement rebalance triggers for zone based
partitions (#3943)
c8a03bc7f5 is described below
commit c8a03bc7f5d5542bfb2cc3e2140a926f2bc803e6
Author: Mirza Aliev <[email protected]>
AuthorDate: Wed Jul 3 11:23:18 2024 +0400
IGNITE-22410 Implement rebalance triggers for zone based partitions (#3943)
---
.../rebalance/DistributionZoneRebalanceEngine.java | 13 +-
.../DistributionZoneRebalanceEngineV2.java | 213 ++++++++
.../RebalanceRaftGroupEventsListener.java | 2 +
.../distributionzones/rebalance/RebalanceUtil.java | 64 +--
...a => ZoneRebalanceRaftGroupEventsListener.java} | 337 ++++++-------
.../{RebalanceUtil.java => ZoneRebalanceUtil.java} | 383 ++++++---------
.../replicator/ItReplicaLifecycleTest.java | 125 ++++-
.../PartitionReplicaLifecycleManager.java | 542 ++++++++++++++++++++-
.../ignite/internal/replicator/ReplicaManager.java | 63 ++-
.../replicator/ZonePartitionReplicaImpl.java | 15 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
11 files changed, 1230 insertions(+), 530 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 39448ae24f..f8309f7fa0 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -99,6 +99,11 @@ public class DistributionZoneRebalanceEngine {
/** Executor for scheduling rebalances. */
private final ScheduledExecutorService rebalanceScheduler;
+ /** Zone rebalance manager. */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 this class
will replace DistributionZoneRebalanceEngine
+ // TODO: after switching to zone-based replication
+ private final DistributionZoneRebalanceEngineV2
distributionZoneRebalanceEngineV2;
+
/**
* Constructor.
*
@@ -122,6 +127,12 @@ public class DistributionZoneRebalanceEngine {
this.dataNodesListener = createDistributionZonesDataNodesListener();
this.partitionsCounterListener = createPartitionsCounterListener();
this.rebalanceScheduler = rebalanceScheduler;
+ this.distributionZoneRebalanceEngineV2 = new
DistributionZoneRebalanceEngineV2(
+ busyLock,
+ metaStorageManager,
+ distributionZoneManager,
+ catalogService
+ );
}
/**
@@ -148,7 +159,7 @@ public class DistributionZoneRebalanceEngine {
long recoveryRevision = recoveryFinishFuture.join();
- return rebalanceTriggersRecovery(recoveryRevision);
+ return rebalanceTriggersRecovery(recoveryRevision).thenCompose(v
-> distributionZoneRebalanceEngineV2.start());
});
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
new file mode 100644
index 0000000000..a1b684de29
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.rebalance;
+
+import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZoneIdDataNodes;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.triggerZonePartitionsRebalance;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.Node;
+import
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Zone rebalance manager. It listens to the changes in the distribution zones
data nodes and replicas and triggers rebalance
+ * for the corresponding partitions. By triggering rebalance, it updates the
pending assignments in the metastore.
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-22522 this class will
replace DistributionZoneRebalanceEngine
+ * after switching to zone-based replication.
+ */
+public class DistributionZoneRebalanceEngineV2 {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(DistributionZoneRebalanceEngineV2.class);
+
+ /** Prevents double stopping of the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /** External busy lock. */
+ private final IgniteSpinBusyLock busyLock;
+
+ /** Meta Storage manager. */
+ private final MetaStorageManager metaStorageManager;
+
+ /** Distribution zones manager. */
+ private final DistributionZoneManager distributionZoneManager;
+
+ /** Meta storage listener for data nodes changes. */
+ private final WatchListener dataNodesListener;
+
+ /** Catalog service. */
+ private final CatalogService catalogService;
+
+ /**
+ * Constructor.
+ *
+ * @param busyLock External busy lock.
+ * @param metaStorageManager Meta Storage manager.
+ * @param distributionZoneManager Distribution zones manager.
+ * @param catalogService Catalog service.
+ */
+ public DistributionZoneRebalanceEngineV2(
+ IgniteSpinBusyLock busyLock,
+ MetaStorageManager metaStorageManager,
+ DistributionZoneManager distributionZoneManager,
+ CatalogManager catalogService
+ ) {
+ this.busyLock = busyLock;
+ this.metaStorageManager = metaStorageManager;
+ this.distributionZoneManager = distributionZoneManager;
+ this.catalogService = catalogService;
+
+ this.dataNodesListener = createDistributionZonesDataNodesListener();
+ }
+
+ /**
+ * Starts the rebalance engine by registering corresponding meta storage
and catalog listeners.
+ */
+ public CompletableFuture<Void> start() {
+ return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+ catalogService.listen(ZONE_ALTER, new
CatalogAlterZoneEventListener(catalogService) {
+ @Override
+ protected CompletableFuture<Void>
onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
+ return onUpdateReplicas(parameters);
+ }
+ });
+
+ // TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
+ metaStorageManager.registerPrefixWatch(zoneDataNodesKey(),
dataNodesListener);
+
+ return nullCompletedFuture();
+ });
+ }
+
+ /**
+ * Stops the rebalance engine by unregistering meta storage watches.
+ */
+ public void stop() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ metaStorageManager.unregisterWatch(dataNodesListener);
+ }
+
+ private WatchListener createDistributionZonesDataNodesListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+ return IgniteUtils.inBusyLockAsync(busyLock, () -> {
+ Set<Node> dataNodes =
parseDataNodes(evt.entryEvent().newEntry().value());
+
+ if (dataNodes == null) {
+ // The zone was removed so data nodes were removed too.
+ return nullCompletedFuture();
+ }
+
+ int zoneId =
extractZoneIdDataNodes(evt.entryEvent().newEntry().key());
+
+ // It is safe to get the latest version of the catalog as
we are in the metastore thread.
+ int catalogVersion = catalogService.latestCatalogVersion();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(zoneId, catalogVersion);
+
+ if (zoneDescriptor == null) {
+ // Zone has been removed.
+ return nullCompletedFuture();
+ }
+
+ Set<String> filteredDataNodes = filterDataNodes(
+ dataNodes,
+ zoneDescriptor,
+ distributionZoneManager.nodesAttributes()
+ );
+
+ if (filteredDataNodes.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ return triggerZonePartitionsRebalance(
+ zoneDescriptor,
+ filteredDataNodes,
+ evt.entryEvent().newEntry().revision(),
+ metaStorageManager
+ );
+ });
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.warn("Unable to process data nodes event", e);
+ }
+ };
+ }
+
+ private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters
parameters) {
+ return recalculateAssignmentsAndTriggerZonePartitionsRebalance(
+ parameters.zoneDescriptor(),
+ parameters.causalityToken(),
+ parameters.catalogVersion()
+ );
+ }
+
+ /**
+ * Calculates data nodes of the distribution zone for {@code
zoneDescriptor} and {@code causalityToken} and triggers zone partitions
+ * rebalance. For more details see
+ * {@link
ZoneRebalanceUtil#triggerZonePartitionsRebalance(CatalogZoneDescriptor, Set,
long, MetaStorageManager)}
+ *
+ * @param zoneDescriptor Zone descriptor.
+ * @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
+ * @return The future, which completes when all the metastore updates are done.
+ */
+ private CompletableFuture<Void>
recalculateAssignmentsAndTriggerZonePartitionsRebalance(
+ CatalogZoneDescriptor zoneDescriptor,
+ long causalityToken,
+ int catalogVersion
+ ) {
+ return distributionZoneManager.dataNodes(causalityToken,
catalogVersion, zoneDescriptor.id())
+ .thenCompose(dataNodes ->
IgniteUtils.inBusyLockAsync(busyLock, () -> {
+ if (dataNodes.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ return triggerZonePartitionsRebalance(
+ zoneDescriptor,
+ dataNodes,
+ causalityToken,
+ metaStorageManager
+ );
+ }));
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index ee1610ee3a..39373ef464 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -74,6 +74,8 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
* Listener for the raft group events, which must provide correct error
handling of rebalance process
* and start new rebalance after the current one finished.
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this class
and use {@link ZoneRebalanceRaftGroupEventsListener} instead
+ * after switching to zone-based replication.
*/
public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener {
/** Ignite logger. */
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index bc40b6caa3..3d77134d01 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -59,11 +59,12 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.jetbrains.annotations.Nullable;
/**
* Util class for methods needed for the rebalance process.
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this class
and use {@link ZoneRebalanceUtil} instead
+ * after switching to zone-based replication.
*/
public class RebalanceUtil {
@@ -314,9 +315,6 @@ public class RebalanceUtil {
/** Key prefix for pending assignments. */
public static final String PENDING_ASSIGNMENTS_PREFIX =
"assignments.pending.";
- /** Key prefix for stable assignments. */
- public static final String ZONE_STABLE_ASSIGNMENTS_PREFIX =
"zone.assignments.stable.";
-
/** Key prefix for stable assignments. */
public static final String STABLE_ASSIGNMENTS_PREFIX =
"assignments.stable.";
@@ -387,17 +385,6 @@ public class RebalanceUtil {
return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
}
- /**
- * Key that is needed for the rebalance algorithm.
- *
- * @param partId Unique identifier of a partition.
- * @return Key for a partition.
- * @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
- */
- public static ByteArray stablePartAssignmentsKey(ZonePartitionId partId) {
- return new ByteArray(ZONE_STABLE_ASSIGNMENTS_PREFIX + partId);
- }
-
/**
* Key that is needed for the rebalance algorithm.
*
@@ -587,27 +574,6 @@ public class RebalanceUtil {
return (entry == null || entry.empty() || entry.tombstone()) ? null :
Assignments.fromBytes(entry.value()).nodes();
}
- /**
- * Returns partition assignments from meta storage locally.
- *
- * @param metaStorageManager Meta storage manager.
- * @param zoneId Zone id.
- * @param partitionNumber Partition number.
- * @param revision Revision.
- * @return Returns partition assignments from meta storage locally or
{@code null} if assignments is absent.
- */
- @Nullable
- public static Set<Assignment> zonePartitionAssignmentsGetLocally(
- MetaStorageManager metaStorageManager,
- int zoneId,
- int partitionNumber,
- long revision
- ) {
- Entry entry =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, partitionNumber)), revision);
-
- return (entry == null || entry.empty() || entry.tombstone()) ? null :
Assignments.fromBytes(entry.value()).nodes();
- }
-
/**
* Returns table assignments for table partitions from meta storage.
*
@@ -684,30 +650,4 @@ public class RebalanceUtil {
})
.collect(toList());
}
-
- /**
- * Returns zone assignments for all zone partitions from meta storage
locally. Assignments must be present.
- *
- * @param metaStorageManager Meta storage manager.
- * @param zoneId Zone id.
- * @param numberOfPartitions Number of partitions.
- * @param revision Revision.
- * @return Future with zone assignments as a value.
- */
- public static List<Assignments> zoneAssignmentsGetLocally(
- MetaStorageManager metaStorageManager,
- int zoneId,
- int numberOfPartitions,
- long revision
- ) {
- return IntStream.range(0, numberOfPartitions)
- .mapToObj(p -> {
- Entry e =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, p)), revision);
-
- assert e != null && !e.empty() && !e.tombstone() : e;
-
- return Assignments.fromBytes(e.value());
- })
- .collect(toList());
- }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
similarity index 70%
copy from
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
copy to
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index ee1610ee3a..951101a3c7 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -17,16 +17,12 @@
package org.apache.ignite.internal.distributionzones.rebalance;
-import static
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine.calculateAssignments;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.raftConfigurationAppliedKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stableChangeTriggerKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.switchAppendKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.switchReduceKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchAppendKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchReduceKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.union;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
@@ -36,48 +32,52 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
-import static
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.CollectionUtils.difference;
import static org.apache.ignite.internal.util.CollectionUtils.intersect;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftError;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.Status;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
- * Listener for the raft group events, which must provide correct error
handling of rebalance process
- * and start new rebalance after the current one finished.
+ * Listener for the raft group events, which must provide correct error
handling of rebalance process and start new rebalance after the
+ * current one finished.
*/
-public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener {
+public class ZoneRebalanceRaftGroupEventsListener implements
RaftGroupEventsListener {
/** Ignite logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(RebalanceRaftGroupEventsListener.class);
+ private static final IgniteLogger LOG =
Loggers.forClass(ZoneRebalanceRaftGroupEventsListener.class);
/** Number of retrying of the current rebalance in case of errors. */
private static final int REBALANCE_RETRY_THRESHOLD = 10;
@@ -97,12 +97,6 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
/** Success code for the MetaStorage stable assignments change. */
private static final int FINISH_REBALANCE_SUCCESS = 4;
- /** Success code for the MetaStorage tables counter decrement. */
- private static final int TABLES_COUNTER_DECREMENT_SUCCESS = 5;
-
- /** Status for outdated MetaStorage update. */
- private static final int OUTDATED_INVOKE_STATUS = 6;
-
/** Failure code for the MetaStorage switch append assignments change. */
private static final int SWITCH_APPEND_FAIL = -SWITCH_APPEND_SUCCESS;
@@ -115,14 +109,15 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
/** Failure code for the MetaStorage stable assignments change. */
private static final int FINISH_REBALANCE_FAIL = -FINISH_REBALANCE_SUCCESS;
- /** Failure code for the MetaStorage tables counter decrement. */
- private static final int PART_COUNTER_DECREMENT_FAIL =
-TABLES_COUNTER_DECREMENT_SUCCESS;
-
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
+ private final CatalogService catalogService;
+
+ private final DistributionZoneManager distributionZoneManager;
+
/** Unique table partition id. */
- private final TablePartitionId tablePartitionId;
+ private final ZonePartitionId zonePartitionId;
/** Busy lock of parent component for synchronous stop. */
private final IgniteSpinBusyLock busyLock;
@@ -130,38 +125,37 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
/** Executor for scheduling rebalance retries. */
private final ScheduledExecutorService rebalanceScheduler;
- /** Zone id. */
- private final int zoneId;
-
/** Performs reconfiguration of a Raft group of a partition. */
private final PartitionMover partitionMover;
/** Attempts to retry the current rebalance in case of errors. */
- private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
+ private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
/**
* Constructs new listener.
*
* @param metaStorageMgr Meta storage manager.
- * @param tablePartitionId Partition id.
+ * @param zonePartitionId Partition id.
* @param busyLock Busy lock.
* @param partitionMover Class that moves partition between nodes.
* @param rebalanceScheduler Executor for scheduling rebalance retries.
*/
- public RebalanceRaftGroupEventsListener(
+ public ZoneRebalanceRaftGroupEventsListener(
MetaStorageManager metaStorageMgr,
- TablePartitionId tablePartitionId,
+ ZonePartitionId zonePartitionId,
IgniteSpinBusyLock busyLock,
PartitionMover partitionMover,
ScheduledExecutorService rebalanceScheduler,
- int zoneId
+ CatalogService catalogService,
+ DistributionZoneManager distributionZoneManager
) {
this.metaStorageMgr = metaStorageMgr;
- this.tablePartitionId = tablePartitionId;
+ this.zonePartitionId = zonePartitionId;
this.busyLock = busyLock;
this.partitionMover = partitionMover;
this.rebalanceScheduler = rebalanceScheduler;
- this.zoneId = zoneId;
+ this.distributionZoneManager = distributionZoneManager;
+ this.catalogService = catalogService;
}
/** {@inheritDoc} */
@@ -180,7 +174,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
try {
rebalanceAttempts.set(0);
- byte[] pendingAssignmentsBytes =
metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId)).get().value();
+ byte[] pendingAssignmentsBytes =
metaStorageMgr.get(pendingPartAssignmentsKey(zonePartitionId)).get().value();
if (pendingAssignmentsBytes != null) {
Set<Assignment> pendingAssignments =
Assignments.fromBytes(pendingAssignmentsBytes).nodes();
@@ -197,8 +191,8 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
LOG.info(
- "New leader elected. Going to apply new
configuration [tablePartitionId={}, peers={}, learners={}]",
- tablePartitionId, peers, learners
+ "New leader elected. Going to apply new
configuration [zonePartitionId={}, peers={}, learners={}]",
+ zonePartitionId, peers, learners
);
PeersAndLearners peersAndLearners =
PeersAndLearners.fromConsistentIds(peers, learners);
@@ -207,7 +201,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
} catch (Exception e) {
// TODO: IGNITE-14693
- LOG.warn("Unable to start rebalance [tablePartitionId,
term={}]", e, tablePartitionId, term);
+ LOG.warn("Unable to start rebalance [tablePartitionId,
term={}]", e, zonePartitionId, term);
} finally {
busyLock.leaveBusy();
}
@@ -233,7 +227,13 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
try {
Set<Assignment> stable = createAssignments(configuration);
- countDownPartitionsFromZone(stable);
+ doStableKeySwitch(
+ stable,
+ zonePartitionId,
+ metaStorageMgr,
+ catalogService,
+ distributionZoneManager
+ );
} finally {
busyLock.leaveBusy();
}
@@ -243,76 +243,6 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
}
- /**
- * This method count downs the counter for tables from the corresponding
zone that are associated to the corresponding partition number
- * when raft configuration of that partition raft group has been
successfully changed.
- *
- * @param stable Stable assignments to save to metastore for further
stable switch.
- */
- private void countDownPartitionsFromZone(Set<Assignment> stable) {
- try {
- int partId = tablePartitionId.partitionId();
-
- Entry counterEntry = metaStorageMgr.get(tablesCounterKey(zoneId,
partId)).get();
-
- assert counterEntry.value() != null;
-
- Set<Integer> counter = fromBytes(counterEntry.value());
-
- assert !counter.isEmpty();
-
- if (!counter.contains(tablePartitionId.tableId())) {
- // Count down for this table has already been processed, just
skip.
- // For example, this can happen when leader re-election
happened during the rebalance process.
- return;
- }
-
- Condition condition = value(tablesCounterKey(zoneId,
partId)).eq(counterEntry.value());
-
- byte[] stableArray = Assignments.toBytes(stable);
-
- counter.remove(tablePartitionId.tableId());
-
- if (counter.isEmpty()) {
- counter = Set.of();
- }
-
- Update successCase = ops(
- put(tablesCounterKey(zoneId, partId), toBytes(counter)),
- // Todo: change to one key
https://issues.apache.org/jira/browse/IGNITE-18991
- put(raftConfigurationAppliedKey(tablePartitionId),
stableArray)
- ).yield(TABLES_COUNTER_DECREMENT_SUCCESS);
-
- Update failCase = ops().yield(PART_COUNTER_DECREMENT_FAIL);
-
- int res = metaStorageMgr.invoke(iif(condition, successCase,
failCase)).get().getAsInt();
-
- if (res < 0) {
- LOG.info("Count down of zone's tables counter is failed. "
- + "Going to retry [zoneId={},
appliedPeers={}]",
- zoneId,
stable.stream().map(Assignment::consistentId).collect(Collectors.toSet())
- );
-
- countDownPartitionsFromZone(stable);
-
- return;
- } else {
- LOG.info(
- "Count down of zone's tables counter is succeeded
[zoneId={}, partId={}, counter={}, appliedPeers={}]",
- zoneId,
- partId,
- counter,
-
stable.stream().map(Assignment::consistentId).collect(Collectors.toSet())
- );
- }
-
- rebalanceAttempts.set(0);
- } catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-14693
- LOG.warn("Unable to count down partitions counter in metastore: "
+ tablePartitionId, e);
- }
- }
-
/** {@inheritDoc} */
@Override
public void onReconfigurationError(Status status, PeersAndLearners
configuration, long term) {
@@ -325,7 +255,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
if (status.equals(Status.LEADER_STEPPED_DOWN)) {
// Leader stepped down, so we are expecting
RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
- LOG.info("Leader stepped down during rebalance [partId={}]",
tablePartitionId);
+ LOG.info("Leader stepped down during rebalance [partId={}]",
zonePartitionId);
return;
}
@@ -335,12 +265,12 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
assert raftError == RaftError.ECATCHUP : "According to the JRaft
protocol, " + RaftError.ECATCHUP
+ " is expected, got " + raftError;
- LOG.debug("Error occurred during rebalance [partId={}]",
tablePartitionId);
+ LOG.debug("Error occurred during rebalance [partId={}]",
zonePartitionId);
if (rebalanceAttempts.incrementAndGet() <
REBALANCE_RETRY_THRESHOLD) {
scheduleChangePeers(configuration, term);
} else {
- LOG.info("Number of retries for rebalance exceeded the
threshold [partId={}, threshold={}]", tablePartitionId,
+ LOG.info("Number of retries for rebalance exceeded the
threshold [partId={}, threshold={}]", zonePartitionId,
REBALANCE_RETRY_THRESHOLD);
// TODO: currently we just retry intent to change peers
according to the rebalance infinitely, until new leader is elected,
@@ -364,7 +294,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
return;
}
- LOG.info("Going to retry rebalance [attemptNo={}, partId={}]",
rebalanceAttempts.get(), tablePartitionId);
+ LOG.info("Going to retry rebalance [attemptNo={}, partId={}]",
rebalanceAttempts.get(), zonePartitionId);
try {
partitionMover.movePartition(peersAndLearners, term).join();
@@ -379,20 +309,17 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
*/
static void doStableKeySwitch(
Set<Assignment> stableFromRaft,
- TablePartitionId tablePartitionId,
- long revision,
+ ZonePartitionId zonePartitionId,
MetaStorageManager metaStorageMgr,
CatalogService catalogService,
DistributionZoneManager distributionZoneManager
) {
try {
- ByteArray pendingPartAssignmentsKey =
pendingPartAssignmentsKey(tablePartitionId);
- ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(tablePartitionId);
- ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(tablePartitionId);
- ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
- ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
- ByteArray stableChangeTriggerKey =
stableChangeTriggerKey(tablePartitionId);
-
+ ByteArray pendingPartAssignmentsKey =
pendingPartAssignmentsKey(zonePartitionId);
+ ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(zonePartitionId);
+ ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(zonePartitionId);
+ ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
+ ByteArray switchAppendKey = switchAppendKey(zonePartitionId);
// TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
Map<ByteArray, Entry> values = metaStorageMgr.getAll(
Set.of(
@@ -400,26 +327,26 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
pendingPartAssignmentsKey,
stablePartAssignmentsKey,
switchReduceKey,
- switchAppendKey,
- stableChangeTriggerKey
+ switchAppendKey
)
).get();
- Set<Assignment> calculatedAssignments =
calculateAssignments(tablePartitionId, catalogService,
distributionZoneManager).get();
+ Set<Assignment> calculatedAssignments = calculateZoneAssignments(
+ zonePartitionId,
+ catalogService,
+ distributionZoneManager
+ ).get();
Entry stableEntry = values.get(stablePartAssignmentsKey);
Entry pendingEntry = values.get(pendingPartAssignmentsKey);
Entry plannedEntry = values.get(plannedPartAssignmentsKey);
Entry switchReduceEntry = values.get(switchReduceKey);
Entry switchAppendEntry = values.get(switchAppendKey);
- Entry stableChangeTriggerEntry =
values.get(stableChangeTriggerKey);
Set<Assignment> retrievedStable =
readAssignments(stableEntry).nodes();
Set<Assignment> retrievedSwitchReduce =
readAssignments(switchReduceEntry).nodes();
Set<Assignment> retrievedSwitchAppend =
readAssignments(switchAppendEntry).nodes();
Set<Assignment> retrievedPending =
readAssignments(pendingEntry).nodes();
- long stableChangeTriggerValue = stableChangeTriggerEntry.value()
== null
- ? 0L :
bytesToLongKeepingOrder(stableChangeTriggerEntry.value());
if (!retrievedPending.equals(stableFromRaft)) {
return;
@@ -517,35 +444,27 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
// TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
- int res = metaStorageMgr.invoke(
- iif(or(
-
notExists(stableChangeTriggerKey(tablePartitionId)),
-
value(stableChangeTriggerKey(tablePartitionId)).lt(longToBytesKeepingOrder(revision))
- ),
- iif(retryPreconditions, successCase, failCase),
- ops().yield(OUTDATED_INVOKE_STATUS)
- )
- ).get().getAsInt();
+ int res = metaStorageMgr.invoke(iif(retryPreconditions,
successCase, failCase)).get().getAsInt();
if (res < 0) {
switch (res) {
case SWITCH_APPEND_FAIL:
LOG.info("Rebalance keys changed while trying to
update rebalance pending addition information. "
- + "Going to retry
[tablePartitionID={}, appliedPeers={}]",
- tablePartitionId, stableFromRaft
+ + "Going to retry [zonePartitionId={},
appliedPeers={}]",
+ zonePartitionId, stableFromRaft
);
break;
case SWITCH_REDUCE_FAIL:
LOG.info("Rebalance keys changed while trying to
update rebalance pending reduce information. "
- + "Going to retry
[tablePartitionID={}, appliedPeers={}]",
- tablePartitionId, stableFromRaft
+ + "Going to retry [zonePartitionId={},
appliedPeers={}]",
+ zonePartitionId, stableFromRaft
);
break;
case SCHEDULE_PENDING_REBALANCE_FAIL:
case FINISH_REBALANCE_FAIL:
LOG.info("Rebalance keys changed while trying to
update rebalance information. "
- + "Going to retry
[tablePartitionId={}, appliedPeers={}]",
- tablePartitionId, stableFromRaft
+ + "Going to retry [zonePartitionId={},
appliedPeers={}]",
+ zonePartitionId, stableFromRaft
);
break;
default:
@@ -555,8 +474,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
doStableKeySwitch(
stableFromRaft,
- tablePartitionId,
- revision,
+ zonePartitionId,
metaStorageMgr,
catalogService,
distributionZoneManager
@@ -568,31 +486,24 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
switch (res) {
case SWITCH_APPEND_SUCCESS:
LOG.info("Rebalance finished. Going to schedule next
rebalance with addition"
- + " [tablePartitionId={}, appliedPeers={},
plannedPeers={}]",
- tablePartitionId, stableFromRaft,
calculatedPendingAddition
+ + " [zonePartitionId={}, appliedPeers={},
plannedPeers={}]",
+ zonePartitionId, stableFromRaft,
calculatedPendingAddition
);
break;
case SWITCH_REDUCE_SUCCESS:
LOG.info("Rebalance finished. Going to schedule next
rebalance with reduction"
- + " [tablePartitionId={}, appliedPeers={},
plannedPeers={}]",
- tablePartitionId, stableFromRaft,
calculatedPendingReduction
+ + " [zonePartitionId={}, appliedPeers={},
plannedPeers={}]",
+ zonePartitionId, stableFromRaft,
calculatedPendingReduction
);
break;
case SCHEDULE_PENDING_REBALANCE_SUCCESS:
LOG.info(
- "Rebalance finished. Going to schedule next
rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
- tablePartitionId, stableFromRaft,
Assignments.fromBytes(plannedEntry.value()).nodes()
+ "Rebalance finished. Going to schedule next
rebalance [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
+ zonePartitionId, stableFromRaft,
Assignments.fromBytes(plannedEntry.value()).nodes()
);
break;
case FINISH_REBALANCE_SUCCESS:
- LOG.info("Rebalance finished [tablePartitionId={},
appliedPeers={}]", tablePartitionId, stableFromRaft);
- break;
-
- case OUTDATED_INVOKE_STATUS:
- LOG.debug("Stable switch skipped because event is outdated
"
- + "[tablePartitionId={},
stableChangeTriggerKey={}, revision={}]",
- tablePartitionId, stableChangeTriggerValue,
revision
- );
+ LOG.info("Rebalance finished [zonePartitionId={},
appliedPeers={}]", zonePartitionId, stableFromRaft);
break;
default:
@@ -602,7 +513,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
} catch (InterruptedException | ExecutionException e) {
// TODO: IGNITE-14693
- LOG.warn("Unable to commit partition configuration to metastore: "
+ tablePartitionId, e);
+ LOG.warn("Unable to commit partition configuration to metastore: "
+ zonePartitionId, e);
}
}
@@ -629,4 +540,100 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
return value == null ? Assignments.EMPTY :
Assignments.fromBytes(value);
}
+
+ /**
+ * Handles a change of the switch reduce assignments by updating pending assignments
if there is no rebalancing in progress.
+ * If there is rebalancing in progress, then new assignments will be
applied when rebalance finishes.
+ *
+ * @param metaStorageMgr MetaStorage manager.
+ * @param dataNodes Data nodes.
+ * @param replicas Replicas count.
+ * @param partId Partition's raft group id.
+ * @param event Assignments switch reduce change event.
+ * @return Completable future that signifies the completion of this
operation.
+ */
+ public static CompletableFuture<Void>
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<String>
dataNodes,
+ int replicas, ZonePartitionId partId, WatchEvent event) {
+ Entry entry = event.entryEvent().newEntry();
+ byte[] eventData = entry.value();
+
+ assert eventData != null : "Null event data for " + partId;
+
+ Assignments switchReduce = Assignments.fromBytes(eventData);
+
+ if (switchReduce.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ Set<Assignment> assignments =
AffinityUtils.calculateAssignmentForPartition(dataNodes, partId.partitionId(),
replicas);
+
+ ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+
+ Set<Assignment> pendingAssignments = difference(assignments,
switchReduce.nodes());
+
+ byte[] pendingByteArray = Assignments.toBytes(pendingAssignments);
+ byte[] assignmentsByteArray = Assignments.toBytes(assignments);
+
+ ByteArray changeTriggerKey =
ZoneRebalanceUtil.pendingChangeTriggerKey(partId);
+ byte[] rev = ByteUtils.longToBytesKeepingOrder(entry.revision());
+
+ // Here is what happens in the MetaStorage:
+ // if ((notExists(changeTriggerKey) || value(changeTriggerKey) <
revision) && (notExists(pendingKey) && notExists(stableKey))) {
+ // put(pendingKey, pending)
+ // put(stableKey, assignments)
+ // put(changeTriggerKey, revision)
+ // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey)
< revision) && (notExists(pendingKey))) {
+ // put(pendingKey, pending)
+ // put(changeTriggerKey, revision)
+ // }
+
+ Iif resultingOperation = iif(
+ and(
+ or(notExists(changeTriggerKey),
value(changeTriggerKey).lt(rev)),
+ and(notExists(pendingKey),
(notExists(stablePartAssignmentsKey(partId))))
+ ),
+ ops(
+ put(pendingKey, pendingByteArray),
+ put(stablePartAssignmentsKey(partId),
assignmentsByteArray),
+ put(changeTriggerKey, rev)
+ ).yield(),
+ iif(
+ and(
+ or(notExists(changeTriggerKey),
value(changeTriggerKey).lt(rev)),
+ notExists(pendingKey)
+ ),
+ ops(
+ put(pendingKey, pendingByteArray),
+ put(changeTriggerKey, rev)
+ ).yield(),
+ ops().yield()
+ )
+ );
+
+ return metaStorageMgr.invoke(resultingOperation).thenApply(unused ->
null);
+ }
+
+ private static CompletableFuture<Set<Assignment>> calculateZoneAssignments(
+ ZonePartitionId zonePartitionId,
+ CatalogService catalogService,
+ DistributionZoneManager distributionZoneManager
+ ) {
+ int catalogVersion = catalogService.latestCatalogVersion();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(zonePartitionId.zoneId(), catalogVersion);
+
+ int zoneId = zonePartitionId.zoneId();
+
+ return distributionZoneManager.dataNodes(
+ zoneDescriptor.updateToken(),
+ catalogVersion,
+ zoneId
+ ).thenApply(dataNodes ->
+ AffinityUtils.calculateAssignmentForPartition(
+ dataNodes,
+ zonePartitionId.partitionId(),
+ zoneDescriptor.replicas()
+ )
+ );
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
similarity index 66%
copy from
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
copy to
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index bc40b6caa3..23fea49067 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -45,11 +47,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -58,21 +60,20 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.jetbrains.annotations.Nullable;
/**
* Util class for methods needed for the rebalance process.
*/
-public class RebalanceUtil {
-
+public class ZoneRebalanceUtil {
/** Logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(RebalanceUtil.class);
+ private static final IgniteLogger LOG =
Loggers.forClass(ZoneRebalanceUtil.class);
/**
* Status values for methods like
- * {@link #updatePendingAssignmentsKeys(CatalogTableDescriptor,
TablePartitionId, Collection, int, long, MetaStorageManager, int, Set)}.
+ * {@link #updatePendingAssignmentsKeys(CatalogZoneDescriptor,
ZonePartitionId, Collection, int, long, MetaStorageManager, int, Set)}.
*/
public enum UpdateStatus {
/**
@@ -118,43 +119,40 @@ public class RebalanceUtil {
}
}
- /** Rebalance scheduler pool size. */
- public static final int REBALANCE_SCHEDULER_POOL_SIZE =
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20);
-
/**
* Update keys that related to rebalance algorithm in Meta Storage. Keys
are specific for partition.
*
- * @param tableDescriptor Table descriptor.
- * @param partId Unique identifier of a partition.
+ * @param zoneDescriptor Zone descriptor.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @param dataNodes Data nodes.
- * @param replicas Number of replicas for a table.
+ * @param replicas Number of replicas for a zone.
* @param revision Revision of Meta Storage that is specific for the
assignment update.
* @param metaStorageMgr Meta Storage manager.
* @param partNum Partition id.
- * @param tableCfgPartAssignments Table configuration assignments.
+ * @param zoneCfgPartAssignments Zone configuration assignments.
* @return Future representing result of updating keys in {@code
metaStorageMgr}
*/
public static CompletableFuture<Void> updatePendingAssignmentsKeys(
- CatalogTableDescriptor tableDescriptor,
- TablePartitionId partId,
+ CatalogZoneDescriptor zoneDescriptor,
+ ZonePartitionId zonePartitionId,
Collection<String> dataNodes,
int replicas,
long revision,
MetaStorageManager metaStorageMgr,
int partNum,
- Set<Assignment> tableCfgPartAssignments
+ Set<Assignment> zoneCfgPartAssignments
) {
- ByteArray partChangeTriggerKey = pendingChangeTriggerKey(partId);
+ ByteArray partChangeTriggerKey =
pendingChangeTriggerKey(zonePartitionId);
- ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsKey(partId);
+ ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsKey(zonePartitionId);
- ByteArray partAssignmentsPlannedKey =
plannedPartAssignmentsKey(partId);
+ ByteArray partAssignmentsPlannedKey =
plannedPartAssignmentsKey(zonePartitionId);
- ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
+ ByteArray partAssignmentsStableKey =
stablePartAssignmentsKey(zonePartitionId);
Set<Assignment> partAssignments =
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
- boolean isNewAssignments =
!tableCfgPartAssignments.equals(partAssignments);
+ boolean isNewAssignments =
!zoneCfgPartAssignments.equals(partAssignments);
byte[] partAssignmentsBytes = Assignments.toBytes(partAssignments);
@@ -208,23 +206,23 @@ public class RebalanceUtil {
switch (UpdateStatus.valueOf(sr.getAsInt())) {
case PENDING_KEY_UPDATED:
LOG.info(
- "Update metastore pending partitions key [key={},
partition={}, table={}/{}, newVal={}]",
- partAssignmentsPendingKey.toString(), partNum,
tableDescriptor.id(), tableDescriptor.name(),
+ "Update metastore pending partitions key [key={},
partition={}, zone={}/{}, newVal={}]",
+ partAssignmentsPendingKey.toString(), partNum,
zoneDescriptor.id(), zoneDescriptor.name(),
partAssignments);
break;
case PLANNED_KEY_UPDATED:
LOG.info(
- "Update metastore planned partitions key [key={},
partition={}, table={}/{}, newVal={}]",
- partAssignmentsPlannedKey, partNum,
tableDescriptor.id(), tableDescriptor.name(),
+ "Update metastore planned partitions key [key={},
partition={}, zone={}/{}, newVal={}]",
+ partAssignmentsPlannedKey, partNum,
zoneDescriptor.id(), zoneDescriptor.name(),
partAssignments
);
break;
case PLANNED_KEY_REMOVED_EQUALS_PENDING:
LOG.info(
- "Remove planned key because current pending key
has the same value [key={}, partition={}, table={}/{}, val={}]",
- partAssignmentsPlannedKey.toString(), partNum,
tableDescriptor.id(), tableDescriptor.name(),
+ "Remove planned key because current pending key
has the same value [key={}, partition={}, zone={}/{}, val={}]",
+ partAssignmentsPlannedKey.toString(), partNum,
zoneDescriptor.id(), zoneDescriptor.name(),
partAssignments
);
@@ -232,24 +230,24 @@ public class RebalanceUtil {
case PLANNED_KEY_REMOVED_EMPTY_PENDING:
LOG.info(
"Remove planned key because pending is empty and
calculated assignments are equal to current assignments "
- + "[key={}, partition={}, table={}/{},
val={}]",
- partAssignmentsPlannedKey.toString(), partNum,
tableDescriptor.id(), tableDescriptor.name(),
+ + "[key={}, partition={}, zone={}/{},
val={}]",
+ partAssignmentsPlannedKey.toString(), partNum,
zoneDescriptor.id(), zoneDescriptor.name(),
partAssignments
);
break;
case ASSIGNMENT_NOT_UPDATED:
LOG.debug(
- "Assignments are not updated [key={},
partition={}, table={}/{}, val={}]",
- partAssignmentsPlannedKey.toString(), partNum,
tableDescriptor.id(), tableDescriptor.name(),
+ "Assignments are not updated [key={},
partition={}, zone={}/{}, val={}]",
+ partAssignmentsPlannedKey.toString(), partNum,
zoneDescriptor.id(), zoneDescriptor.name(),
partAssignments
);
break;
case OUTDATED_UPDATE_RECEIVED:
LOG.debug(
- "Received outdated rebalance trigger event
[revision={}, partition={}, table={}/{}]",
- revision, partNum, tableDescriptor.id(),
tableDescriptor.name());
+ "Received outdated rebalance trigger event
[revision={}, partition={}, zone={}/{}]",
+ revision, partNum, zoneDescriptor.id(),
zoneDescriptor.name());
break;
default:
@@ -259,212 +257,174 @@ public class RebalanceUtil {
}
/**
- * Triggers rebalance on all partitions of the provided table: that is,
reads table assignments from
- * the MetaStorage, computes new ones based on the current properties of
the table, its zone and the
+ * Triggers rebalance on all partitions of the provided zone: that is,
reads zone assignments from
+ * the MetaStorage, computes new ones based on the current properties of
the zone and the
* provided data nodes, and, if the calculated assignments are different
from the ones loaded from the
* MetaStorages, writes them as pending assignments.
*
- * @param tableDescriptor Table descriptor.
* @param zoneDescriptor Zone descriptor.
* @param dataNodes Data nodes to use.
* @param storageRevision MetaStorage revision corresponding to this
request.
* @param metaStorageManager MetaStorage manager used to read/write
assignments.
- * @return Array of futures, one per partition of the table; the futures
complete when the described
+ * @return Future that completes when, for every partition of the zone,
the described
* rebalance triggering completes.
*/
- public static CompletableFuture<?>[] triggerAllTablePartitionsRebalance(
- CatalogTableDescriptor tableDescriptor,
+ static CompletableFuture<Void> triggerZonePartitionsRebalance(
CatalogZoneDescriptor zoneDescriptor,
Set<String> dataNodes,
long storageRevision,
MetaStorageManager metaStorageManager
) {
- CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut =
tableAssignments(
+ CompletableFuture<Map<Integer, Assignments>> zoneAssignmentsFut =
zoneAssignments(
metaStorageManager,
- tableDescriptor.id(),
+ zoneDescriptor.id(),
Set.of(),
zoneDescriptor.partitions()
);
- CompletableFuture<?>[] futures = new
CompletableFuture[zoneDescriptor.partitions()];
+ CompletableFuture<?>[] partitionFutures = new
CompletableFuture[zoneDescriptor.partitions()];
for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
- TablePartitionId replicaGrpId = new
TablePartitionId(tableDescriptor.id(), partId);
+ ZonePartitionId replicaGrpId = new
ZonePartitionId(zoneDescriptor.id(), partId);
int finalPartId = partId;
- futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments
->
+ partitionFutures[partId] =
zoneAssignmentsFut.thenCompose(zoneAssignments ->
// TODO https://issues.apache.org/jira/browse/IGNITE-19763
We should distinguish empty stable assignments on
// TODO node recovery in case of interrupted table
creation, and moving from empty assignments to non-empty.
- tableAssignments.isEmpty() ? nullCompletedFuture() :
updatePendingAssignmentsKeys(
- tableDescriptor,
+ zoneAssignments.isEmpty() ? nullCompletedFuture() :
updatePendingAssignmentsKeys(
+ zoneDescriptor,
replicaGrpId,
dataNodes,
zoneDescriptor.replicas(),
storageRevision,
metaStorageManager,
finalPartId,
- tableAssignments.get(finalPartId).nodes()
+ zoneAssignments.get(finalPartId).nodes()
));
}
- return futures;
+ // This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
+ // when reading from MetaStorage, it will be encountered by every
partition future) to avoid noise
+ // in the logs.
+ Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+ for (int partId = 0; partId < partitionFutures.length; partId++) {
+ int finalPartId = partId;
+
+ partitionFutures[partId].exceptionally(e -> {
+ Throwable cause = ExceptionUtils.unwrapCause(e);
+
+ if (unwrappedCauses.add(cause)) {
+ // First time this cause is seen; it may be specific to this partition, so log the partition index.
+ LOG.error(
+ "Exception on updating assignments for [zone={},
partition={}]",
+ e,
+ zoneInfo(zoneDescriptor), finalPartId
+ );
+ } else {
+ // The exception is from upstream and not specific for
this partition, so don't log the partition index.
+ LOG.error(
+ "Exception on updating assignments for [zone={}]",
+ e,
+ zoneInfo(zoneDescriptor)
+ );
+ }
+
+ return null;
+ });
+ }
+
+ return allOf(partitionFutures);
+ }
+
+ private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
+ return zoneDescriptor.id() + "/" + zoneDescriptor.name();
}
/** Key prefix for pending assignments. */
- public static final String PENDING_ASSIGNMENTS_PREFIX =
"assignments.pending.";
+ public static final String PENDING_ASSIGNMENTS_PREFIX =
"zone.assignments.pending.";
/** Key prefix for stable assignments. */
- public static final String ZONE_STABLE_ASSIGNMENTS_PREFIX =
"zone.assignments.stable.";
+ public static final String STABLE_ASSIGNMENTS_PREFIX =
"zone.assignments.stable.";
- /** Key prefix for stable assignments. */
- public static final String STABLE_ASSIGNMENTS_PREFIX =
"assignments.stable.";
+ /** Key prefix for planned assignments. */
+ public static final String PLANNED_ASSIGNMENTS_PREFIX =
"zone.assignments.planned.";
/** Key prefix for switch reduce assignments. */
- public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"assignments.switch.reduce.";
+ public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"zone.assignments.switch.reduce.";
/** Key prefix for switch append assignments. */
- public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"assignments.switch.append.";
-
- /** Key prefix for counter of rebalances of tables from a zone that are
associated with the specified partition. */
- private static final String TABLES_COUNTER_PREFIX = "tables.counter.";
-
- /** Key prefix for a raft configuration that was applied during rebalance
of the specified partition form a table. */
- private static final String RAFT_CONF_APPLIED_PREFIX =
"assignments.raft.conf.applied.";
+ public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"zone.assignments.switch.append.";
/**
* Key that is needed for skipping stale events of pending key change.
*
- * @param partId Unique identifier of a partition.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @return Key for a partition.
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
- public static ByteArray pendingChangeTriggerKey(TablePartitionId partId) {
- return new ByteArray(partId + "pending.change.trigger");
- }
-
- /**
- * Key that is needed for skipping stale events of stable key change.
- *
- * @param partId Unique identifier of a partition.
- * @return Key for a partition.
- * @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
- */
- public static ByteArray stableChangeTriggerKey(TablePartitionId partId) {
- return new ByteArray(partId + "stable.change.trigger");
- }
-
- /**
- * Key that is needed for the rebalance algorithm.
- *
- * @param partId Unique identifier of a partition.
- * @return Key for a partition.
- * @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
- */
- public static ByteArray pendingPartAssignmentsKey(TablePartitionId partId)
{
- return new ByteArray(PENDING_ASSIGNMENTS_PREFIX + partId);
+ public static ByteArray pendingChangeTriggerKey(ZonePartitionId
zonePartitionId) {
+ return new ByteArray(zonePartitionId + "zone.pending.change.trigger");
}
/**
* Key that is needed for the rebalance algorithm.
*
- * @param partId Unique identifier of a partition.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @return Key for a partition.
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
- public static ByteArray plannedPartAssignmentsKey(TablePartitionId partId)
{
- return new ByteArray("assignments.planned." + partId);
+ public static ByteArray pendingPartAssignmentsKey(ZonePartitionId
zonePartitionId) {
+ return new ByteArray(PENDING_ASSIGNMENTS_PREFIX + zonePartitionId);
}
/**
* Key that is needed for the rebalance algorithm.
*
- * @param partId Unique identifier of a partition.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @return Key for a partition.
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
- public static ByteArray stablePartAssignmentsKey(TablePartitionId partId) {
- return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
+ public static ByteArray plannedPartAssignmentsKey(ZonePartitionId
zonePartitionId) {
+ return new ByteArray(PLANNED_ASSIGNMENTS_PREFIX + zonePartitionId);
}
/**
* Key that is needed for the rebalance algorithm.
*
- * @param partId Unique identifier of a partition.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @return Key for a partition.
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
- public static ByteArray stablePartAssignmentsKey(ZonePartitionId partId) {
- return new ByteArray(ZONE_STABLE_ASSIGNMENTS_PREFIX + partId);
+ public static ByteArray stablePartAssignmentsKey(ZonePartitionId
zonePartitionId) {
+ return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + zonePartitionId);
}
/**
* Key that is needed for the rebalance algorithm.
*
- * @param partId Unique identifier of a partition.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @return Key for a partition.
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
- public static ByteArray switchReduceKey(TablePartitionId partId) {
- return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + partId);
+ public static ByteArray switchReduceKey(ZonePartitionId zonePartitionId) {
+ return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX +
zonePartitionId);
}
/**
* Key that is needed for the rebalance algorithm.
*
- * @param partId Unique identifier of a partition.
+ * @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @return Key for a partition.
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
- public static ByteArray switchAppendKey(TablePartitionId partId) {
- return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + partId);
- }
-
- /**
- * ByteArray key for a counter of rebalances of tables from a zone that
are associated with the specified partition.
- *
- * @param zoneId Identifier of a zone.
- * @param partId Unique identifier of a partition.
- * @return Key for a partition.
- */
- public static ByteArray tablesCounterKey(int zoneId, int partId) {
- return new ByteArray(TABLES_COUNTER_PREFIX + zoneId + "_part_" +
partId);
- }
-
- /**
- * ByteArray prefix for counter of rebalances of tables from a zone that
are associated with the specified partition.
- *
- * @return Prefix for a counter of rebalances of tables partition.
- */
- public static ByteArray tablesCounterPrefixKey() {
- return new ByteArray(TABLES_COUNTER_PREFIX);
+ public static ByteArray switchAppendKey(ZonePartitionId zonePartitionId) {
+ return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX +
zonePartitionId);
}
/**
- * ByteArray key for a raft configuration that was applied during
rebalance of the specified partition form a table.
- *
- * @param partId Unique identifier of a partition.
- * @return Key for a applied raft configuration.
- */
- public static ByteArray raftConfigurationAppliedKey(TablePartitionId
partId) {
- return new ByteArray(RAFT_CONF_APPLIED_PREFIX + partId);
- }
-
- /**
- * Extract table id from a metastorage key of partition.
- *
- * @param key Key.
- * @param prefix Key prefix.
- * @return Table id.
- */
- public static int extractTableId(byte[] key, String prefix) {
- String strKey = new String(key, StandardCharsets.UTF_8);
-
- return Integer.parseInt(strKey.substring(prefix.length(),
strKey.indexOf("_part_")));
- }
-
- /**
- * Extract table id from a metastorage key of partition.
+ * Extract zone id from a metastorage key of partition.
*
* @param key Key.
* @param prefix Key prefix.
@@ -473,7 +433,7 @@ public class RebalanceUtil {
public static int extractZoneId(byte[] key, String prefix) {
String strKey = new String(key, StandardCharsets.UTF_8);
- return Integer.parseInt(strKey.substring(prefix.length()));
+ return Integer.parseInt(strKey.substring(prefix.length(),
strKey.indexOf("_part_")));
}
/**
@@ -482,10 +442,10 @@ public class RebalanceUtil {
* @param key Key.
* @return Table id.
*/
- static int extractZoneIdFromTablesCounter(byte[] key) {
+ public static int extractZoneIdDataNodes(byte[] key) {
String strKey = new String(key, StandardCharsets.UTF_8);
- return
Integer.parseInt(strKey.substring(TABLES_COUNTER_PREFIX.length(),
strKey.indexOf("_part_")));
+ return
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.length()));
}
/**
@@ -548,90 +508,77 @@ public class RebalanceUtil {
return op1.stream().filter(op2::contains).collect(toSet());
}
- /**
- * Returns partition assignments from meta storage.
- *
- * @param metaStorageManager Meta storage manager.
- * @param tableId Table ID.
- * @param partitionId Partition ID.
- * @return Future with partition assignments as a value.
- */
- public static CompletableFuture<Set<Assignment>> partitionAssignments(
- MetaStorageManager metaStorageManager,
- int tableId,
- int partitionId
- ) {
- return metaStorageManager
- .get(stablePartAssignmentsKey(new TablePartitionId(tableId,
partitionId)))
- .thenApply(e -> (e.value() == null) ? null :
Assignments.fromBytes(e.value()).nodes());
- }
-
/**
* Returns partition assignments from meta storage locally.
*
* @param metaStorageManager Meta storage manager.
- * @param tableId Table id.
+ * @param zoneId Zone id.
* @param partitionNumber Partition number.
* @param revision Revision.
* @return Returns partition assignments from meta storage locally or
{@code null} if assignments is absent.
*/
@Nullable
- public static Set<Assignment> partitionAssignmentsGetLocally(
+ public static Set<Assignment> zonePartitionAssignmentsGetLocally(
MetaStorageManager metaStorageManager,
- int tableId,
+ int zoneId,
int partitionNumber,
long revision
) {
- Entry entry =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
TablePartitionId(tableId, partitionNumber)), revision);
+ Entry entry =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, partitionNumber)), revision);
return (entry == null || entry.empty() || entry.tombstone()) ? null :
Assignments.fromBytes(entry.value()).nodes();
}
/**
- * Returns partition assignments from meta storage locally.
+ * Returns zone assignments for all zone partitions from meta storage
locally. Assignments must be present.
*
* @param metaStorageManager Meta storage manager.
* @param zoneId Zone id.
- * @param partitionNumber Partition number.
+ * @param numberOfPartitions Number of partitions.
* @param revision Revision.
- * @return Returns partition assignments from meta storage locally or
{@code null} if assignments is absent.
+ * @return List of zone assignments, one element per partition, ordered by partition number.
*/
- @Nullable
- public static Set<Assignment> zonePartitionAssignmentsGetLocally(
+ public static List<Assignments> zoneAssignmentsGetLocally(
MetaStorageManager metaStorageManager,
int zoneId,
- int partitionNumber,
+ int numberOfPartitions,
long revision
) {
- Entry entry =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, partitionNumber)), revision);
+ return IntStream.range(0, numberOfPartitions)
+ .mapToObj(p -> {
+ Entry e =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, p)), revision);
- return (entry == null || entry.empty() || entry.tombstone()) ? null :
Assignments.fromBytes(entry.value()).nodes();
+ assert e != null && !e.empty() && !e.tombstone() : e;
+
+ return Assignments.fromBytes(e.value());
+ })
+ .collect(toList());
}
/**
- * Returns table assignments for table partitions from meta storage.
+ * Returns zone assignments for zone partitions from meta storage.
*
* @param metaStorageManager Meta storage manager.
- * @param tableId Table id.
+ * @param zoneId Zone id.
* @param partitionIds IDs of partitions to get assignments for. If empty,
get all partition assignments.
* @param numberOfPartitions Number of partitions. Ignored if partition
IDs are specified.
- * @return Future with table assignments as a value.
+ * @return Future with zone assignments as a value.
*/
- public static CompletableFuture<Map<Integer, Assignments>>
tableAssignments(
+ private static CompletableFuture<Map<Integer, Assignments>>
zoneAssignments(
MetaStorageManager metaStorageManager,
- int tableId,
+ int zoneId,
Set<Integer> partitionIds,
int numberOfPartitions
) {
- IntStream partitionIdsStream = partitionIds.isEmpty()
- ? IntStream.range(0, numberOfPartitions)
- : partitionIds.stream().mapToInt(Integer::intValue);
-
- Map<ByteArray, Integer> partitionKeysToPartitionNumber =
partitionIdsStream.collect(
- HashMap::new,
- (map, partId) -> map.put(stablePartAssignmentsKey(new
TablePartitionId(tableId, partId)), partId),
- Map::putAll
- );
+ Map<ByteArray, Integer> partitionKeysToPartitionNumber = new
HashMap<>();
+
+ Collection<Integer> ids = partitionIds.isEmpty()
+ ? IntStream.range(0,
numberOfPartitions).boxed().collect(toList())
+ : partitionIds;
+
+ for (Integer partId : ids) {
+ partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, partId)), partId);
+ }
return
metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
.thenApply(entries -> {
@@ -653,61 +600,9 @@ public class RebalanceUtil {
assert numberOfMsPartitions == 0 || numberOfMsPartitions
== entries.size()
: "Invalid number of stable partition entries
received from meta storage [received="
- + numberOfMsPartitions + ", numberOfPartitions=" +
entries.size() + ", tableId=" + tableId + "].";
+ + numberOfMsPartitions + ", numberOfPartitions=" +
entries.size() + ", zoneId=" + zoneId + "].";
return numberOfMsPartitions == 0 ? Map.of() : result;
});
}
-
- /**
- * Returns table assignments for all table partitions from meta storage
locally. Assignments must be present.
- *
- * @param metaStorageManager Meta storage manager.
- * @param tableId Table id.
- * @param numberOfPartitions Number of partitions.
- * @param revision Revision.
- * @return Future with table assignments as a value.
- */
- public static List<Assignments> tableAssignmentsGetLocally(
- MetaStorageManager metaStorageManager,
- int tableId,
- int numberOfPartitions,
- long revision
- ) {
- return IntStream.range(0, numberOfPartitions)
- .mapToObj(p -> {
- Entry e =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
TablePartitionId(tableId, p)), revision);
-
- assert e != null && !e.empty() && !e.tombstone() : e;
-
- return Assignments.fromBytes(e.value());
- })
- .collect(toList());
- }
-
- /**
- * Returns zone assignments for all zone partitions from meta storage
locally. Assignments must be present.
- *
- * @param metaStorageManager Meta storage manager.
- * @param zoneId Zone id.
- * @param numberOfPartitions Number of partitions.
- * @param revision Revision.
- * @return Future with zone assignments as a value.
- */
- public static List<Assignments> zoneAssignmentsGetLocally(
- MetaStorageManager metaStorageManager,
- int zoneId,
- int numberOfPartitions,
- long revision
- ) {
- return IntStream.range(0, numberOfPartitions)
- .mapToObj(p -> {
- Entry e =
metaStorageManager.getLocally(stablePartAssignmentsKey(new
ZonePartitionId(zoneId, p)), revision);
-
- assert e != null && !e.empty() && !e.tombstone() : e;
-
- return Assignments.fromBytes(e.value());
- })
- .collect(toList());
- }
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index d4864210ed..f0a435f281 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -23,10 +23,11 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.FEATURE_FLAG_NAME;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -167,7 +168,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
@@ -323,7 +323,6 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22410")
void testAlterReplicaTrigger() throws Exception {
Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
@@ -332,12 +331,10 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
- createZone(node, "test_zone", 2, 3);
+ createZone(node, "test_zone", 1, 3);
int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager,
"test_zone", node.hybridClock.nowLong());
- createTable(node, "test_zone", "test_table");
-
MetaStorageManager metaStorageManager = node.metaStorageManager;
ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
@@ -348,7 +345,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
- 10_000L
+ 20_000L
);
CatalogManager catalogManager = node.catalogManager;
@@ -361,7 +358,92 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
2,
- 10_000L * 2
+ 20_000L
+ );
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
+ true,
+ 20_000L
+ );
+ }
+
+ @Test
+ void testAlterReplicaTriggerDefaultZone() throws Exception {
+ Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
+ nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
+
+ Node node = getNode(replicaAssignment.consistentId());
+
+
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+ CatalogManager catalogManager = node.catalogManager;
+
+ int zoneId =
defaultZoneIdOpt(catalogManager.catalog(catalogManager.latestCatalogVersion()));
+
+ String defaultZoneName = catalogManager.zone(zoneId,
catalogManager.latestCatalogVersion()).name();
+
+ MetaStorageManager metaStorageManager = node.metaStorageManager;
+
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
+ 1,
+ 20_000L
+ );
+
+ alterZone(catalogManager, defaultZoneName, 2);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
+ 2,
+ 20_000L
+ );
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
+ true,
+ 20_000L
+ );
+ }
+
+ @Test
+ void testAlterReplicaExtensionTrigger() throws Exception {
+ Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
+ nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
+
+ Node node = getNode(replicaAssignment.consistentId());
+
+
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+ assertTrue(waitForCondition(() ->
node.distributionZoneManager.logicalTopology().size() == 3, 10_000L));
+
+ createZone(node, "test_zone", 2, 2);
+
+ int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager,
"test_zone", node.hybridClock.nowLong());
+
+ MetaStorageManager metaStorageManager = node.metaStorageManager;
+
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()).size(),
+ 2,
+ 20_000L
);
assertValueInStorage(
@@ -369,12 +451,24 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
stablePartAssignmentsKey(partId),
(v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
true,
- 10_000L * 2
+ 20_000L
+ );
+
+ CatalogManager catalogManager = node.catalogManager;
+
+ alterZone(catalogManager, "test_zone", 3);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+ nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+ 20_000L
);
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22410")
void testAlterFilterTrigger() throws Exception {
Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
@@ -387,8 +481,6 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager,
"test_zone", node.hybridClock.nowLong());
- createTable(node, "test_zone", "test_table");
-
MetaStorageManager metaStorageManager = node.metaStorageManager;
ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
@@ -399,7 +491,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
- 10_000L
+ 20_000L
);
CatalogManager catalogManager = node.catalogManager;
@@ -414,7 +506,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(nodes.get(0).name),
- 10_000L * 2
+ 20_000L
);
assertValueInStorage(
@@ -422,7 +514,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
stablePartAssignmentsKey(partId),
(v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
true,
- 10_000L * 2
+ 20_000L
);
}
@@ -786,7 +878,8 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
distributionZoneManager,
metaStorageManager,
clusterService.topologyService(),
- threadPoolsManager.tableIoExecutor()
+ threadPoolsManager.tableIoExecutor(),
+ rebalanceScheduler
);
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index df2db83be3..c55c5d0410 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -17,27 +17,42 @@
package org.apache.ignite.internal.partition.replicator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.zoneAssignmentsGetLocally;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.zonePartitionAssignmentsGetLocally;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener.handleReduceChanged;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractPartitionNumber;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZoneId;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,18 +60,24 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -65,13 +86,18 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
+import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -79,6 +105,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
/**
* The main responsibilities of this class:
@@ -102,6 +129,15 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
private final TopologyService topologyService;
+ /** Meta storage listener for pending assignments. */
+ private final WatchListener pendingAssignmentsRebalanceListener;
+
+ /** Meta storage listener for stable assignments. */
+ private final WatchListener stableAssignmentsRebalanceListener;
+
+ /** Meta storage listener for switch reduce assignments. */
+ private final WatchListener assignmentsSwitchRebalanceListener;
+
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PartitionReplicaLifecycleManager.class);
@@ -115,6 +151,9 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
*/
private final ExecutorService ioExecutor;
+ /** Executor for scheduling rebalance routine. */
+ private final ScheduledExecutorService rebalanceScheduler;
+
/**
* The constructor.
*
@@ -131,7 +170,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
DistributionZoneManager distributionZoneMgr,
MetaStorageManager metaStorageMgr,
TopologyService topologyService,
- ExecutorService ioExecutor
+ ExecutorService ioExecutor,
+ ScheduledExecutorService rebalanceScheduler
) {
this.catalogMgr = catalogMgr;
this.replicaMgr = replicaMgr;
@@ -139,6 +179,11 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
this.metaStorageMgr = metaStorageMgr;
this.topologyService = topologyService;
this.ioExecutor = ioExecutor;
+ this.rebalanceScheduler = rebalanceScheduler;
+
+ pendingAssignmentsRebalanceListener =
createPendingAssignmentsRebalanceListener();
+ stableAssignmentsRebalanceListener =
createStableAssignmentsRebalanceListener();
+ assignmentsSwitchRebalanceListener =
createAssignmentsSwitchRebalanceListener();
}
@Override
@@ -147,6 +192,12 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return nullCompletedFuture();
}
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
+
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
+
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
+
catalogMgr.listen(ZONE_CREATE,
(CreateZoneEventParameters parameters) ->
inBusyLock(busyLock, () ->
onCreateZone(parameters).thenApply((ignored) -> false))
@@ -177,41 +228,81 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
List<CompletableFuture<?>> partitionsStartFutures = new
ArrayList<>();
- for (int i = 0; i < assignments.size(); i++) {
- int partId = i;
+ for (int partId = 0; partId < assignments.size(); partId++) {
+ Assignments zoneAssignment = assignments.get(partId);
+
+ Assignment localMemberAssignment =
localMemberAssignment(zoneAssignment);
-
partitionsStartFutures.add(createZonePartitionReplicationNodes(zoneId, partId,
assignments.get(i)));
+
partitionsStartFutures.add(createZonePartitionReplicationNode(zoneId, partId,
localMemberAssignment, zoneAssignment));
}
return allOf(partitionsStartFutures.toArray(new
CompletableFuture<?>[0]));
}));
}
- private CompletableFuture<Void> createZonePartitionReplicationNodes(int
zoneId, int partId, Assignments assignments) {
- if (!shouldStartLocally(assignments)) {
+ /**
+ * Start a replica for the corresponding {@code zoneId} and {@code partId}
on the local node if {@code localMemberAssignment} is not
+ * null, meaning that the local node is part of the assignment.
+ *
+ * @param zoneId Zone id.
+ * @param partId Partition id.
+ * @param localMemberAssignment Assignment of the local member, or null if
local member is not part of the assignment.
+ * @param stableAssignments Stable assignments.
+ * @return Future that completes when a replica is started.
+ */
+ private CompletableFuture<Void> createZonePartitionReplicationNode(
+ int zoneId,
+ int partId,
+ @Nullable Assignment localMemberAssignment,
+ Assignments stableAssignments
+ ) {
+ if (localMemberAssignment == null) {
return nullCompletedFuture();
}
- PeersAndLearners realConfiguration =
PeersAndLearners.fromAssignments(assignments.nodes());
+ PeersAndLearners stablePeersAndLearners =
fromAssignments(stableAssignments.nodes());
ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId);
RaftGroupListener raftGroupListener = new ZonePartitionRaftListener();
+ ZoneRebalanceRaftGroupEventsListener raftGroupEventsListener = new
ZoneRebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ replicaGrpId,
+ busyLock,
+ createPartitionMover(replicaGrpId),
+ rebalanceScheduler,
+ catalogMgr,
+ distributionZoneMgr
+ );
+
+ replicationGroupIds.add(replicaGrpId);
+
try {
return replicaMgr.startReplica(
replicaGrpId,
new ZonePartitionReplicaListener(),
new FailFastSnapshotStorageFactory(),
- realConfiguration,
+ stablePeersAndLearners,
raftGroupListener,
- RaftGroupEventsListener.noopLsnr
- ).thenRun(() -> replicationGroupIds.add(replicaGrpId));
+ raftGroupEventsListener
+ ).thenApply(ignored -> null);
} catch (NodeStoppingException e) {
return failedFuture(e);
}
}
+ private PartitionMover createPartitionMover(ZonePartitionId replicaGrpId) {
+ return new PartitionMover(busyLock, () -> {
+ CompletableFuture<Replica> replicaFut =
replicaMgr.replica(replicaGrpId);
+ if (replicaFut == null) {
+ return failedFuture(new IgniteInternalException("No such
replica for partition " + replicaGrpId.partitionId()
+ + " in zone " + replicaGrpId.zoneId()));
+ }
+ return replicaFut.thenApply(Replica::raftClient);
+ });
+ }
+
private boolean shouldStartLocally(Assignments assignments) {
return assignments
.nodes()
@@ -227,6 +318,10 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
public void beforeNodeStop() {
busyLock.block();
+ metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
+ metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
+ metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
+
cleanUpPartitionsResources(replicationGroupIds);
}
@@ -360,6 +455,415 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return assignmentsFuture;
}
+
+ /**
+ * Creates meta storage listener for pending assignments updates.
+ *
+ * @return The watch listener.
+ */
+ private WatchListener createPendingAssignmentsRebalanceListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ Entry newEntry = evt.entryEvent().newEntry();
+
+ return handleChangePendingAssignmentEvent(newEntry,
evt.revision(), false);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.warn("Unable to process pending assignments event", e);
+ }
+ };
+ }
+
+ /**
+ * Creates meta storage listener for stable assignments updates.
+ *
+ * @return The watch listener.
+ */
+ private WatchListener createStableAssignmentsRebalanceListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return handleChangeStableAssignmentEvent(evt);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.warn("Unable to process stable assignments event", e);
+ }
+ };
+ }
+
+ /** Creates meta storage listener for switch reduce assignments updates. */
+ private WatchListener createAssignmentsSwitchRebalanceListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+ return inBusyLockAsync(busyLock, () -> {
+ byte[] key = evt.entryEvent().newEntry().key();
+
+ int partitionId = extractPartitionNumber(key);
+ int zoneId = extractZoneId(key,
ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
+
+ ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId,
partitionId);
+
+ // It is safe to get the latest version of the catalog as
we are in the metastore thread.
+ int catalogVersion = catalogMgr.latestCatalogVersion();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogMgr.zone(zoneId, catalogVersion);
+
+ long causalityToken = zoneDescriptor.updateToken();
+
+ return distributionZoneMgr.dataNodes(causalityToken,
catalogVersion, zoneId)
+ .thenCompose(dataNodes -> handleReduceChanged(
+ metaStorageMgr,
+ dataNodes,
+ zoneDescriptor.replicas(),
+ replicaGrpId,
+ evt
+ ));
+
+ });
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.warn("Unable to process switch reduce event", e);
+ }
+ };
+ }
+
+ /**
+ * Handles the {@link
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil#STABLE_ASSIGNMENTS_PREFIX}
update event.
+ *
+ * @param evt Event.
+ */
+ protected CompletableFuture<Void>
handleChangeStableAssignmentEvent(WatchEvent evt) {
+ if (evt.entryEvents().stream().allMatch(e -> e.oldEntry().value() ==
null)) {
+ // It's the initial write to zone stable assignments on zone
create event.
+ return nullCompletedFuture();
+ }
+
+ if (!evt.single()) {
+ // If there is not a single entry, then all entries must be
tombstones (this happens after zone drop).
+ assert evt.entryEvents().stream().allMatch(entryEvent ->
entryEvent.newEntry().tombstone()) : evt;
+
+ return nullCompletedFuture();
+ }
+
+ // Here we can only receive an update from the rebalance logic.
+ // Such updates always process exactly 1 partition, so there is only 1 stable
partition key.
+ assert evt.single() : evt;
+
+ if (evt.entryEvent().oldEntry() == null) {
+ // This means it's an event on zone creation.
+ return nullCompletedFuture();
+ }
+
+ Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
+
+ long revision = evt.revision();
+
+ assert stableAssignmentsWatchEvent.revision() == revision :
stableAssignmentsWatchEvent;
+
+ if (stableAssignmentsWatchEvent.value() == null) {
+ return nullCompletedFuture();
+ }
+
+ return handleChangeStableAssignmentEvent(stableAssignmentsWatchEvent,
evt.revision(), false);
+ }
+
+ protected CompletableFuture<Void> handleChangeStableAssignmentEvent(
+ Entry stableAssignmentsWatchEvent,
+ long revision,
+ boolean isRecovery
+ ) {
+ int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+ int zoneId = extractZoneId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
+
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionId);
+
+ Set<Assignment> stableAssignments =
stableAssignmentsWatchEvent.value() == null
+ ? emptySet()
+ :
Assignments.fromBytes(stableAssignmentsWatchEvent.value()).nodes();
+
+
+ return supplyAsync(() -> {
+ Entry pendingAssignmentsEntry =
metaStorageMgr.getLocally(pendingPartAssignmentsKey(zonePartitionId), revision);
+
+ byte[] pendingAssignmentsFromMetaStorage =
pendingAssignmentsEntry.value();
+
+ Assignments pendingAssignments = pendingAssignmentsFromMetaStorage
== null
+ ? Assignments.EMPTY
+ : Assignments.fromBytes(pendingAssignmentsFromMetaStorage);
+
+ return stopAndDestroyPartitionAndUpdateClients(
+ zonePartitionId,
+ stableAssignments,
+ pendingAssignments,
+ isRecovery
+ );
+ }, ioExecutor).thenCompose(identity());
+ }
+
+ private CompletableFuture<Void> updatePartitionClients(
+ ZonePartitionId zonePartitionId,
+ Set<Assignment> stableAssignments
+ ) {
+ // Update raft client peers and learners according to the actual
assignments.
+ if (replicaMgr.isReplicaStarted(zonePartitionId)) {
+ replicaMgr.getReplica(zonePartitionId).join()
+
.raftClient().updateConfiguration(fromAssignments(stableAssignments));
+ }
+
+ return nullCompletedFuture();
+ }
+
+ private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
+ ZonePartitionId zonePartitionId,
+ Set<Assignment> stableAssignments,
+ Assignments pendingAssignments,
+ boolean isRecovery
+ ) {
+ CompletableFuture<Void> clientUpdateFuture = isRecovery
+ // Updating clients is not needed on recovery.
+ ? nullCompletedFuture()
+ : updatePartitionClients(zonePartitionId, stableAssignments);
+
+ boolean shouldStopLocalServices = (pendingAssignments.force()
+ ? pendingAssignments.nodes().stream()
+ : Stream.concat(stableAssignments.stream(),
pendingAssignments.nodes().stream())
+ )
+ .noneMatch(assignment ->
assignment.consistentId().equals(localNode().name()));
+
+ if (shouldStopLocalServices) {
+ return allOf(
+ clientUpdateFuture,
+ stopAndDestroyPartition(zonePartitionId)
+ );
+ } else {
+ return clientUpdateFuture;
+ }
+ }
+
+ private CompletableFuture<?> stopAndDestroyPartition(ReplicationGroupId
zonePartitionId) {
+ return stopPartition(zonePartitionId);
+ }
+
+ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+ Entry pendingAssignmentsEntry,
+ long revision,
+ boolean isRecovery
+ ) {
+ if (pendingAssignmentsEntry.value() == null ||
pendingAssignmentsEntry.empty()) {
+ return nullCompletedFuture();
+ }
+
+ int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
+ int zoneId = extractZoneId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
+
+ var zonePartitionId = new ZonePartitionId(zoneId, partId);
+
+ // Stable assignments from the meta store, whose revision is bounded
by the current pending event.
+ Assignments stableAssignments = stableAssignments(zonePartitionId,
revision);
+
+ Assignments pendingAssignments =
Assignments.fromBytes(pendingAssignmentsEntry.value());
+
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.<Void>failedFuture(new
NodeStoppingException());
+ }
+
+ try {
+ if (LOG.isInfoEnabled()) {
+ var stringKey = new String(pendingAssignmentsEntry.key(),
UTF_8);
+
+ LOG.info("Received update on pending assignments. Check if new
replication node should be started [key={}, "
+ + "partition={}, zoneId={},
localMemberAddress={}, pendingAssignments={}, revision={}]",
+ stringKey, partId, zoneId, localNode().address(),
pendingAssignments, revision);
+ }
+
+ return handleChangePendingAssignmentEvent(
+ zonePartitionId,
+ stableAssignments,
+ pendingAssignments
+ ).thenCompose(v -> changePeersOnRebalance(
+ replicaMgr,
+ zonePartitionId,
+ pendingAssignments.nodes(),
+ stableAssignments == null ? emptySet() :
stableAssignments.nodes(),
+ revision)
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+ ZonePartitionId replicaGrpId,
+ @Nullable Assignments stableAssignments,
+ Assignments pendingAssignments
+ ) {
+ boolean pendingAssignmentsAreForced = pendingAssignments.force();
+ Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
+
+ // Start a new Raft node and Replica if this node has appeared in the
new assignments.
+ Assignment localMemberAssignment =
localMemberAssignment(pendingAssignments);
+
+ boolean shouldStartLocalGroupNode = localMemberAssignment != null
+ && (stableAssignments == null ||
!stableAssignments.nodes().contains(localMemberAssignment));
+
+ // This is a set of assignments for nodes that are not part of the
stable assignments, i.e. the unstable part of the distribution.
+ // For regular pending assignments we use the (old) stable set, so that
none of the new nodes would be able to propose itself as a leader.
+ // For forced assignments, we should do the same thing, but only for
the subset of the stable set that is alive right now. Dead nodes
+ // are excluded. It is calculated precisely as an intersection between
forced assignments and (old) stable assignments.
+ Assignments computedStableAssignments;
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22600 remove the
second condition
+ // once we have proper handling of empty stable assignments.
+ if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
+ // This condition can only pass if all stable nodes are dead, and
we start new raft group from scratch.
+ // In this case new initial configuration must match new forced
assignments.
+ computedStableAssignments =
Assignments.forced(pendingAssignmentsNodes);
+ } else if (pendingAssignmentsAreForced) {
+ // In case of forced assignments we need to remove nodes that are
present in the stable set but are missing from the
+ // pending set. Such operation removes dead stable nodes from the
resulting stable set, which guarantees that we will
+ // have a live majority.
+ Set<Assignment> intersection =
RebalanceUtil.intersect(stableAssignments.nodes(), pendingAssignmentsNodes);
+
+ computedStableAssignments = intersection.isEmpty() ?
pendingAssignments : Assignments.forced(intersection);
+ } else {
+ computedStableAssignments = stableAssignments;
+ }
+
+ int partitionId = replicaGrpId.partitionId();
+ int zoneId = replicaGrpId.zoneId();
+
+ CompletableFuture<Void> localServicesStartFuture;
+
+ if (shouldStartLocalGroupNode) {
+ localServicesStartFuture = createZonePartitionReplicationNode(
+ zoneId,
+ partitionId,
+ localMemberAssignment,
+ computedStableAssignments
+ );
+ } else {
+ localServicesStartFuture = runAsync(() -> {
+ if (pendingAssignmentsAreForced &&
replicaMgr.isReplicaStarted(replicaGrpId)) {
+ replicaMgr.resetPeers(replicaGrpId,
fromAssignments(computedStableAssignments.nodes()));
+ }
+ }, ioExecutor);
+ }
+
+ return localServicesStartFuture.thenRunAsync(() -> {
+ if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
+ return;
+ }
+
+ // For forced assignments, we exclude dead stable nodes, and all
alive stable nodes are already in pending assignments.
+ // Union is not required in such a case.
+ Set<Assignment> newAssignments = pendingAssignmentsAreForced ||
stableAssignments == null
+ ? pendingAssignmentsNodes
+ : RebalanceUtil.union(pendingAssignmentsNodes,
stableAssignments.nodes());
+
+
replicaMgr.getReplica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
+ }, ioExecutor);
+ }
+
+ private CompletableFuture<Void> changePeersOnRebalance(
+ ReplicaManager replicaMgr,
+ ZonePartitionId replicaGrpId,
+ Set<Assignment> pendingAssignments,
+ Set<Assignment> stableAssignments,
+ long revision
+ ) {
+ Set<Assignment> union = new HashSet<>();
+ union.addAll(pendingAssignments);
+ union.addAll(stableAssignments);
+
+ if
(!union.stream().map(Assignment::consistentId).collect(toSet()).contains(localNode().name()))
{
+ return nullCompletedFuture();
+ }
+
+ int partId = replicaGrpId.partitionId();
+
+ RaftGroupService partGrpSvc =
replicaMgr.replica(replicaGrpId).join().raftClient();
+
+ return partGrpSvc.refreshAndGetLeaderWithTerm()
+ .exceptionally(throwable -> {
+ throwable = unwrapCause(throwable);
+
+ if (throwable instanceof TimeoutException) {
+ LOG.info("Node couldn't get the leader within timeout
so the changing peers is skipped [grp={}].", replicaGrpId);
+
+ return LeaderWithTerm.NO_LEADER;
+ }
+
+ throw new IgniteInternalException(
+ INTERNAL_ERR,
+ "Failed to get a leader for the RAFT replication
group [get=" + replicaGrpId + "].",
+ throwable
+ );
+ })
+ .thenCompose(leaderWithTerm -> {
+ if (leaderWithTerm.isEmpty() ||
!isLocalPeer(leaderWithTerm.leader())) {
+ return nullCompletedFuture();
+ }
+
+ // Run the update of the raft configuration if this node is the
leader.
+ LOG.info("Current node={} is the leader of partition raft
group={}. "
+ + "Initiate rebalance process for
partition={}, zoneId={}",
+ leaderWithTerm.leader(), replicaGrpId, partId,
replicaGrpId.zoneId());
+
+ return
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+ .thenCompose(latestPendingAssignmentsEntry -> {
+ // Do not change peers of the raft group if
this is a stale event.
+ // Note that the raft node is started earlier for the
sake of consistency in
+ // starting and stopping raft nodes.
+ if (revision <
latestPendingAssignmentsEntry.revision()) {
+ return nullCompletedFuture();
+ }
+
+ PeersAndLearners newConfiguration =
fromAssignments(pendingAssignments);
+
+ CompletableFuture<Void> voidCompletableFuture
= partGrpSvc.changePeersAsync(newConfiguration,
+ leaderWithTerm.term()).exceptionally(e
-> {
+ return null;
+ });
+ return voidCompletableFuture;
+ });
+ });
+ }
+
+ private boolean isLocalPeer(Peer peer) {
+ return peer.consistentId().equals(localNode().name());
+ }
+
+ @Nullable
+ private Assignment localMemberAssignment(Assignments assignments) {
+ Assignment localMemberAssignment =
Assignment.forPeer(localNode().name());
+
+ return assignments.nodes().contains(localMemberAssignment) ?
localMemberAssignment : null;
+ }
+
@Override
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
if (!ENABLED) {
@@ -369,6 +873,16 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return nullCompletedFuture();
}
+ private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
+ return zoneDescriptor.id() + "/" + zoneDescriptor.name();
+ }
+
+ private @Nullable Assignments stableAssignments(ZonePartitionId
zonePartitionId, long revision) {
+ Entry entry =
metaStorageMgr.getLocally(stablePartAssignmentsKey(zonePartitionId), revision);
+
+ return Assignments.fromBytes(entry.value());
+ }
+
/**
* Stops all resources associated with a given partition, like replicas
and partition trackers.
*
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c243706ea8..ca37e69f04 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
@@ -672,44 +673,56 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
- ((Loza) raftManager).startRaftGroupNodeWithoutService(
+ CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut =
((Loza) raftManager).startRaftGroupNode(
raftNodeId,
newConfiguration,
raftGroupListener,
raftGroupEventsListener,
- groupOptions
+ groupOptions,
+ raftGroupServiceFactory
);
- LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
+ return newRaftClientFut.thenComposeAsync(raftClient -> {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
- Replica newReplica = new ZonePartitionReplicaImpl(
- replicaGrpId,
- listener
- );
+ try {
+ LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
- CompletableFuture<Replica> replicaFuture =
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
- if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
- assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
- LOG.info("Replica is started [replicationGroupId={}].",
replicaGrpId);
+ Replica newReplica = new ZonePartitionReplicaImpl(
+ replicaGrpId,
+ listener,
+ raftClient
+ );
- return completedFuture(newReplica);
- } else {
- existingReplicaFuture.complete(newReplica);
- LOG.info("Replica is started, existing replica waiter was
completed [replicationGroupId={}].", replicaGrpId);
+ CompletableFuture<Replica> replicaFuture =
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
+ if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
+ assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
+ LOG.info("Replica is started
[replicationGroupId={}].", replicaGrpId);
- return existingReplicaFuture;
- }
- });
+ return completedFuture(newReplica);
+ } else {
+ existingReplicaFuture.complete(newReplica);
+ LOG.info("Replica is started, existing replica waiter
was completed [replicationGroupId={}].", replicaGrpId);
- var eventParams = new LocalReplicaEventParameters(replicaGrpId);
+ return existingReplicaFuture;
+ }
+ });
- return fireEvent(AFTER_REPLICA_STARTED, eventParams)
- .exceptionally(e -> {
- LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event.", e);
+ var eventParams = new
LocalReplicaEventParameters(replicaGrpId);
- return null;
- })
- .thenCompose(v -> replicaFuture);
+ return fireEvent(AFTER_REPLICA_STARTED, eventParams)
+ .exceptionally(e -> {
+ LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event.", e);
+
+ return null;
+ })
+ .thenCompose(v -> replicaFuture);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
}
/**
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
index 4b91767e80..5cd05a8a5e 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
@@ -35,17 +35,28 @@ public class ZonePartitionReplicaImpl implements Replica {
private final ReplicaListener listener;
+ TopologyAwareRaftGroupService raftClient;
+
+ /**
+ * Constructor.
+ *
+ * @param replicaGrpId Replication group id.
+ * @param listener Listener for the replica.
+ * @param raftClient Raft client.
+ */
public ZonePartitionReplicaImpl(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener
+ ReplicaListener listener,
+ TopologyAwareRaftGroupService raftClient
) {
this.replicaGrpId = replicaGrpId;
this.listener = listener;
+ this.raftClient = raftClient;
}
@Override
public TopologyAwareRaftGroupService raftClient() {
- throw new UnsupportedOperationException("raftClient");
+ return raftClient;
}
@Override
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index adfdac3464..d340c3b9ce 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -740,7 +740,8 @@ public class IgniteImpl implements Ignite {
distributionZoneManager,
metaStorageMgr,
clusterSvc.topologyService(),
- threadPoolsManager.tableIoExecutor()
+ threadPoolsManager.tableIoExecutor(),
+ rebalanceScheduler
);
TransactionConfiguration txConfig =
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);