tkalkirill commented on code in PR #7156:
URL: https://github.com/apache/ignite-3/pull/7156#discussion_r2602973271
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -279,6 +299,13 @@ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
registerMetricSources();
+ pollingFuture = threadPool.scheduleWithFixedDelay(
+ this::pollMultiNodeOperations,
Review Comment:
`pollMultiNodeOperations` is not executed under `busyLock`.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -177,11 +183,17 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
*/
private static final int CATCH_UP_THRESHOLD = 100;
+ /** How often to check status of running multi node operations. */
+ private static final long POLLING_RATE_MILLIS = 100;
+
+ /** Multi node operation was successfully completed. */
+ private static final String COMPLETED_STATUS = "COMPLETED";
+
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Thread pool executor for async parts. */
- private final ExecutorService threadPool;
+ private final ScheduledExecutorService threadPool;
Review Comment:
We need to use a separate pool so that we don't slow down tasks from other
components.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void>
processNewRequest(DisasterRecoveryRequest reques
metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
}
- return operationFuture;
+ return operationFuture.thenCompose(v -> {
+ if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+ return allOf(getNodeNames(request.nodeNames())
+ .stream()
+ .map(nodeName -> addMultiNodeOperation(nodeName,
operationId))
+ .toArray(CompletableFuture[]::new));
+ } else {
+ return nullCompletedFuture();
+ }
+ });
+ }
+
+ /** If request node names is empty, returns all nodes in the logical
topology. */
+ private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+ if (requestNodeNames.isEmpty()) {
Review Comment:
I didn't see anything in the description saying that an empty list means all
nodes.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void>
processNewRequest(DisasterRecoveryRequest reques
metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
}
- return operationFuture;
+ return operationFuture.thenCompose(v -> {
+ if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+ return allOf(getNodeNames(request.nodeNames())
+ .stream()
+ .map(nodeName -> addMultiNodeOperation(nodeName,
operationId))
+ .toArray(CompletableFuture[]::new));
+ } else {
+ return nullCompletedFuture();
+ }
+ });
+ }
+
+ /** If request node names is empty, returns all nodes in the logical
topology. */
+ private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+ if (requestNodeNames.isEmpty()) {
+ return dzManager.logicalTopology().stream()
+ .map(NodeWithAttributes::nodeName)
+ .collect(toSet());
+ } else {
+ return requestNodeNames;
+ }
+ }
+
+ private CompletableFuture<Void> addMultiNodeOperation(String nodeName,
UUID operationId) {
+ CompletableFuture<Void> result = new
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ operationsByNodeName.compute(nodeName, (node, operations) -> {
+ if (operations == null) {
+ operations = new MultiNodeOperations();
+ }
+
+ operations.add(operationId, result);
+
+ return operations;
+ });
+
+ return result;
+ }
+
+ /**
+ * Sends status request to nodes participating in MULTI_NODE operations
and completes corresponding futures in
+ * {@link #operationsByNodeName}. Doesn't send identical requests.
+ */
+ private void pollMultiNodeOperations() {
+ for (Map.Entry<String, MultiNodeOperations> entry :
operationsByNodeName.entrySet()) {
+ // We already sent the latest version, skip.
+ if (!entry.getValue().startPollingIfNeeded()) {
+ continue;
+ }
+
+ String nodeName = entry.getKey();
+ MultiNodeOperations operations = entry.getValue();
+
+ DisasterRecoveryStatusRequestMessage request =
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryStatusRequestMessage()
+ .operationIds(operations.operationsIds())
+ .build();
+
+ messagingService.invoke(nodeName, request,
TimeUnit.SECONDS.toMillis(1))
+ .whenComplete((message, ex) -> {
+ if (ex != null) {
+ if (hasCause(ex, TimeoutException.class)) {
+ operations.triggerNextRequest();
+ } else {
+ operations.exceptionally(ex);
+ }
+
+ return;
+ }
+
+ DisasterRecoveryStatusResponseMessage responseMessage
= (DisasterRecoveryStatusResponseMessage) message;
+
+ for (Map.Entry<UUID, String> operationResponse :
responseMessage.operationStatuses().entrySet()) {
+ UUID operationId = operationResponse.getKey();
+ String operationStatus =
operationResponse.getValue();
+
+ if (!COMPLETED_STATUS.equals(operationStatus)) {
+ operations.get(operationId)
Review Comment:
It looks like completing a future and removing it from the object could be done
in a single method.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java:
##########
@@ -312,5 +312,11 @@ interface DisasterRecoveryMessages {
/** Message type for disaster recovery request forwarding response. */
short DISASTER_RECOVERY_RESPONSE = 111;
+
+ /** Message type for disaster recovery status request. */
+ short DISASTER_RECOVERY_STATUS_REQUEST = 112;
+
+ /** Message type for disaster recovery status response. */
Review Comment:
Missing a `{@link}` reference to the corresponding message class.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void>
processNewRequest(DisasterRecoveryRequest reques
metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
}
- return operationFuture;
+ return operationFuture.thenCompose(v -> {
+ if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+ return allOf(getNodeNames(request.nodeNames())
+ .stream()
+ .map(nodeName -> addMultiNodeOperation(nodeName,
operationId))
+ .toArray(CompletableFuture[]::new));
+ } else {
+ return nullCompletedFuture();
+ }
+ });
+ }
+
+ /** If request node names is empty, returns all nodes in the logical
topology. */
+ private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+ if (requestNodeNames.isEmpty()) {
+ return dzManager.logicalTopology().stream()
+ .map(NodeWithAttributes::nodeName)
+ .collect(toSet());
+ } else {
+ return requestNodeNames;
+ }
+ }
+
+ private CompletableFuture<Void> addMultiNodeOperation(String nodeName,
UUID operationId) {
+ CompletableFuture<Void> result = new
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ operationsByNodeName.compute(nodeName, (node, operations) -> {
+ if (operations == null) {
+ operations = new MultiNodeOperations();
+ }
+
+ operations.add(operationId, result);
+
+ return operations;
+ });
+
+ return result;
+ }
+
+ /**
+ * Sends status request to nodes participating in MULTI_NODE operations
and completes corresponding futures in
+ * {@link #operationsByNodeName}. Doesn't send identical requests.
+ */
+ private void pollMultiNodeOperations() {
+ for (Map.Entry<String, MultiNodeOperations> entry :
operationsByNodeName.entrySet()) {
+ // We already sent the latest version, skip.
+ if (!entry.getValue().startPollingIfNeeded()) {
+ continue;
+ }
+
+ String nodeName = entry.getKey();
+ MultiNodeOperations operations = entry.getValue();
+
+ DisasterRecoveryStatusRequestMessage request =
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryStatusRequestMessage()
+ .operationIds(operations.operationsIds())
+ .build();
+
+ messagingService.invoke(nodeName, request,
TimeUnit.SECONDS.toMillis(1))
+ .whenComplete((message, ex) -> {
+ if (ex != null) {
+ if (hasCause(ex, TimeoutException.class)) {
+ operations.triggerNextRequest();
+ } else {
+ operations.exceptionally(ex);
+ }
+
+ return;
+ }
+
+ DisasterRecoveryStatusResponseMessage responseMessage
= (DisasterRecoveryStatusResponseMessage) message;
+
+ for (Map.Entry<UUID, String> operationResponse :
responseMessage.operationStatuses().entrySet()) {
+ UUID operationId = operationResponse.getKey();
+ String operationStatus =
operationResponse.getValue();
+
+ if (!COMPLETED_STATUS.equals(operationStatus)) {
+ operations.get(operationId)
+ .completeExceptionally(new
RemoteProcessingDisasterRecoveryException(operationStatus, nodeName));
+ } else {
+ operations.get(operationId).complete(null);
+ }
+
+ operations.remove(operationId);
+ }
+
+ // Not all operations were completed, increase version
to poll again.
+ if (!operations.operationsIds().isEmpty()) {
+ operations.triggerNextRequest();
Review Comment:
Maybe we should just do this: if there is at least one unfinished operation,
continue polling?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/DisasterRecoveryStatusResponseMessage.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+
+/** Response with statuses of disaster recovery operations completed by the
node. */
+@Transferable(DisasterRecoveryMessages.DISASTER_RECOVERY_STATUS_RESPONSE)
+public interface DisasterRecoveryStatusResponseMessage extends NetworkMessage {
Review Comment:
Maybe rename to `OperationStatusesResponseMessage`?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/DisasterRecoveryStatusRequestMessage.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+
+/** Request completed disaster recovery operations. */
+@Transferable(DisasterRecoveryMessages.DISASTER_RECOVERY_STATUS_REQUEST)
+public interface DisasterRecoveryStatusRequestMessage extends NetworkMessage {
Review Comment:
Maybe rename to `OperationStatusesRequestMessage`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteProcessingDisasterRecoveryException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
+
+/** Exception is thrown when remote node encounters an error during disaster
recovery processing. */
+public class RemoteProcessingDisasterRecoveryException extends
DisasterRecoveryException {
Review Comment:
Maybe call it something like `RemoteOperationException`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -226,9 +238,17 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
private final Map<Integer, PartitionStatesMetricSource>
metricSourceByTableId = new ConcurrentHashMap<>();
+ private final Map<String, MultiNodeOperations> operationsByNodeName = new
ConcurrentHashMap<>();
+
+ /** Multi node operations completed by current node and not yet reported
to initiator. */
+ private final Map<UUID, String> operationStatusByOperationId = new
HashMap<>();
Review Comment:
I don't see where or when this map is cleared. Also, why isn't it concurrent?
We also need to cover the case where the node that is supposed to poll goes
down, both temporarily and permanently.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.REMOTE_NODE_ERR;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+
+/** Used to poll statuses of multi node operations from other nodes. */
+class MultiNodeOperations {
+ private final Map<UUID, CompletableFuture<Void>> operationsById = new
ConcurrentHashMap<>();
+
+ private final AtomicBoolean shouldPoll = new AtomicBoolean(true);
Review Comment:
I think we can get rid of it.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java:
##########
@@ -312,5 +312,11 @@ interface DisasterRecoveryMessages {
/** Message type for disaster recovery request forwarding response. */
short DISASTER_RECOVERY_RESPONSE = 111;
+
+ /** Message type for disaster recovery status request. */
Review Comment:
Missing a `{@link}` reference to the corresponding message class.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void>
processNewRequest(DisasterRecoveryRequest reques
metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
}
- return operationFuture;
+ return operationFuture.thenCompose(v -> {
+ if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+ return allOf(getNodeNames(request.nodeNames())
+ .stream()
+ .map(nodeName -> addMultiNodeOperation(nodeName,
operationId))
+ .toArray(CompletableFuture[]::new));
+ } else {
+ return nullCompletedFuture();
+ }
+ });
+ }
+
+ /** If request node names is empty, returns all nodes in the logical
topology. */
+ private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+ if (requestNodeNames.isEmpty()) {
+ return dzManager.logicalTopology().stream()
+ .map(NodeWithAttributes::nodeName)
+ .collect(toSet());
+ } else {
+ return requestNodeNames;
+ }
+ }
+
+ private CompletableFuture<Void> addMultiNodeOperation(String nodeName,
UUID operationId) {
+ CompletableFuture<Void> result = new
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ operationsByNodeName.compute(nodeName, (node, operations) -> {
+ if (operations == null) {
+ operations = new MultiNodeOperations();
+ }
+
+ operations.add(operationId, result);
+
+ return operations;
+ });
+
+ return result;
+ }
+
+ /**
+ * Sends status request to nodes participating in MULTI_NODE operations
and completes corresponding futures in
+ * {@link #operationsByNodeName}. Doesn't send identical requests.
+ */
+ private void pollMultiNodeOperations() {
+ for (Map.Entry<String, MultiNodeOperations> entry :
operationsByNodeName.entrySet()) {
+ // We already sent the latest version, skip.
+ if (!entry.getValue().startPollingIfNeeded()) {
+ continue;
+ }
+
+ String nodeName = entry.getKey();
+ MultiNodeOperations operations = entry.getValue();
+
+ DisasterRecoveryStatusRequestMessage request =
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryStatusRequestMessage()
+ .operationIds(operations.operationsIds())
+ .build();
+
+ messagingService.invoke(nodeName, request,
TimeUnit.SECONDS.toMillis(1))
+ .whenComplete((message, ex) -> {
+ if (ex != null) {
+ if (hasCause(ex, TimeoutException.class)) {
Review Comment:
What about `NodeStoppingException` ?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -226,9 +238,17 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
private final Map<Integer, PartitionStatesMetricSource>
metricSourceByTableId = new ConcurrentHashMap<>();
+ private final Map<String, MultiNodeOperations> operationsByNodeName = new
ConcurrentHashMap<>();
Review Comment:
When and where do we get rid of objects in the map?
##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java:
##########
@@ -34,13 +34,16 @@
public class RestartPartitionsCall implements Call<RestartPartitionsCallInput,
String> {
private final ApiClientFactory clientFactory;
+ /** Timeout used for disaster recovery operations. */
+ private static final int TIMEOUT_MILLIS = 30_000;
Review Comment:
Why was this value chosen, and why do we assume it will be sufficient?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]