rpuch commented on code in PR #3614: URL: https://github.com/apache/ignite-3/pull/3614#discussion_r1568857289
########## modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.api.recovery; + +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.PathVariable; +import io.micronaut.http.annotation.Produces; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.constants.MediaType; + +/** + * Disaster recovery controller. 
+ */ +@Controller("/management/v1/recovery") +@Tag(name = "recovery") +public interface DisasterRecoveryApi { + @Get("state/local") + @Operation(operationId = "getLocalPartitionStates", description = "Returns local partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @Produces(MediaType.APPLICATION_JSON) + CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(); + + @Get("state/local/{zoneName}") + @Operation(operationId = "getLocalPartitionStatesByZone", description = "Returns local partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @ApiResponse(responseCode = "400", description = "Zone is not found.", Review Comment: Why isn't it 404? ########## modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.api.recovery; + +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.PathVariable; +import io.micronaut.http.annotation.Produces; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.constants.MediaType; + +/** + * Disaster recovery controller. + */ +@Controller("/management/v1/recovery") +@Tag(name = "recovery") +public interface DisasterRecoveryApi { + @Get("state/local") + @Operation(operationId = "getLocalPartitionStates", description = "Returns local partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", Review Comment: Is it required to explicitly list code 500 for internal errors? `ComputeApi` does not do that, and it seems logical for Micronaut to just map anything not listed explicitly to 500. ########## modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.recovery; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; +import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.client.HttpClient; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.http.client.exceptions.HttpClientResponseException; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import java.util.List; +import org.apache.ignite.internal.Cluster; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse; +import org.junit.jupiter.api.Test; + +/** + * Test for disaster recovery REST commands. 
+ */ +@MicronautTest +public class ItDisasterRecoveryControllerTest extends ClusterPerTestIntegrationTest { + private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT; + + @Inject + @Client(NODE_URL + "/management/v1/recovery/") + HttpClient client; + + @Override + protected int initialNodes() { + return 1; + } + + @Test + void testLocalPartitionStates() { + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)"); + var response = client.toBlocking().exchange("/state/local/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + + LocalPartitionStatesResponse body = response.body(); + assertEquals(DEFAULT_PARTITION_COUNT, body.states().size()); + + List<Integer> partitionIds = body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList()); + assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds); + } + + @Test + void testLocalPartitionStatesByZoneMissingZone() { + HttpClientResponseException thrown = assertThrows( + HttpClientResponseException.class, + () -> client.toBlocking().exchange("/state/local/foo/", LocalPartitionStatesResponse.class) + ); + + assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status()); + } + + @Test + void testLocalPartitionStatesByZone() { + executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)"); + + executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'"); + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'"); + + var response = client.toBlocking().exchange("/state/local/Default/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size()); + + response = client.toBlocking().exchange("/state/local/FOO/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + assertEquals(1, 
response.body().states().size()); + } + + @Test + void testLocalPartitionStatesByZoneJson() { + executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'"); + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'"); + + HttpResponse<String> response = client.toBlocking().exchange("/state/local/FOO/", String.class); + + assertEquals( + "{'states':[{'partitionId':0,'tableName':'FOO','nodeName':'idrct_tlpsbzj_0','state':'HEALTHY'}]}".replace('\'', '"'), Review Comment: This seems brittle. I think Java does not guarantee any field/method order while traversing a class. Maybe Micronaut does (in accordance with the order in `openapi.yaml`), but anyway the order is not significant here. The Micronaut testing framework probably has a way to assert JSON equality by content rather than by string representation. ########## modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.rest.recovery; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; +import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.client.HttpClient; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.http.client.exceptions.HttpClientResponseException; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import java.util.List; +import org.apache.ignite.internal.Cluster; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse; +import org.junit.jupiter.api.Test; + +/** + * Test for disaster recovery REST commands. 
+ */ +@MicronautTest +public class ItDisasterRecoveryControllerTest extends ClusterPerTestIntegrationTest { + private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT; + + @Inject + @Client(NODE_URL + "/management/v1/recovery/") + HttpClient client; + + @Override + protected int initialNodes() { + return 1; + } + + @Test + void testLocalPartitionStates() { + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)"); + var response = client.toBlocking().exchange("/state/local/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + + LocalPartitionStatesResponse body = response.body(); + assertEquals(DEFAULT_PARTITION_COUNT, body.states().size()); + + List<Integer> partitionIds = body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList()); + assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds); + } + + @Test + void testLocalPartitionStatesByZoneMissingZone() { + HttpClientResponseException thrown = assertThrows( + HttpClientResponseException.class, + () -> client.toBlocking().exchange("/state/local/foo/", LocalPartitionStatesResponse.class) + ); + + assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status()); + } + + @Test + void testLocalPartitionStatesByZone() { + executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)"); + + executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'"); + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'"); + + var response = client.toBlocking().exchange("/state/local/Default/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size()); + + response = client.toBlocking().exchange("/state/local/FOO/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + assertEquals(1, 
response.body().states().size()); + } + + @Test + void testLocalPartitionStatesByZoneJson() { + executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'"); + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = 'FOO'"); + + HttpResponse<String> response = client.toBlocking().exchange("/state/local/FOO/", String.class); + + assertEquals( + "{'states':[{'partitionId':0,'tableName':'FOO','nodeName':'idrct_tlpsbzj_0','state':'HEALTHY'}]}".replace('\'', '"'), + response.body() + ); + } + + @Test + void testGlobalPartitionStates() { + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)"); + var response = client.toBlocking().exchange("/state/global/", GlobalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + + GlobalPartitionStatesResponse body = response.body(); + assertEquals(DEFAULT_PARTITION_COUNT, body.states().size()); + + List<Integer> partitionIds = body.states().stream().map(GlobalPartitionStateResponse::partitionId).collect(toList()); + assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds); + } + + @Test + void testGlobalPartitionStatesByZoneMissingZone() { + HttpClientResponseException thrown = assertThrows( + HttpClientResponseException.class, + () -> client.toBlocking().exchange("/state/global/foo/", GlobalPartitionStatesResponse.class) Review Comment: ```suggestion () -> client.toBlocking().exchange("/state/global/no-such-zone/", GlobalPartitionStatesResponse.class) ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster; + +/** + * Enum for states of partitions. + */ +public enum GlobalPartitionStateEnum { + /** All replicas are healthy. */ + AVAILABLE, + + /** There are healthy replicas, but they don't form a majority. */ + READ_ONLY, + + /** There are healthy replicas, and they form a majority. */ Review Comment: Would it make sense to declare them in order of how damaged the partition is, i.e. AVAILABLE -> DEGRADED -> READ_ONLY -> UNAVAILABLE? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -175,26 +191,55 @@ public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { * @param zoneName Zone name. Review Comment: Please clarify what it means to pass `null` here. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State nodeState) { default: // Unrecognized state, better safe than sorry. - return LocalPartitionStateEnum.BROKEN; + return BROKEN; } } /** * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all peers is * known.
*/ - private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize( - Map<TablePartitionId, Map<String, LocalPartitionState>> result + private static Map<TablePartitionId, Map<String, LocalPartitionState>> normalizeLocal( + Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result, + Catalog catalog ) { return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { - Map<String, LocalPartitionState> map = entry.getValue(); + TablePartitionId tablePartitionId = entry.getKey(); + Map<String, LocalPartitionStateMessage> map = entry.getValue(); // noinspection OptionalGetWithoutIsPresent - long maxLogIndex = map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong(); + long maxLogIndex = map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong(); return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> { - LocalPartitionState state = entry2.getValue(); + LocalPartitionStateMessage stateMsg = entry2.getValue(); - if (state.state() != LocalPartitionStateEnum.HEALTHY || maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) { - return state; + LocalPartitionStateEnum stateEnum = stateMsg.state(); + + if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) { + stateEnum = CATCHING_UP; } - return MSG_FACTORY.localPartitionState() - .state(LocalPartitionStateEnum.CATCHING_UP) - .partitionId(state.partitionId()) - .logIndex(state.logIndex()) - .build(); + CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId()); + return new LocalPartitionState(tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum); })); })); } + + private static Map<TablePartitionId, GlobalPartitionState> normalizeGlobal( + Map<TablePartitionId, Map<String, LocalPartitionState>> localResult, + Catalog catalog + ) { + Map<Integer, Integer> tableIdToPartitions = new HashMap<>(); + + Map<TablePartitionId, 
GlobalPartitionState> result = localResult.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + TablePartitionId tablePartitionId = entry.getKey(); + Map<String, LocalPartitionState> map = entry.getValue(); + + int zoneId = catalog.table(tablePartitionId.tableId()).zoneId(); Review Comment: Is it somehow guaranteed that this node contains in its local Catalog all tables that might come from other nodes in responses? ########## modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.api.recovery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import java.util.List; + +/** + * Global partition states schema class. 
+ */ +@Schema(description = "Information about global partition states.") +public class GlobalPartitionStatesResponse { + @Schema + private final List<GlobalPartitionStateResponse> states; + + @JsonCreator + public GlobalPartitionStatesResponse(@JsonProperty("states") List<GlobalPartitionStateResponse> states) { + this.states = states; Review Comment: How about making a defensive copy? ########## modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rest.api.recovery; + +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.PathVariable; +import io.micronaut.http.annotation.Produces; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.constants.MediaType; + +/** + * Disaster recovery controller. + */ +@Controller("/management/v1/recovery") +@Tag(name = "recovery") Review Comment: Should it be 'disaster-recovery', or both 'disaster' and 'recovery'? 'Recovery' alone might mean many things in the context of a DBMS. ########## modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.rest.api.recovery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; + +/** + * Global partition state schema class. + */ +@Schema(description = "Information about global partition state.") +public class GlobalPartitionStateResponse { + private final int partitionId; + private final String tableName; + private final String state; Review Comment: Is it an enum? If yes, is it possible to use an enum type here? ########## modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rest.api.recovery; + +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.PathVariable; +import io.micronaut.http.annotation.Produces; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.constants.MediaType; + +/** + * Disaster recovery controller. + */ +@Controller("/management/v1/recovery") +@Tag(name = "recovery") +public interface DisasterRecoveryApi { + @Get("state/local") + @Operation(operationId = "getLocalPartitionStates", description = "Returns local partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @Produces(MediaType.APPLICATION_JSON) + CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(); + + @Get("state/local/{zoneName}") + @Operation(operationId = "getLocalPartitionStatesByZone", description = "Returns local partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @ApiResponse(responseCode = "400", description = "Zone is not found.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @Produces(MediaType.APPLICATION_JSON) + 
CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(@PathVariable("zoneName") String zoneName); + + @Get("state/global") + @Operation(operationId = "getGlobalPartitionStates", description = "Returns global partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @Produces(MediaType.APPLICATION_JSON) + CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(); + + @Get("state/global/{zoneName}") + @Operation(operationId = "getGlobalPartitionStatesByZone", description = "Returns global partition states.") + @ApiResponse(responseCode = "200", description = "Partition states returned.") + @ApiResponse(responseCode = "500", description = "Internal error.", + content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = @Schema(implementation = Problem.class))) + @ApiResponse(responseCode = "400", description = "Zone is not found.", Review Comment: 404? ########## modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.recovery; + +import static java.util.Comparator.comparing; +import static org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory.DISASTER_RECOVERY_MANAGER_NAME; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.annotation.Controller; +import jakarta.inject.Named; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse; +import org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler; +import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager; +import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState; + +/** + * Disaster recovery controller. + */ +@Controller("/management/v1/recovery/") +@Requires(classes = IgniteInternalExceptionHandler.class) +public class DisasterRecoveryController implements DisasterRecoveryApi { + private final DisasterRecoveryManager disasterRecoveryManager; + + public DisasterRecoveryController(@Named(DISASTER_RECOVERY_MANAGER_NAME) DisasterRecoveryManager disasterRecoveryManager) { Review Comment: Is the name needed? Can we have more than 1 `DisasterRecoveryManager` instance? 
########## modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.recovery; + +import static java.util.Comparator.comparing; +import static org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory.DISASTER_RECOVERY_MANAGER_NAME; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.annotation.Controller; +import jakarta.inject.Named; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse; +import 
org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler; +import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager; +import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState; + +/** + * Disaster recovery controller. + */ +@Controller("/management/v1/recovery/") +@Requires(classes = IgniteInternalExceptionHandler.class) +public class DisasterRecoveryController implements DisasterRecoveryApi { + private final DisasterRecoveryManager disasterRecoveryManager; + + public DisasterRecoveryController(@Named(DISASTER_RECOVERY_MANAGER_NAME) DisasterRecoveryManager disasterRecoveryManager) { + this.disasterRecoveryManager = disasterRecoveryManager; + } + + @Override + public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates() { + return disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates); + } + + @Override + public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(String zoneName) { + return disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates); + } + + @Override + public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates() { + return disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates); + } + + @Override + public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(String zoneName) { + return disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates); + } + + private static LocalPartitionStatesResponse convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>> localStates) { + List<LocalPartitionStateResponse> states = new ArrayList<>(); + + for (Entry<TablePartitionId, Map<String, 
LocalPartitionState>> entry0 : localStates.entrySet()) { Review Comment: It seems we don't need keys from the outer map, so a simpler iteration over values of this map is possible ########## modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rest.recovery; + +import static java.util.Comparator.comparing; +import static org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory.DISASTER_RECOVERY_MANAGER_NAME; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.annotation.Controller; +import jakarta.inject.Named; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse; +import org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler; +import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager; +import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState; + +/** + * Disaster recovery controller. 
+ */ +@Controller("/management/v1/recovery/") +@Requires(classes = IgniteInternalExceptionHandler.class) +public class DisasterRecoveryController implements DisasterRecoveryApi { + private final DisasterRecoveryManager disasterRecoveryManager; + + public DisasterRecoveryController(@Named(DISASTER_RECOVERY_MANAGER_NAME) DisasterRecoveryManager disasterRecoveryManager) { + this.disasterRecoveryManager = disasterRecoveryManager; + } + + @Override + public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates() { + return disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates); + } + + @Override + public CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates(String zoneName) { + return disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates); + } + + @Override + public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates() { + return disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates); + } + + @Override + public CompletableFuture<GlobalPartitionStatesResponse> getGlobalPartitionStates(String zoneName) { + return disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates); + } + + private static LocalPartitionStatesResponse convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>> localStates) { + List<LocalPartitionStateResponse> states = new ArrayList<>(); + + for (Entry<TablePartitionId, Map<String, LocalPartitionState>> entry0 : localStates.entrySet()) { + for (Entry<String, LocalPartitionState> entry1 : entry0.getValue().entrySet()) { + String nodeName = entry1.getKey(); + LocalPartitionState state = entry1.getValue(); + + states.add(new LocalPartitionStateResponse( + state.partitionId, + state.tableName, + nodeName, + state.state.name() + )); + } + } + + // Sort the output 
conveniently. + states.sort(comparing(LocalPartitionStateResponse::tableName) + .thenComparingInt(LocalPartitionStateResponse::partitionId) + .thenComparing(LocalPartitionStateResponse::nodeName)); + + return new LocalPartitionStatesResponse(states); + } + + private static GlobalPartitionStatesResponse convertGlobalStates(Map<TablePartitionId, GlobalPartitionState> globalStates) { + List<GlobalPartitionStateResponse> states = new ArrayList<>(); + + for (Entry<TablePartitionId, GlobalPartitionState> entry : globalStates.entrySet()) { Review Comment: Here, we don't need the map keys either, so we could just iterate over `values()`. ########## modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.rest.recovery; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; +import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.client.HttpClient; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.http.client.exceptions.HttpClientResponseException; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import java.util.List; +import org.apache.ignite.internal.Cluster; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse; +import org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse; +import org.junit.jupiter.api.Test; + +/** + * Test for disaster recovery REST commands. 
+ */ +@MicronautTest +public class ItDisasterRecoveryControllerTest extends ClusterPerTestIntegrationTest { + private static final String NODE_URL = "http://localhost:" + Cluster.BASE_HTTP_PORT; + + @Inject + @Client(NODE_URL + "/management/v1/recovery/") + HttpClient client; + + @Override + protected int initialNodes() { + return 1; + } + + @Test + void testLocalPartitionStates() { + executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)"); + var response = client.toBlocking().exchange("/state/local/", LocalPartitionStatesResponse.class); + + assertEquals(HttpStatus.OK, response.status()); + + LocalPartitionStatesResponse body = response.body(); + assertEquals(DEFAULT_PARTITION_COUNT, body.states().size()); + + List<Integer> partitionIds = body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList()); + assertEquals(range(0, DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds); + } + + @Test + void testLocalPartitionStatesByZoneMissingZone() { + HttpClientResponseException thrown = assertThrows( + HttpClientResponseException.class, + () -> client.toBlocking().exchange("/state/local/foo/", LocalPartitionStatesResponse.class) Review Comment: ```suggestion () -> client.toBlocking().exchange("/state/local/no-such-zone/", LocalPartitionStatesResponse.class) ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -78,7 +90,7 @@ public class DisasterRecoveryManager implements IgniteComponent { private static final int TIMEOUT = 30; - private static final int CATCH_UP_THRESHOLD = 10; + private static final int CATCH_UP_THRESHOLD = 100; Review Comment: Could you please add javadocs to `TIMEOUT` and `CATCH_UP_THRESHOLD` explaining what they mean and in which units they are measured? Also, maybe it makes sense to include the unit in the name for the timeout constant, like `TIMEOUT_SEC`.
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -175,26 +191,55 @@ public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { * @param zoneName Zone name. * @return Future with the mapping. */ - public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) { - int latestCatalogVersion = catalogManager.latestCatalogVersion(); - Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalogManager.zones(latestCatalogVersion).stream() - .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName)) - .findAny(); - - if (zoneDesciptorOptional.isEmpty()) { - return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null)); - } + public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) { + Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion()); + + return localPartitionStatesInternal(zoneName, catalog) + .thenApply(res -> normalizeLocal(res, catalog)); + } - CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get(); + /** + * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the global + * partition state enum value. + * + * @param zoneName Zone name. + * @return Future with the mapping. 
+ */ + public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> globalPartitionStates(@Nullable String zoneName) { + Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion()); + + return localPartitionStatesInternal(zoneName, catalog) + .thenApply(res -> normalizeLocal(res, catalog)) + .thenApply(res -> normalizeGlobal(res, catalog)); + } + + private CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionStateMessage>>> localPartitionStatesInternal( + @Nullable String zoneName, Catalog catalog + ) { + int zoneId; + if (zoneName == null) { + zoneId = -1; Review Comment: Please extract this to a constant ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State nodeState) { default: // Unrecognized state, better safe than sorry. - return LocalPartitionStateEnum.BROKEN; + return BROKEN; } } /** * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all peers is * known. 
*/ - private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize( - Map<TablePartitionId, Map<String, LocalPartitionState>> result + private static Map<TablePartitionId, Map<String, LocalPartitionState>> normalizeLocal( + Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result, + Catalog catalog ) { return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { - Map<String, LocalPartitionState> map = entry.getValue(); + TablePartitionId tablePartitionId = entry.getKey(); + Map<String, LocalPartitionStateMessage> map = entry.getValue(); // noinspection OptionalGetWithoutIsPresent - long maxLogIndex = map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong(); + long maxLogIndex = map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong(); return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> { Review Comment: `toMap()` can be imported statically ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -175,26 +191,55 @@ public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { * @param zoneName Zone name. * @return Future with the mapping. 
*/ - public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) { - int latestCatalogVersion = catalogManager.latestCatalogVersion(); - Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalogManager.zones(latestCatalogVersion).stream() - .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName)) - .findAny(); - - if (zoneDesciptorOptional.isEmpty()) { - return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null)); - } + public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) { + Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion()); + + return localPartitionStatesInternal(zoneName, catalog) + .thenApply(res -> normalizeLocal(res, catalog)); + } - CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get(); + /** + * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the global + * partition state enum value. + * + * @param zoneName Zone name. Review Comment: Please explain what `null` means here ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State nodeState) { default: // Unrecognized state, better safe than sorry. - return LocalPartitionStateEnum.BROKEN; + return BROKEN; } } /** * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all peers is * known. 
*/ - private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize( - Map<TablePartitionId, Map<String, LocalPartitionState>> result + private static Map<TablePartitionId, Map<String, LocalPartitionState>> normalizeLocal( + Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result, + Catalog catalog ) { return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { - Map<String, LocalPartitionState> map = entry.getValue(); + TablePartitionId tablePartitionId = entry.getKey(); + Map<String, LocalPartitionStateMessage> map = entry.getValue(); // noinspection OptionalGetWithoutIsPresent - long maxLogIndex = map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong(); + long maxLogIndex = map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong(); return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> { - LocalPartitionState state = entry2.getValue(); + LocalPartitionStateMessage stateMsg = entry2.getValue(); - if (state.state() != LocalPartitionStateEnum.HEALTHY || maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) { - return state; + LocalPartitionStateEnum stateEnum = stateMsg.state(); + + if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) { + stateEnum = CATCHING_UP; } - return MSG_FACTORY.localPartitionState() - .state(LocalPartitionStateEnum.CATCHING_UP) - .partitionId(state.partitionId()) - .logIndex(state.logIndex()) - .build(); + CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId()); + return new LocalPartitionState(tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum); })); })); } + + private static Map<TablePartitionId, GlobalPartitionState> normalizeGlobal( Review Comment: ```suggestion private static Map<TablePartitionId, GlobalPartitionState> assembleGlobal( ``` ########## 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State nodeState) { default: // Unrecognized state, better safe than sorry. - return LocalPartitionStateEnum.BROKEN; + return BROKEN; } } /** * Replaces some healthy states with a {@link LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all peers is * known. */ - private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize( - Map<TablePartitionId, Map<String, LocalPartitionState>> result + private static Map<TablePartitionId, Map<String, LocalPartitionState>> normalizeLocal( + Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> result, + Catalog catalog ) { return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { - Map<String, LocalPartitionState> map = entry.getValue(); + TablePartitionId tablePartitionId = entry.getKey(); + Map<String, LocalPartitionStateMessage> map = entry.getValue(); // noinspection OptionalGetWithoutIsPresent - long maxLogIndex = map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong(); + long maxLogIndex = map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong(); return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> { - LocalPartitionState state = entry2.getValue(); + LocalPartitionStateMessage stateMsg = entry2.getValue(); - if (state.state() != LocalPartitionStateEnum.HEALTHY || maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) { - return state; + LocalPartitionStateEnum stateEnum = stateMsg.state(); + + if (stateMsg.state() == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) { + stateEnum = CATCHING_UP; } - return MSG_FACTORY.localPartitionState() - .state(LocalPartitionStateEnum.CATCHING_UP) - .partitionId(state.partitionId()) - .logIndex(state.logIndex()) - 
.build(); + CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId()); + return new LocalPartitionState(tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum); })); })); } + + private static Map<TablePartitionId, GlobalPartitionState> normalizeGlobal( + Map<TablePartitionId, Map<String, LocalPartitionState>> localResult, + Catalog catalog + ) { + Map<Integer, Integer> tableIdToPartitions = new HashMap<>(); + + Map<TablePartitionId, GlobalPartitionState> result = localResult.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + TablePartitionId tablePartitionId = entry.getKey(); Review Comment: The following block of code looks pretty scary as a lambda. I would suggest extracting it as a method, but filling `tableIdToPartitions` as a side effect of that lambda makes this more difficult. So, I suggest the following: 1. Do not fill `tableIdToPartitions` in the same block of code; instead, make a second iteration over the original map to fill this variable. This aggregation will be extremely rare, so we won't lose many CPU cycles here. 2. Once the lambda no longer touches `tableIdToPartitions`, extract it to a method like `assembleGlobalStateFromLocal()`. 3. This method and the code for filling `tableIdToPartitions` would reuse the same method to get the zone descriptor. It seems that after this refactoring it will become easier to understand this method: it will become much shorter and less coupled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
