tkalkirill commented on code in PR #7156:
URL: https://github.com/apache/ignite-3/pull/7156#discussion_r2602973271


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -279,6 +299,13 @@ public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
 
             registerMetricSources();
 
+            pollingFuture = threadPool.scheduleWithFixedDelay(
+                    this::pollMultiNodeOperations,

Review Comment:
   `pollMultiNodeOperations` is not executed under `busyLock`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -177,11 +183,17 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      */
     private static final int CATCH_UP_THRESHOLD = 100;
 
+    /** How often to check status of running multi node operations. */
+    private static final long POLLING_RATE_MILLIS = 100;
+
+    /** Multi node operation was successfully completed. */
+    private static final String COMPLETED_STATUS = "COMPLETED";
+
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /** Thread pool executor for async parts. */
-    private final ExecutorService threadPool;
+    private final ScheduledExecutorService threadPool;

Review Comment:
   We need to use a separate pool so that this polling does not slow down 
tasks belonging to other components.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void> 
processNewRequest(DisasterRecoveryRequest reques
             metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
         }
 
-        return operationFuture;
+        return operationFuture.thenCompose(v -> {
+            if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+                return allOf(getNodeNames(request.nodeNames())
+                        .stream()
+                        .map(nodeName -> addMultiNodeOperation(nodeName, 
operationId))
+                        .toArray(CompletableFuture[]::new));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+    }
+
+    /** If request node names is empty, returns all nodes in the logical 
topology. */
+    private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+        if (requestNodeNames.isEmpty()) {

Review Comment:
   I didn't see any mention in the description that an empty list means all 
nodes.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void> 
processNewRequest(DisasterRecoveryRequest reques
             metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
         }
 
-        return operationFuture;
+        return operationFuture.thenCompose(v -> {
+            if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+                return allOf(getNodeNames(request.nodeNames())
+                        .stream()
+                        .map(nodeName -> addMultiNodeOperation(nodeName, 
operationId))
+                        .toArray(CompletableFuture[]::new));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+    }
+
+    /** If request node names is empty, returns all nodes in the logical 
topology. */
+    private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+        if (requestNodeNames.isEmpty()) {
+            return dzManager.logicalTopology().stream()
+                    .map(NodeWithAttributes::nodeName)
+                    .collect(toSet());
+        } else {
+            return requestNodeNames;
+        }
+    }
+
+    private CompletableFuture<Void> addMultiNodeOperation(String nodeName, 
UUID operationId) {
+        CompletableFuture<Void> result = new 
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        operationsByNodeName.compute(nodeName, (node, operations) -> {
+            if (operations == null) {
+                operations = new MultiNodeOperations();
+            }
+
+            operations.add(operationId, result);
+
+            return operations;
+        });
+
+        return result;
+    }
+
+    /**
+     * Sends status request to nodes participating in MULTI_NODE operations 
and completes corresponding futures in
+     * {@link #operationsByNodeName}. Doesn't send identical requests.
+    */
+    private void pollMultiNodeOperations() {
+        for (Map.Entry<String, MultiNodeOperations> entry : 
operationsByNodeName.entrySet()) {
+            // We already sent the latest version, skip.
+            if (!entry.getValue().startPollingIfNeeded()) {
+                continue;
+            }
+
+            String nodeName = entry.getKey();
+            MultiNodeOperations operations = entry.getValue();
+
+            DisasterRecoveryStatusRequestMessage request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryStatusRequestMessage()
+                            .operationIds(operations.operationsIds())
+                            .build();
+
+            messagingService.invoke(nodeName, request, 
TimeUnit.SECONDS.toMillis(1))
+                    .whenComplete((message, ex) -> {
+                        if (ex != null) {
+                            if (hasCause(ex, TimeoutException.class)) {
+                                operations.triggerNextRequest();
+                            } else {
+                                operations.exceptionally(ex);
+                            }
+
+                            return;
+                        }
+
+                        DisasterRecoveryStatusResponseMessage responseMessage 
= (DisasterRecoveryStatusResponseMessage) message;
+
+                        for (Map.Entry<UUID, String> operationResponse : 
responseMessage.operationStatuses().entrySet()) {
+                            UUID operationId = operationResponse.getKey();
+                            String operationStatus = 
operationResponse.getValue();
+
+                            if (!COMPLETED_STATUS.equals(operationStatus)) {
+                                operations.get(operationId)

Review Comment:
   It looks like completing the future and removing it from the object could 
be combined into a single method.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java:
##########
@@ -312,5 +312,11 @@ interface DisasterRecoveryMessages {
 
         /** Message type for disaster recovery request forwarding response. */
         short DISASTER_RECOVERY_RESPONSE = 111;
+
+        /** Message type for disaster recovery status request. */
+        short DISASTER_RECOVERY_STATUS_REQUEST = 112;
+
+        /** Message type for disaster recovery status response. */

Review Comment:
   Missing a reference to the corresponding message class.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void> 
processNewRequest(DisasterRecoveryRequest reques
             metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
         }
 
-        return operationFuture;
+        return operationFuture.thenCompose(v -> {
+            if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+                return allOf(getNodeNames(request.nodeNames())
+                        .stream()
+                        .map(nodeName -> addMultiNodeOperation(nodeName, 
operationId))
+                        .toArray(CompletableFuture[]::new));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+    }
+
+    /** If request node names is empty, returns all nodes in the logical 
topology. */
+    private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+        if (requestNodeNames.isEmpty()) {
+            return dzManager.logicalTopology().stream()
+                    .map(NodeWithAttributes::nodeName)
+                    .collect(toSet());
+        } else {
+            return requestNodeNames;
+        }
+    }
+
+    private CompletableFuture<Void> addMultiNodeOperation(String nodeName, 
UUID operationId) {
+        CompletableFuture<Void> result = new 
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        operationsByNodeName.compute(nodeName, (node, operations) -> {
+            if (operations == null) {
+                operations = new MultiNodeOperations();
+            }
+
+            operations.add(operationId, result);
+
+            return operations;
+        });
+
+        return result;
+    }
+
+    /**
+     * Sends status request to nodes participating in MULTI_NODE operations 
and completes corresponding futures in
+     * {@link #operationsByNodeName}. Doesn't send identical requests.
+    */
+    private void pollMultiNodeOperations() {
+        for (Map.Entry<String, MultiNodeOperations> entry : 
operationsByNodeName.entrySet()) {
+            // We already sent the latest version, skip.
+            if (!entry.getValue().startPollingIfNeeded()) {
+                continue;
+            }
+
+            String nodeName = entry.getKey();
+            MultiNodeOperations operations = entry.getValue();
+
+            DisasterRecoveryStatusRequestMessage request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryStatusRequestMessage()
+                            .operationIds(operations.operationsIds())
+                            .build();
+
+            messagingService.invoke(nodeName, request, 
TimeUnit.SECONDS.toMillis(1))
+                    .whenComplete((message, ex) -> {
+                        if (ex != null) {
+                            if (hasCause(ex, TimeoutException.class)) {
+                                operations.triggerNextRequest();
+                            } else {
+                                operations.exceptionally(ex);
+                            }
+
+                            return;
+                        }
+
+                        DisasterRecoveryStatusResponseMessage responseMessage 
= (DisasterRecoveryStatusResponseMessage) message;
+
+                        for (Map.Entry<UUID, String> operationResponse : 
responseMessage.operationStatuses().entrySet()) {
+                            UUID operationId = operationResponse.getKey();
+                            String operationStatus = 
operationResponse.getValue();
+
+                            if (!COMPLETED_STATUS.equals(operationStatus)) {
+                                operations.get(operationId)
+                                        .completeExceptionally(new 
RemoteProcessingDisasterRecoveryException(operationStatus, nodeName));
+                            } else {
+                                operations.get(operationId).complete(null);
+                            }
+
+                            operations.remove(operationId);
+                        }
+
+                        // Not all operations were completed, increase version 
to poll again.
+                        if (!operations.operationsIds().isEmpty()) {
+                            operations.triggerNextRequest();

Review Comment:
   Maybe simplify this: if there is at least one unfinished operation, just 
continue polling?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/DisasterRecoveryStatusResponseMessage.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+
+/** Response with statuses of disaster recovery operations completed by the 
node. */
+@Transferable(DisasterRecoveryMessages.DISASTER_RECOVERY_STATUS_RESPONSE)
+public interface DisasterRecoveryStatusResponseMessage extends NetworkMessage {

Review Comment:
   Maybe rename it to `OperationStatusesResponseMessage`?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/DisasterRecoveryStatusRequestMessage.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+
+/** Request completed disaster recovery operations. */
+@Transferable(DisasterRecoveryMessages.DISASTER_RECOVERY_STATUS_REQUEST)
+public interface DisasterRecoveryStatusRequestMessage extends NetworkMessage {

Review Comment:
   Maybe rename it to `OperationStatusesRequestMessage`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteProcessingDisasterRecoveryException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
+
+/** Exception is thrown when remote node encounters an error during disaster 
recovery processing. */
+public class RemoteProcessingDisasterRecoveryException extends 
DisasterRecoveryException {

Review Comment:
   Maybe call it something like `RemoteOperationException`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -226,9 +238,17 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
     private final Map<Integer, PartitionStatesMetricSource> 
metricSourceByTableId = new ConcurrentHashMap<>();
 
+    private final Map<String, MultiNodeOperations> operationsByNodeName = new 
ConcurrentHashMap<>();
+
+    /** Multi node operations completed by current node and not yet reported 
to initiator. */
+    private final Map<UUID, String> operationStatusByOperationId = new 
HashMap<>();

Review Comment:
   I didn't see where or when this map is cleared. Also, why is it not concurrent?
   We also need to cover the cases where the node that is supposed to poll goes 
down, both temporarily and permanently.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.REMOTE_NODE_ERR;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+
+/** Used to poll statuses of multi node operations from other nodes. */
+class MultiNodeOperations {
+    private final Map<UUID, CompletableFuture<Void>> operationsById = new 
ConcurrentHashMap<>();
+
+    private final AtomicBoolean shouldPoll = new AtomicBoolean(true);

Review Comment:
   I think we can get rid of it.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java:
##########
@@ -312,5 +312,11 @@ interface DisasterRecoveryMessages {
 
         /** Message type for disaster recovery request forwarding response. */
         short DISASTER_RECOVERY_RESPONSE = 111;
+
+        /** Message type for disaster recovery status request. */

Review Comment:
   Missing a reference to the corresponding message class.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1151,7 +1183,97 @@ private CompletableFuture<Void> 
processNewRequest(DisasterRecoveryRequest reques
             metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
         }
 
-        return operationFuture;
+        return operationFuture.thenCompose(v -> {
+            if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+                return allOf(getNodeNames(request.nodeNames())
+                        .stream()
+                        .map(nodeName -> addMultiNodeOperation(nodeName, 
operationId))
+                        .toArray(CompletableFuture[]::new));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+    }
+
+    /** If request node names is empty, returns all nodes in the logical 
topology. */
+    private Collection<String> getNodeNames(Set<String> requestNodeNames) {
+        if (requestNodeNames.isEmpty()) {
+            return dzManager.logicalTopology().stream()
+                    .map(NodeWithAttributes::nodeName)
+                    .collect(toSet());
+        } else {
+            return requestNodeNames;
+        }
+    }
+
+    private CompletableFuture<Void> addMultiNodeOperation(String nodeName, 
UUID operationId) {
+        CompletableFuture<Void> result = new 
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        operationsByNodeName.compute(nodeName, (node, operations) -> {
+            if (operations == null) {
+                operations = new MultiNodeOperations();
+            }
+
+            operations.add(operationId, result);
+
+            return operations;
+        });
+
+        return result;
+    }
+
+    /**
+     * Sends status request to nodes participating in MULTI_NODE operations 
and completes corresponding futures in
+     * {@link #operationsByNodeName}. Doesn't send identical requests.
+    */
+    private void pollMultiNodeOperations() {
+        for (Map.Entry<String, MultiNodeOperations> entry : 
operationsByNodeName.entrySet()) {
+            // We already sent the latest version, skip.
+            if (!entry.getValue().startPollingIfNeeded()) {
+                continue;
+            }
+
+            String nodeName = entry.getKey();
+            MultiNodeOperations operations = entry.getValue();
+
+            DisasterRecoveryStatusRequestMessage request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryStatusRequestMessage()
+                            .operationIds(operations.operationsIds())
+                            .build();
+
+            messagingService.invoke(nodeName, request, 
TimeUnit.SECONDS.toMillis(1))
+                    .whenComplete((message, ex) -> {
+                        if (ex != null) {
+                            if (hasCause(ex, TimeoutException.class)) {

Review Comment:
   What about `NodeStoppingException`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -226,9 +238,17 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
     private final Map<Integer, PartitionStatesMetricSource> 
metricSourceByTableId = new ConcurrentHashMap<>();
 
+    private final Map<String, MultiNodeOperations> operationsByNodeName = new 
ConcurrentHashMap<>();

Review Comment:
   When and where are entries removed from this map?



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java:
##########
@@ -34,13 +34,16 @@
 public class RestartPartitionsCall implements Call<RestartPartitionsCallInput, 
String> {
     private final ApiClientFactory clientFactory;
 
+    /** Timeout used for disaster recovery operations. */
+    private static final int TIMEOUT_MILLIS = 30_000;

Review Comment:
   Why was this particular value chosen? What is the basis for assuming it 
will be sufficient?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to