This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new ce4050397f IGNITE-22001 Throw specific exception if during writeTableAssignmentsToMetastore process was interrupted (#3575) ce4050397f is described below commit ce4050397f04101618b5706571b43ab37ef9a0a0 Author: Mikhail Efremov <jakuten...@gmail.com> AuthorDate: Thu Apr 18 19:14:57 2024 +0600 IGNITE-22001 Throw specific exception if during writeTableAssignmentsToMetastore process was interrupted (#3575) --- .../ignite/internal/affinity/Assignments.java | 11 ++ .../internal/table/distributed/TableManager.java | 117 +++++++++++++-------- .../table/distributed/TableManagerTest.java | 37 +++++++ 3 files changed, 124 insertions(+), 41 deletions(-) diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java index 60d2c14681..d0498b8047 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; @@ -135,4 +136,14 @@ public class Assignments implements Serializable { public String toString() { return S.toString(this); } + + /** + * Creates a string representation of the given assignments list for logging usage purpose mostly. + * + * @param assignments List of assignments to present as string. + * @return String representation of the given assignments list. + */ + public static String assignmentListToString(List<Assignments> assignments) { + return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e.nodes())); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 4e8084e26b..2798469056 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -195,7 +195,6 @@ import org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl import org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.thread.NamedThreadFactory; -import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; @@ -210,6 +209,7 @@ import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.Lazy; @@ -697,7 +697,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @param assignmentsFuture Assignments future, to get the assignments that should be written. * @return Real list of assignments. */ - private CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore( + public CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore( int tableId, CompletableFuture<List<Assignments>> assignmentsFuture ) { @@ -707,20 +707,37 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { List<Operation> partitionAssignments = new ArrayList<>(newAssignments.size()); for (int i = 0; i < newAssignments.size(); i++) { - partitionAssignments.add(put( - stablePartAssignmentsKey( - new TablePartitionId(tableId, i)), - newAssignments.get(i).toBytes())); + ByteArray stableAssignmentsKey = stablePartAssignmentsKey(new TablePartitionId(tableId, i)); + byte[] anAssignment = newAssignments.get(i).toBytes(); + Operation op = put(stableAssignmentsKey, anAssignment); + partitionAssignments.add(op); } Condition condition = notExists(new ByteArray(partitionAssignments.get(0).key())); return metaStorageMgr .invoke(condition, partitionAssignments, Collections.emptyList()) + .handle((invokeResult, e) -> { + if (e != null) { + LOG.error( + "Couldn't write assignments [assignmentsList={}] to metastore during invoke.", + e, + Assignments.assignmentListToString(newAssignments) + ); + + throw ExceptionUtils.sneakyThrow(e); + } + + return invokeResult; + }) .thenCompose(invokeResult -> { if (invokeResult) { - LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully written" - + " to meta storage [tableId={}, assignments={}]", tableId, assignmentListToString(newAssignments))); + LOG.info( + "Assignments calculated from data nodes are successfully written to meta storage" + + " [tableId={}, assignments={}].", + tableId, + Assignments.assignmentListToString(newAssignments) + ); return completedFuture(newAssignments); } else { @@ -745,17 +762,24 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { realAssignments.add(real); } - LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage [tableId={}, " - + "assignments={}]", tableId, assignmentListToString(realAssignments))); + LOG.info( + "Assignments picked up from meta storage [tableId={}, assignments={}].", + tableId, + Assignments.assignmentListToString(realAssignments) + ); return realAssignments; }); } }) - .exceptionally(e -> { - LOG.error("Couldn't write assignments to metastore", e); + .handle((realAssignments, e) -> { + if (e != null) { + LOG.error("Couldn't get assignments from metastore for table [tableId={}].", e, tableId); - return null; + throw ExceptionUtils.sneakyThrow(e); + } + + return realAssignments; }); }); } @@ -1240,32 +1264,16 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { ) { return inBusyLockAsync(busyLock, () -> { int tableId = tableDescriptor.id(); - int zoneId = tableDescriptor.zoneId(); // Retrieve descriptor during synchronous call, before the previous catalog version could be concurrently compacted. CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); - CompletableFuture<List<Assignments>> assignmentsFuture; - - // Check if the table already has assignments in the meta storage locally. - // So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation - // of the new ones. - if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) { - assignmentsFuture = completedFuture( - tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken)); - } else { - assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId) - .thenApply(dataNodes -> AffinityUtils.calculateAssignments( - dataNodes, - zoneDescriptor.partitions(), - zoneDescriptor.replicas() - ).stream().map(Assignments::of).collect(toList())); - - assignmentsFuture.thenAccept(assignmentsList -> { - LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes [table={}, tableId={}, assignments={}, " - + "revision={}]", tableDescriptor.name(), tableId, assignmentListToString(assignmentsList), causalityToken)); - }); - } + CompletableFuture<List<Assignments>> assignmentsFuture = getOrCreateAssignments( + tableDescriptor, + zoneDescriptor, + causalityToken, + catalogVersion + ); CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke = writeTableAssignmentsToMetastore(tableId, assignmentsFuture); @@ -1396,13 +1404,40 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } /** - * Creates a string representation of the given assignments list to use it for logging. - * - * @param assignments List of assignments. - * @return String representation of the given assignments list to use it for logging. + * Check if the table already has assignments in the meta storage locally. + * So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation + * of the new ones. */ - private static String assignmentListToString(List<Assignments> assignments) { - return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e.nodes())); + private CompletableFuture<List<Assignments>> getOrCreateAssignments( + CatalogTableDescriptor tableDescriptor, + CatalogZoneDescriptor zoneDescriptor, + long causalityToken, + int catalogVersion + ) { + int tableId = tableDescriptor.id(); + CompletableFuture<List<Assignments>> assignmentsFuture; + + if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) { + assignmentsFuture = completedFuture( + tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken)); + } else { + assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneDescriptor.id()) + .thenApply(dataNodes -> AffinityUtils.calculateAssignments( + dataNodes, + zoneDescriptor.partitions(), + zoneDescriptor.replicas() + ).stream().map(Assignments::of).collect(toList())); + + assignmentsFuture.thenAccept(assignmentsList -> LOG.info( + "Assignments calculated from data nodes [table={}, tableId={}, assignments={}, revision={}]", + tableDescriptor.name(), + tableId, + Assignments.assignmentListToString(assignmentsList), + causalityToken + )); + } + + return assignmentsFuture; } /** diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 1c2c3a4d0c..ada4b72b3b 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -70,9 +70,12 @@ import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.LongFunction; import org.apache.ignite.internal.affinity.AffinityUtils; +import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.affinity.Assignments; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogTestUtils; import org.apache.ignite.internal.catalog.commands.ColumnParams; @@ -341,6 +344,40 @@ public class TableManagerTest extends IgniteAbstractTest { assertSame(table, tblManagerFut.join().table(DYNAMIC_TABLE_NAME)); } + /** + * Testing TableManager#writeTableAssignmentsToMetastore for 2 exceptional scenarios: + * 1. the method was interrupted in outer future before invoke calling completion. + * 2. the method was interrupted in inner metastore's future when the result of invocation had gotten, but after error happens; + * + * @throws Exception if something goes wrong on mocks creation. + */ + @Test + public void testWriteTableAssignmentsToMetastoreExceptionally() throws Exception { + TableViewInternal table = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut); + int tableId = table.tableId(); + TableManager tableManager = tblManagerFut.join(); + List<Assignments> assignmentsList = List.of(Assignments.of(Assignment.forPeer(node.id()))); + + // the first case scenario + CompletableFuture<List<Assignments>> assignmentsFuture = new CompletableFuture<>(); + var outerExceptionMsg = "Outer future is interrupted"; + assignmentsFuture.completeExceptionally(new TimeoutException(outerExceptionMsg)); + CompletableFuture<List<Assignments>> writtenAssignmentsFuture = tableManager + .writeTableAssignmentsToMetastore(tableId, assignmentsFuture); + assertTrue(writtenAssignmentsFuture.isCompletedExceptionally()); + assertThrowsWithCause(writtenAssignmentsFuture::get, TimeoutException.class, outerExceptionMsg); + + // the second case scenario + assignmentsFuture = completedFuture(assignmentsList); + CompletableFuture<Boolean> invokeTimeoutFuture = new CompletableFuture<>(); + var innerExceptionMsg = "Inner future is interrupted"; + invokeTimeoutFuture.completeExceptionally(new TimeoutException(innerExceptionMsg)); + when(msm.invoke(any(), any(List.class), any(List.class))).thenReturn(invokeTimeoutFuture); + writtenAssignmentsFuture = tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture); + assertTrue(writtenAssignmentsFuture.isCompletedExceptionally()); + assertThrowsWithCause(writtenAssignmentsFuture::get, TimeoutException.class, innerExceptionMsg); + } + /** * Tests drop a table through public API. *