This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 75c4ecb973a IGNITE-25986 Add public REST API for partition restart 
with clean up functionality (#6493)
75c4ecb973a is described below

commit 75c4ecb973a94aa24e01919dab30e91b26f8e9c9
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Aug 29 11:41:59 2025 +0400

    IGNITE-25986 Add public REST API for partition restart with clean up 
functionality (#6493)
---
 .../rest/api/recovery/DisasterRecoveryApi.java     | 28 +++++++++++++
 .../ItDisasterRecoveryColocationDisabledTest.java  | 14 +++++++
 ...terRecoveryControllerRestartPartitionsTest.java |  1 -
 ...ontrollerRestartPartitionsWithCleanupTest.java} | 49 +++++++++++++++-------
 .../recovery/ItDisasterRecoveryControllerTest.java |  1 -
 .../rest/recovery/DisasterRecoveryController.java  | 28 +++++++++++++
 .../disaster/ManualGroupRestartRequest.java        | 17 ++++++--
 7 files changed, 116 insertions(+), 22 deletions(-)

diff --git 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
index 06188755225..88fa3c3ca85 100644
--- 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
+++ 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java
@@ -106,6 +106,20 @@ public interface DisasterRecoveryApi {
     @Produces(MediaType.PROBLEM_JSON)
     CompletableFuture<Void> restartPartitions(@Body RestartPartitionsRequest 
command);
 
+    @Post("partitions/restartWithCleanup")
+    @Operation(
+            operationId = "restartPartitionsWithCleanup",
+            description = "Restarts replica service and raft group of passed 
partitions with cleaning up of the storage."
+    )
+    @ApiResponse(responseCode = "200", description = "Partitions restarted.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "400", description = "Bad request.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.PROBLEM_JSON)
+    CompletableFuture<Void> restartPartitionsWithCleanup(@Body 
RestartPartitionsRequest command);
+
     @Post("zone/partitions/reset")
     @Operation(
             operationId = "resetZonePartitions",
@@ -135,6 +149,20 @@ public interface DisasterRecoveryApi {
     @Produces(MediaType.PROBLEM_JSON)
     CompletableFuture<Void> restartZonePartitions(@Body 
RestartZonePartitionsRequest command);
 
+    @Post("zone/partitions/restartWithCleanup")
+    @Operation(
+            operationId = "restartZonePartitionsWithCleanup",
+            description = "Restarts replica service and raft group of passed 
zone partitions with cleaning up of the storage."
+    )
+    @ApiResponse(responseCode = "200", description = "Zone partitions 
restarted.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "400", description = "Bad request.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.PROBLEM_JSON)
+    CompletableFuture<Void> restartZonePartitionsWithCleanup(@Body 
RestartZonePartitionsRequest command);
+
     @Get("zone/state/local")
     @Operation(operationId = "getZoneLocalPartitionStates", description = 
"Returns local zone partition states.")
     @ApiResponse(responseCode = "200", description = "Zone partition states 
returned.")
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
index c830f16bdcc..263896d5e4b 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryColocationDisabledTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERS
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
 import static 
org.apache.ignite.internal.rest.constants.HttpCode.INTERNAL_SERVER_ERROR;
 import static 
org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerRestartPartitionsTest.RESTART_ZONE_PARTITIONS_ENDPOINT;
+import static 
org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT;
 import static 
org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerTest.RESET_ZONE_PARTITIONS_ENDPOINT;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -89,6 +90,19 @@ public class ItDisasterRecoveryColocationDisabledTest 
extends ClusterPerClassInt
         assertThat(e.getMessage(), containsString("This method is unsupported 
when colocation is disabled."));
     }
 
+    @Test
+    public void testRestartPartitionsWithCleanup() {
+        MutableHttpRequest<?> post = 
HttpRequest.POST(RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT,
+                new RestartZonePartitionsRequest(emptySet(), FIRST_ZONE, 
emptySet()));
+
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
+                () -> client.toBlocking().exchange(post));
+
+        assertThat(e.getResponse().code(), is(INTERNAL_SERVER_ERROR.code()));
+
+        assertThat(e.getMessage(), containsString("This method is unsupported 
when colocation is disabled."));
+    }
+
     @Test
     public void testGetLocalPartitions() {
         HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
index 40845a46c75..e9666a0ab03 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.rest.recovery;
 
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
-import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
 import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
 import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
similarity index 79%
copy from 
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
copy to 
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
index 40845a46c75..4ed47c01c69 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.rest.recovery;
 
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
-import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
 import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
 import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
@@ -47,12 +46,13 @@ import 
org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.rest.api.recovery.RestartPartitionsRequest;
 import 
org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIf;
 
-/** Test for disaster recovery restart partitions command. */
+/** Test for disaster recovery restart partitions with cleanup command. */
 @MicronautTest
-public class ItDisasterRecoveryControllerRestartPartitionsTest extends 
ClusterPerClassIntegrationTest {
+public class ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest 
extends ClusterPerClassIntegrationTest {
     private static final String NODE_URL = "http://localhost:" + 
ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
 
     private static final String FIRST_ZONE = "first_ZONE";
@@ -61,9 +61,9 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
 
     private static final String QUALIFIED_TABLE_NAME = canonicalName("PUBLIC", 
TABLE_NAME);
 
-    public static final String RESTART_PARTITIONS_ENDPOINT = 
"/partitions/restart";
+    public static final String RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT = 
"/partitions/restartWithCleanup";
 
-    public static final String RESTART_ZONE_PARTITIONS_ENDPOINT = 
"zone/partitions/restart";
+    public static final String RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT = 
"zone/partitions/restartWithCleanup";
 
     @Inject
     @Client(NODE_URL + "/management/v1/recovery/")
@@ -71,13 +71,15 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
 
     @BeforeAll
     public void setUp() {
-        sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']", 
FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
+        sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles 
['%s']", FIRST_ZONE, 3, DEFAULT_AIPERSIST_PROFILE_NAME));
         sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val 
INT) ZONE \"%s\"", TABLE_NAME,
                 FIRST_ZONE));
+
+        sql(String.format("INSERT INTO PUBLIC.\"%s\" VALUES (1, 1)", 
TABLE_NAME));
     }
 
     @Test
-    public void testRestartPartitionZoneNotFound() {
+    public void testRestartPartitionWithCleanupZoneNotFound() {
         String unknownZone = "unknown_zone";
 
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
unknownZone, QUALIFIED_TABLE_NAME, Set.of());
@@ -93,7 +95,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     @Test
     // TODO: remove this test when colocation is enabled 
https://issues.apache.org/jira/browse/IGNITE-22522
     
@DisabledIf("org.apache.ignite.internal.lang.IgniteSystemProperties#colocationEnabled")
-    public void testRestartPartitionTableNotFound() {
+    public void testRestartPartitionWithCleanupTableNotFound() {
         String tableName = "PUBLIC.unknown_table";
 
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, tableName, Set.of());
@@ -106,7 +108,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    void testRestartPartitionsIllegalPartitionNegative() {
+    void testRestartPartitionsWithCleanupIllegalPartitionNegative() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10));
 
         HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
@@ -118,7 +120,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    void testRestartPartitionsPartitionsOutOfRange() {
+    void testRestartPartitionsWithCleanupPartitionsOutOfRange() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT));
 
         HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
@@ -136,7 +138,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    void testRestartPartitionsNodesAreCaseSensitive() {
+    void testRestartPartitionsWithCleanupNodesAreCaseSensitive() {
         Set<String> uppercaseNodeNames = nodeNames(initialNodes() - 1).stream()
                 .map(String::toUpperCase)
                 .collect(toSet());
@@ -151,7 +153,8 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    public void testRestartAllPartitions() {
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
+    public void testRestartAllPartitionsWithCleanup() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
 
         HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -160,7 +163,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    public void testRestartSpecifiedPartitions() {
+    public void testRestartSpecifiedPartitionsWithCleanup() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1));
 
         HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -169,7 +172,8 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    public void testRestartPartitionsByNodes() {
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
+    public void testRestartPartitionsWithCleanupByNodes() {
         Set<String> nodeNames = nodeNames(initialNodes() - 1);
 
         MutableHttpRequest<?> post = restartPartitionsRequest(nodeNames, 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
@@ -179,6 +183,19 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
         assertThat(response.getStatus().getCode(), is(OK.code()));
     }
 
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
+    public void testRestartTablePartitionsWithCleanupByNodes() {
+        Set<String> nodeNames = nodeNames(initialNodes() - 1);
+
+        MutableHttpRequest<?> post = 
HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
+                new RestartPartitionsRequest(nodeNames, FIRST_ZONE, 
QUALIFIED_TABLE_NAME, Set.of()));
+
+        HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+        assertThat(response.getStatus().getCode(), is(OK.code()));
+    }
+
     private static Set<String> nodeNames(int count) {
         return CLUSTER.runningNodes()
                 .map(Ignite::name)
@@ -193,10 +210,10 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
             Collection<Integer> partitionIds
     ) {
         if (colocationEnabled()) {
-            return HttpRequest.POST(RESTART_ZONE_PARTITIONS_ENDPOINT,
+            return 
HttpRequest.POST(RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT,
                     new RestartZonePartitionsRequest(nodeNames, zoneName, 
partitionIds));
         } else {
-            return HttpRequest.POST(RESTART_PARTITIONS_ENDPOINT,
+            return HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
                     new RestartPartitionsRequest(nodeNames, zoneName, 
tableName, partitionIds));
         }
     }
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index ac5c7933fed..7cad904539e 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.IntStream.range;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.DEFAULT_ZONE_NAME;
-import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
 import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index bae1f0741c1..3c4fae62611 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -134,6 +134,27 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
         );
     }
 
+    @Override
+    public CompletableFuture<Void> restartPartitionsWithCleanup(@Body 
RestartPartitionsRequest command) {
+        if (nodeProperties.colocationEnabled()) {
+            return restartZonePartitionsWithCleanup(new 
RestartZonePartitionsRequest(
+                    command.nodeNames(),
+                    command.zoneName(),
+                    command.partitionIds()
+            ));
+        }
+
+        QualifiedName tableName = QualifiedName.parse(command.tableName());
+
+        return disasterRecoveryManager.restartTablePartitionsWithCleanup(
+                command.nodeNames(),
+                command.zoneName(),
+                tableName.schemaName(),
+                tableName.objectName(),
+                command.partitionIds()
+        );
+    }
+
     @Override
     public CompletableFuture<Void> 
resetZonePartitions(ResetZonePartitionsRequest command) {
         checkColocationEnabled();
@@ -148,6 +169,13 @@ public class DisasterRecoveryController implements 
DisasterRecoveryApi, Resource
         return disasterRecoveryManager.restartPartitions(command.nodeNames(), 
command.zoneName(), command.partitionIds());
     }
 
+    @Override
+    public CompletableFuture<Void> 
restartZonePartitionsWithCleanup(RestartZonePartitionsRequest command) {
+        checkColocationEnabled();
+
+        return 
disasterRecoveryManager.restartPartitionsWithCleanup(command.nodeNames(), 
command.zoneName(), command.partitionIds());
+    }
+
     @Override
     public CompletableFuture<LocalZonePartitionStatesResponse> 
getZoneLocalPartitionStates(
             Optional<Set<String>> zoneNames,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 7356d30733c..4d3277a15bc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -29,6 +29,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.RESTART_WITH_CLEAN_UP_ERR;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
@@ -133,7 +135,7 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
         disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
             ReplicationGroupId replicationGroupId = raftNodeId.groupId();
 
-            if (shouldProcessPartition(replicationGroupId)) {
+            if (shouldProcessPartition(replicationGroupId, zoneDescriptor)) {
                 if (cleanUp) {
                     restartFutures.add(
                             
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId, 
revision, zoneDescriptor, catalog)
@@ -147,14 +149,21 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
         return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
     }
 
-    private boolean shouldProcessPartition(ReplicationGroupId 
replicationGroupId) {
+    private boolean shouldProcessPartition(ReplicationGroupId 
replicationGroupId, CatalogZoneDescriptor zoneDescriptor) {
+        Set<Integer> partitionIdsToCheck = partitionIds.isEmpty()
+                ? 
Arrays.stream(AssignmentUtil.partitionIds(zoneDescriptor.partitions())).boxed().collect(Collectors.toSet())
+                : partitionIds;
+
         if (replicationGroupId instanceof TablePartitionId) {
             TablePartitionId groupId = (TablePartitionId) replicationGroupId;
-            return groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId());
+
+            return groupId.tableId() == tableId && 
partitionIdsToCheck.contains(groupId.partitionId());
         } else if (replicationGroupId instanceof ZonePartitionId) {
             ZonePartitionId groupId = (ZonePartitionId) replicationGroupId;
-            return groupId.zoneId() == zoneId && 
partitionIds.contains(groupId.partitionId());
+
+            return groupId.zoneId() == zoneId && 
partitionIdsToCheck.contains(groupId.partitionId());
         }
+
         return false;
     }
 

Reply via email to