This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ce4050397f IGNITE-22001 Throw specific exception if during 
writeTableAssignmentsToMetastore process was interrupted (#3575)
ce4050397f is described below

commit ce4050397f04101618b5706571b43ab37ef9a0a0
Author: Mikhail Efremov <jakuten...@gmail.com>
AuthorDate: Thu Apr 18 19:14:57 2024 +0600

    IGNITE-22001 Throw specific exception if during 
writeTableAssignmentsToMetastore process was interrupted (#3575)
---
 .../ignite/internal/affinity/Assignments.java      |  11 ++
 .../internal/table/distributed/TableManager.java   | 117 +++++++++++++--------
 .../table/distributed/TableManagerTest.java        |  37 +++++++
 3 files changed, 124 insertions(+), 41 deletions(-)

diff --git 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
index 60d2c14681..d0498b8047 100644
--- 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
+++ 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
@@ -135,4 +136,14 @@ public class Assignments implements Serializable {
     public String toString() {
         return S.toString(this);
     }
+
+    /**
+     * Creates a string representation of the given assignments list for 
logging usage purpose mostly.
+     *
+     * @param assignments List of assignments to present as string.
+     * @return String representation of the given assignments list.
+     */
+    public static String assignmentListToString(List<Assignments> assignments) 
{
+        return S.toString(assignments, (sb, e, i) -> 
sb.app(i).app('=').app(e.nodes()));
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4e8084e26b..2798469056 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -195,7 +195,6 @@ import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl
 import 
org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -210,6 +209,7 @@ import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
@@ -697,7 +697,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param assignmentsFuture Assignments future, to get the assignments 
that should be written.
      * @return Real list of assignments.
      */
-    private CompletableFuture<List<Assignments>> 
writeTableAssignmentsToMetastore(
+    public CompletableFuture<List<Assignments>> 
writeTableAssignmentsToMetastore(
             int tableId,
             CompletableFuture<List<Assignments>> assignmentsFuture
     ) {
@@ -707,20 +707,37 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             List<Operation> partitionAssignments = new 
ArrayList<>(newAssignments.size());
 
             for (int i = 0; i < newAssignments.size(); i++) {
-                partitionAssignments.add(put(
-                        stablePartAssignmentsKey(
-                                new TablePartitionId(tableId, i)),
-                        newAssignments.get(i).toBytes()));
+                ByteArray stableAssignmentsKey = stablePartAssignmentsKey(new 
TablePartitionId(tableId, i));
+                byte[] anAssignment = newAssignments.get(i).toBytes();
+                Operation op = put(stableAssignmentsKey, anAssignment);
+                partitionAssignments.add(op);
             }
 
             Condition condition = notExists(new 
ByteArray(partitionAssignments.get(0).key()));
 
             return metaStorageMgr
                     .invoke(condition, partitionAssignments, 
Collections.emptyList())
+                    .handle((invokeResult, e) -> {
+                        if (e != null) {
+                            LOG.error(
+                                    "Couldn't write assignments 
[assignmentsList={}] to metastore during invoke.",
+                                    e,
+                                    
Assignments.assignmentListToString(newAssignments)
+                            );
+
+                            throw ExceptionUtils.sneakyThrow(e);
+                        }
+
+                        return invokeResult;
+                    })
                     .thenCompose(invokeResult -> {
                         if (invokeResult) {
-                            LOG.info(IgniteStringFormatter.format("Assignments 
calculated from data nodes are successfully written"
-                                    + " to meta storage [tableId={}, 
assignments={}]", tableId, assignmentListToString(newAssignments)));
+                            LOG.info(
+                                    "Assignments calculated from data nodes 
are successfully written to meta storage"
+                                            + " [tableId={}, assignments={}].",
+                                    tableId,
+                                    
Assignments.assignmentListToString(newAssignments)
+                            );
 
                             return completedFuture(newAssignments);
                         } else {
@@ -745,17 +762,24 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                                     realAssignments.add(real);
                                 }
 
-                                
LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage 
[tableId={}, "
-                                        + "assignments={}]", tableId, 
assignmentListToString(realAssignments)));
+                                LOG.info(
+                                        "Assignments picked up from meta 
storage [tableId={}, assignments={}].",
+                                        tableId,
+                                        
Assignments.assignmentListToString(realAssignments)
+                                );
 
                                 return realAssignments;
                             });
                         }
                     })
-                    .exceptionally(e -> {
-                        LOG.error("Couldn't write assignments to metastore", 
e);
+                    .handle((realAssignments, e) -> {
+                        if (e != null) {
+                            LOG.error("Couldn't get assignments from metastore 
for table [tableId={}].", e, tableId);
 
-                        return null;
+                            throw ExceptionUtils.sneakyThrow(e);
+                        }
+
+                        return realAssignments;
                     });
         });
     }
@@ -1240,32 +1264,16 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     ) {
         return inBusyLockAsync(busyLock, () -> {
             int tableId = tableDescriptor.id();
-            int zoneId = tableDescriptor.zoneId();
 
             // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
             CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
 
-            CompletableFuture<List<Assignments>> assignmentsFuture;
-
-            // Check if the table already has assignments in the meta storage 
locally.
-            // So, it means, that it is a recovery process and we should use 
the meta storage local assignments instead of calculation
-            // of the new ones.
-            if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, 
causalityToken) != null) {
-                assignmentsFuture = completedFuture(
-                        tableAssignmentsGetLocally(metaStorageMgr, tableId, 
zoneDescriptor.partitions(), causalityToken));
-            } else {
-                assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId)
-                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
-                                dataNodes,
-                                zoneDescriptor.partitions(),
-                                zoneDescriptor.replicas()
-                        ).stream().map(Assignments::of).collect(toList()));
-
-                assignmentsFuture.thenAccept(assignmentsList -> {
-                    LOG.info(IgniteStringFormatter.format("Assignments 
calculated from data nodes [table={}, tableId={}, assignments={}, "
-                            + "revision={}]", tableDescriptor.name(), tableId, 
assignmentListToString(assignmentsList), causalityToken));
-                });
-            }
+            CompletableFuture<List<Assignments>> assignmentsFuture = 
getOrCreateAssignments(
+                    tableDescriptor,
+                    zoneDescriptor,
+                    causalityToken,
+                    catalogVersion
+            );
 
             CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
                     writeTableAssignmentsToMetastore(tableId, 
assignmentsFuture);
@@ -1396,13 +1404,40 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     }
 
     /**
-     * Creates a string representation of the given assignments list to use it 
for logging.
-     *
-     * @param assignments List of assignments.
-     * @return String representation of the given assignments list to use it 
for logging.
+     * Check if the table already has assignments in the meta storage locally.
+     * So, it means, that it is a recovery process and we should use the meta 
storage local assignments instead of calculation
+     * of the new ones.
      */
