This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 75c4ecb973a IGNITE-25986 Add public REST API for partition restart
with clean up functionality (#6493)
75c4ecb973a is described below
commit 75c4ecb973a94aa24e01919dab30e91b26f8e9c9
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Aug 29 11:41:59 2025 +0400
IGNITE-25986 Add public REST API for partition restart with clean up
functionality (#6493)
---
.../rest/api/recovery/DisasterRecoveryApi.java | 28 +++++++++++++
.../ItDisasterRecoveryColocationDisabledTest.java | 14 +++++++
...terRecoveryControllerRestartPartitionsTest.java | 1 -
...ontrollerRestartPartitionsWithCleanupTest.java} | 49 +++++++++++++++-------
.../recovery/ItDisasterRecoveryControllerTest.java | 1 -
.../rest/recovery/DisasterRecoveryController.java | 28 +++++++++++++
.../disaster/ManualGroupRestartRequest.java | 17 ++++++--
7 files changed, 116 insertions(+), 22 deletions(-)
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
index 06188755225..88fa3c3ca85 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -106,6 +106,20 @@ public interface DisasterRecoveryApi {
@Produces(MediaType.PROBLEM_JSON)
CompletableFuture<Void> restartPartitions(@Body RestartPartitionsRequest
command);
+ @Post("partitions/restartWithCleanup")
+ @Operation(
+ operationId = "restartPartitionsWithCleanup",
+ description = "Restarts replica service and raft group of passed partitions with cleaning up of the storage."
+ )
+ @ApiResponse(responseCode = "200", description = "Partitions restarted.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.PROBLEM_JSON)
+ CompletableFuture<Void> restartPartitionsWithCleanup(@Body
RestartPartitionsRequest command);
+
@Post("zone/partitions/reset")
@Operation(
operationId = "resetZonePartitions",
@@ -135,6 +149,20 @@ public interface DisasterRecoveryApi {
@Produces(MediaType.PROBLEM_JSON)
CompletableFuture<Void> restartZonePartitions(@Body
RestartZonePartitionsRequest command);
+ @Post("zone/partitions/restartWithCleanup")
+ @Operation(
+ operationId = "restartZonePartitionsWithCleanup",
+ description = "Restarts replica service and raft group of passed zone partitions with cleaning up of the storage."
+ )
+ @ApiResponse(responseCode = "200", description = "Zone partitions restarted.")
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.PROBLEM_JSON)
+ CompletableFuture<Void> restartZonePartitionsWithCleanup(@Body
RestartZonePartitionsRequest command);
+
@Get("zone/state/local")
@Operation(operationId = "getZoneLocalPartitionStates", description =
"Returns local zone partition states.")
@ApiResponse(responseCode = "200", description = "Zone partition states returned.")
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
index c830f16bdcc..263896d5e4b 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERS
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
import static
org.apache.ignite.internal.rest.constants.HttpCode.INTERNAL_SERVER_ERROR;
import static
org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerRestartPartitionsTest.RESTART_ZONE_PARTITIONS_ENDPOINT;
+import static
org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT;
import static
org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerTest.RESET_ZONE_PARTITIONS_ENDPOINT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -89,6 +90,19 @@ public class ItDisasterRecoveryColocationDisabledTest
extends ClusterPerClassInt
assertThat(e.getMessage(), containsString("This method is unsupported when colocation is disabled."));
}
+ @Test
+ public void testRestartPartitionsWithCleanup() {
+ MutableHttpRequest<?> post =
HttpRequest.POST(RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT,
+ new RestartZonePartitionsRequest(emptySet(), FIRST_ZONE,
emptySet()));
+
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
+ () -> client.toBlocking().exchange(post));
+
+ assertThat(e.getResponse().code(), is(INTERNAL_SERVER_ERROR.code()));
+
+ assertThat(e.getMessage(), containsString("This method is unsupported when colocation is disabled."));
+ }
+
@Test
public void testGetLocalPartitions() {
HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
index 40845a46c75..e9666a0ab03 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.rest.recovery;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
-import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
similarity index 79%
copy from
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
copy to
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
index 40845a46c75..4ed47c01c69 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.rest.recovery;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
-import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
@@ -47,12 +46,13 @@ import
org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.rest.api.recovery.RestartPartitionsRequest;
import
org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
-/** Test for disaster recovery restart partitions command. */
+/** Test for disaster recovery restart partitions with cleanup command. */
@MicronautTest
-public class ItDisasterRecoveryControllerRestartPartitionsTest extends
ClusterPerClassIntegrationTest {
+public class ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest
extends ClusterPerClassIntegrationTest {
private static final String NODE_URL = "http://localhost:" +
ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
private static final String FIRST_ZONE = "first_ZONE";
@@ -61,9 +61,9 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
private static final String QUALIFIED_TABLE_NAME = canonicalName("PUBLIC",
TABLE_NAME);
- public static final String RESTART_PARTITIONS_ENDPOINT =
"/partitions/restart";
+ public static final String RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT =
"/partitions/restartWithCleanup";
- public static final String RESTART_ZONE_PARTITIONS_ENDPOINT =
"zone/partitions/restart";
+ public static final String RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT =
"zone/partitions/restartWithCleanup";
@Inject
@Client(NODE_URL + "/management/v1/recovery/")
@@ -71,13 +71,15 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
@BeforeAll
public void setUp() {
- sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']",
FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
+ sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles ['%s']", FIRST_ZONE, 3, DEFAULT_AIPERSIST_PROFILE_NAME));
sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME,
FIRST_ZONE));
+
+ sql(String.format("INSERT INTO PUBLIC.\"%s\" VALUES (1, 1)",
TABLE_NAME));
}
@Test
- public void testRestartPartitionZoneNotFound() {
+ public void testRestartPartitionWithCleanupZoneNotFound() {
String unknownZone = "unknown_zone";
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
unknownZone, QUALIFIED_TABLE_NAME, Set.of());
@@ -93,7 +95,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
@Test
// TODO: remove this test when colocation is enabled
https://issues.apache.org/jira/browse/IGNITE-22522
@DisabledIf("org.apache.ignite.internal.lang.IgniteSystemProperties#colocationEnabled")
- public void testRestartPartitionTableNotFound() {
+ public void testRestartPartitionWithCleanupTableNotFound() {
String tableName = "PUBLIC.unknown_table";
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, tableName, Set.of());
@@ -106,7 +108,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- void testRestartPartitionsIllegalPartitionNegative() {
+ void testRestartPartitionsWithCleanupIllegalPartitionNegative() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10));
HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
@@ -118,7 +120,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- void testRestartPartitionsPartitionsOutOfRange() {
+ void testRestartPartitionsWithCleanupPartitionsOutOfRange() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT));
HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
@@ -136,7 +138,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- void testRestartPartitionsNodesAreCaseSensitive() {
+ void testRestartPartitionsWithCleanupNodesAreCaseSensitive() {
Set<String> uppercaseNodeNames = nodeNames(initialNodes() - 1).stream()
.map(String::toUpperCase)
.collect(toSet());
@@ -151,7 +153,8 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- public void testRestartAllPartitions() {
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
+ public void testRestartAllPartitionsWithCleanup() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -160,7 +163,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- public void testRestartSpecifiedPartitions() {
+ public void testRestartSpecifiedPartitionsWithCleanup() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1));
HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -169,7 +172,8 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- public void testRestartPartitionsByNodes() {
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
+ public void testRestartPartitionsWithCleanupByNodes() {
Set<String> nodeNames = nodeNames(initialNodes() - 1);
MutableHttpRequest<?> post = restartPartitionsRequest(nodeNames,
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
@@ -179,6 +183,19 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
assertThat(response.getStatus().getCode(), is(OK.code()));
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
+ public void testRestartTablePartitionsWithCleanupByNodes() {
+ Set<String> nodeNames = nodeNames(initialNodes() - 1);
+
+ MutableHttpRequest<?> post =
HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
+ new RestartPartitionsRequest(nodeNames, FIRST_ZONE,
QUALIFIED_TABLE_NAME, Set.of()));
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+ }
+
private static Set<String> nodeNames(int count) {
return CLUSTER.runningNodes()
.map(Ignite::name)
@@ -193,10 +210,10 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
Collection<Integer> partitionIds
) {
if (colocationEnabled()) {
- return HttpRequest.POST(RESTART_ZONE_PARTITIONS_ENDPOINT,
+ return
HttpRequest.POST(RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT,
new RestartZonePartitionsRequest(nodeNames, zoneName,
partitionIds));
} else {
- return HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT,
+ return HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
new RestartPartitionsRequest(nodeNames, zoneName,
tableName, partitionIds));
}
}
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index ac5c7933fed..7cad904539e 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.IntStream.range;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.DEFAULT_ZONE_NAME;
-import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index bae1f0741c1..3c4fae62611 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -134,6 +134,27 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
);
}
+ @Override
+ public CompletableFuture<Void> restartPartitionsWithCleanup(@Body
RestartPartitionsRequest command) {
+ if (nodeProperties.colocationEnabled()) {
+ return restartZonePartitionsWithCleanup(new
RestartZonePartitionsRequest(
+ command.nodeNames(),
+ command.zoneName(),
+ command.partitionIds()
+ ));
+ }
+
+ QualifiedName tableName = QualifiedName.parse(command.tableName());
+
+ return disasterRecoveryManager.restartTablePartitionsWithCleanup(
+ command.nodeNames(),
+ command.zoneName(),
+ tableName.schemaName(),
+ tableName.objectName(),
+ command.partitionIds()
+ );
+ }
+
@Override
public CompletableFuture<Void>
resetZonePartitions(ResetZonePartitionsRequest command) {
checkColocationEnabled();
@@ -148,6 +169,13 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
return disasterRecoveryManager.restartPartitions(command.nodeNames(),
command.zoneName(), command.partitionIds());
}
+ @Override
+ public CompletableFuture<Void>
restartZonePartitionsWithCleanup(RestartZonePartitionsRequest command) {
+ checkColocationEnabled();
+
+ return
disasterRecoveryManager.restartPartitionsWithCleanup(command.nodeNames(),
command.zoneName(), command.partitionIds());
+ }
+
@Override
public CompletableFuture<LocalZonePartitionStatesResponse>
getZoneLocalPartitionStates(
Optional<Set<String>> zoneNames,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 7356d30733c..4d3277a15bc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -29,6 +29,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.RESTART_WITH_CLEAN_UP_ERR;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import org.apache.ignite.internal.partitiondistribution.Assignment;
@@ -133,7 +135,7 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
disasterRecoveryManager.raftManager.forEach((raftNodeId,
raftGroupService) -> {
ReplicationGroupId replicationGroupId = raftNodeId.groupId();
- if (shouldProcessPartition(replicationGroupId)) {
+ if (shouldProcessPartition(replicationGroupId, zoneDescriptor)) {
if (cleanUp) {
restartFutures.add(
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId,
revision, zoneDescriptor, catalog)
@@ -147,14 +149,21 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
return restartFutures.isEmpty() ? nullCompletedFuture() :
allOf(restartFutures.toArray(CompletableFuture[]::new));
}
- private boolean shouldProcessPartition(ReplicationGroupId
replicationGroupId) {
+ private boolean shouldProcessPartition(ReplicationGroupId
replicationGroupId, CatalogZoneDescriptor zoneDescriptor) {
+ Set<Integer> partitionIdsToCheck = partitionIds.isEmpty()
+ ?
Arrays.stream(AssignmentUtil.partitionIds(zoneDescriptor.partitions())).boxed().collect(Collectors.toSet())
+ : partitionIds;
+
if (replicationGroupId instanceof TablePartitionId) {
TablePartitionId groupId = (TablePartitionId) replicationGroupId;
- return groupId.tableId() == tableId &&
partitionIds.contains(groupId.partitionId());
+
+ return groupId.tableId() == tableId &&
partitionIdsToCheck.contains(groupId.partitionId());
} else if (replicationGroupId instanceof ZonePartitionId) {
ZonePartitionId groupId = (ZonePartitionId) replicationGroupId;
- return groupId.zoneId() == zoneId &&
partitionIds.contains(groupId.partitionId());
+
+ return groupId.zoneId() == zoneId &&
partitionIdsToCheck.contains(groupId.partitionId());
}
+
return false;
}