-    private static String assignmentListToString(List<Assignments> 
assignments) {
-        return S.toString(assignments, (sb, e, i) -> 
sb.app(i).app('=').app(e.nodes()));
+    private CompletableFuture<List<Assignments>> getOrCreateAssignments(
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            long causalityToken,
+            int catalogVersion
+    ) {
+        int tableId = tableDescriptor.id();
+        CompletableFuture<List<Assignments>> assignmentsFuture;
+
+        if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, 
causalityToken) != null) {
+            assignmentsFuture = completedFuture(
+                    tableAssignmentsGetLocally(metaStorageMgr, tableId, 
zoneDescriptor.partitions(), causalityToken));
+        } else {
+            assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, catalogVersion, 
zoneDescriptor.id())
+                    .thenApply(dataNodes -> AffinityUtils.calculateAssignments(
+                            dataNodes,
+                            zoneDescriptor.partitions(),
+                            zoneDescriptor.replicas()
+                    ).stream().map(Assignments::of).collect(toList()));
+
+            assignmentsFuture.thenAccept(assignmentsList -> LOG.info(
+                    "Assignments calculated from data nodes [table={}, 
tableId={}, assignments={}, revision={}]",
+                    tableDescriptor.name(),
+                    tableId,
+                    Assignments.assignmentListToString(assignmentsList),
+                    causalityToken
+            ));
+        }
+
+        return assignmentsFuture;
     }
 
     /**
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1c2c3a4d0c..ada4b72b3b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -70,9 +70,12 @@ import java.util.concurrent.Phaser;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
@@ -341,6 +344,40 @@ public class TableManagerTest extends IgniteAbstractTest {
         assertSame(table, tblManagerFut.join().table(DYNAMIC_TABLE_NAME));
     }
 
+    /**
+     * Testing TableManager#writeTableAssignmentsToMetastore for 2 exceptional 
scenarios:
+     * 1. the method was interrupted in outer future before invoke calling 
completion.
+     * 2. the method was interrupted in inner metastore's future when the 
result of invocation had gotten, but after error happens;
+     *
+     * @throws Exception if something goes wrong on mocks creation.
+     */
+    @Test
+    public void testWriteTableAssignmentsToMetastoreExceptionally() throws 
Exception {
+        TableViewInternal table = 
mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut);
+        int tableId = table.tableId();
+        TableManager tableManager = tblManagerFut.join();
+        List<Assignments> assignmentsList = 
List.of(Assignments.of(Assignment.forPeer(node.id())));
+
+        // the first case scenario
+        CompletableFuture<List<Assignments>> assignmentsFuture = new 
CompletableFuture<>();
+        var outerExceptionMsg = "Outer future is interrupted";
+        assignmentsFuture.completeExceptionally(new 
TimeoutException(outerExceptionMsg));
+        CompletableFuture<List<Assignments>> writtenAssignmentsFuture = 
tableManager
+                .writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+        assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
+        assertThrowsWithCause(writtenAssignmentsFuture::get, 
TimeoutException.class, outerExceptionMsg);
+
+        // the second case scenario
+        assignmentsFuture = completedFuture(assignmentsList);
+        CompletableFuture<Boolean> invokeTimeoutFuture = new 
CompletableFuture<>();
+        var innerExceptionMsg = "Inner future is interrupted";
+        invokeTimeoutFuture.completeExceptionally(new 
TimeoutException(innerExceptionMsg));
+        when(msm.invoke(any(), any(List.class), 
any(List.class))).thenReturn(invokeTimeoutFuture);
+        writtenAssignmentsFuture = 
tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+        assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
+        assertThrowsWithCause(writtenAssignmentsFuture::get, 
TimeoutException.class, innerExceptionMsg);
+    }
+
     /**
      * Tests drop a table through public API.
      *

Reply via email to