This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5dc20b82c8 IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721) 5dc20b82c8 is described below commit 5dc20b82c8c0c09f66c4c04a98c74cece530d0ef Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Mon Oct 23 12:06:41 2023 +0400 IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721) --- .../ignite/internal/{Hacks.java => Kludges.java} | 4 +- .../testframework/BaseIgniteAbstractTest.java | 2 +- .../ignite/internal/replicator/ReplicaManager.java | 2 +- .../internal/readonly/ItReadOnlyTxInPastTest.java | 2 +- .../rebalance/ItRebalanceRecoveryTest.java | 2 +- .../runner/app/ItIgniteNodeRestartTest.java | 2 +- .../schemasync/ItSchemaSyncSingleNodeTest.java | 64 ++- .../internal/table/distributed/TableManager.java | 2 +- .../replicator/CompatValidationResult.java | 29 +- .../replicator/PartitionReplicaListener.java | 148 ++++--- .../replicator/SchemaCompatValidator.java | 68 ++- .../distributed/schema/NonHistoricSchemas.java | 7 +- .../distributed/schema/SchemaSyncService.java | 1 + .../replication/PartitionReplicaListenerTest.java | 461 ++++++++++++++++----- .../apache/ignite/internal/tx/TxManagerTest.java | 2 +- 15 files changed, 614 insertions(+), 182 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java b/modules/core/src/main/java/org/apache/ignite/internal/Kludges.java similarity index 90% rename from modules/core/src/main/java/org/apache/ignite/internal/Hacks.java rename to modules/core/src/main/java/org/apache/ignite/internal/Kludges.java index b9cbdc1498..e1a546d69a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/Kludges.java @@ -18,9 +18,9 @@ package org.apache.ignite.internal; /** - * Contains hacks needed for the whole codebase. Should be removed as quickly as possible. 
+ * Contains kludges needed for the whole codebase. Should be removed as quickly as possible. */ -public class Hacks { +public class Kludges { // TODO: Remove after IGNITE-20499 is fixed. /** Name of the property overriding idle safe time propagation period (in milliseconds). */ public static final String IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY = "IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS"; diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java index 98221daeed..7ab1133e35 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.testframework; -import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.lang.IgniteSystemProperties.IGNITE_SENSITIVE_DATA_LOGGING; import static org.apache.ignite.internal.lang.IgniteSystemProperties.getString; import static org.apache.ignite.internal.util.IgniteUtils.monotonicMs; diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 91fb9481c6..c9ff648060 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.replicator; import static java.util.concurrent.CompletableFuture.completedFuture; import 
static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import java.io.IOException; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java index 4b1d651127..fdc00052c3 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.readonly; -import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.SessionUtils.executeUpdate; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java index 20b290dc6a..33800e11db 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.rebalance; -import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static 
org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 751baf57f3..2b61381301 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.runner.app; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; -import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java index 8df0537d46..a2ec535ba4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java +++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.table.Table; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -78,9 +79,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { @ParameterizedTest @EnumSource(Operation.class) void readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation operation) { - cluster.doInSession(0, session -> { - executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session); - }); + createTable(); Table table = node.tables().table(TABLE_NAME); @@ -101,7 +100,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { assertThat( ex.getMessage(), containsString(String.format( - "Table schema was updated after the transaction was started [table=%s, startSchema=1, operationSchema=2", + "Table schema was updated after the transaction was started [table=%s, startSchema=1, operationSchema=2]", tableId )) ); @@ -121,6 +120,12 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { assertThat(tx.state(), is(TxState.ABORTED)); } + private void createTable() { + cluster.doInSession(0, session -> { + executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session); + }); + } + private void alterTable(String tableName) { cluster.doInSession(0, session -> { executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", session); @@ -132,10 +137,10 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { } private void enlistTableInTransaction(Table table, Transaction tx) { - executeReadOn(table, tx, cluster); + executeRwReadOn(table, tx, 
cluster); } - private static void executeReadOn(Table table, Transaction tx, Cluster cluster) { + private static void executeRwReadOn(Table table, Transaction tx, Cluster cluster) { cluster.doInSession(0, session -> { executeUpdate("SELECT * FROM " + table.name(), session, tx); }); @@ -180,7 +185,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { SQL_READ { @Override void execute(Table table, Transaction tx, Cluster cluster) { - executeReadOn(table, tx, cluster); + executeRwReadOn(table, tx, cluster); } @Override @@ -218,4 +223,49 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { assertDoesNotThrow(() -> putInTx(table, tx)); } + + @ParameterizedTest + @EnumSource(Operation.class) + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20680") + void readWriteOperationAfterDroppingTargetTableIsRejected(Operation operation) { + createTable(); + + Table table = node.tables().table(TABLE_NAME); + + putPreExistingValueTo(table); + + InternalTransaction tx = (InternalTransaction) node.transactions().begin(); + + enlistTableInTransaction(table, tx); + + dropTable(TABLE_NAME); + + IgniteException ex; + + int tableId = ((TableImpl) table).tableId(); + + if (operation.sql()) { + ex = assertThrows(IgniteException.class, () -> operation.execute(table, tx, cluster)); + assertThat( + ex.getMessage(), + containsString(String.format("Table was dropped [table=%s]", tableId)) + ); + } else { + ex = assertThrows(IncompatibleSchemaException.class, () -> operation.execute(table, tx, cluster)); + assertThat( + ex.getMessage(), + is(String.format("Table was dropped [table=%s]", tableId)) + ); + } + + assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR)); + + assertThat(tx.state(), is(TxState.ABORTED)); + } + + private void dropTable(String tableName) { + cluster.doInSession(0, session -> { + executeUpdate("DROP TABLE " + tableName, session); + }); + } } diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 412c5df1c3..82aa20c5ba 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -950,7 +950,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { txStatePartitionStorage, transactionStateResolver, partitionUpdateHandlers.storageUpdateHandler, - new NonHistoricSchemas(schemaManager), + new NonHistoricSchemas(schemaManager, schemaSyncService), localNode(), schemaSyncService, catalogService, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java index a6b2e54d45..dfb7e30b4a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table.distributed.replicator; +import org.jetbrains.annotations.Nullable; + /** * Result of a schema compatibility validation. */ @@ -26,7 +28,7 @@ public class CompatValidationResult { private final boolean successful; private final int failedTableId; private final int fromSchemaVersion; - private final int toSchemaVersion; + private final @Nullable Integer toSchemaVersion; /** * Returns a successful validation result. @@ -38,18 +40,29 @@ public class CompatValidationResult { } /** - * Creates a validation result denoting validation failure. + * Creates a validation result denoting incompatible schema change. 
* * @param failedTableId Table which schema change is incompatible. * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made. * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made. * @return A validation result for a failure. */ - public static CompatValidationResult failure(int failedTableId, int fromSchemaVersion, int toSchemaVersion) { + public static CompatValidationResult incompatibleChange(int failedTableId, int fromSchemaVersion, int toSchemaVersion) { return new CompatValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion); } - private CompatValidationResult(boolean successful, int failedTableId, int fromSchemaVersion, int toSchemaVersion) { + /** + * Creates a validation result denoting 'table already dropped when commit is made' situation. + * + * @param failedTableId Table which schema change is incompatible. + * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made. + * @return A validation result for a failure. + */ + public static CompatValidationResult tableDropped(int failedTableId, int fromSchemaVersion) { + return new CompatValidationResult(false, failedTableId, fromSchemaVersion, null); + } + + private CompatValidationResult(boolean successful, int failedTableId, int fromSchemaVersion, @Nullable Integer toSchemaVersion) { this.successful = successful; this.failedTableId = failedTableId; this.fromSchemaVersion = fromSchemaVersion; @@ -65,6 +78,13 @@ public class CompatValidationResult { return successful; } + /** + * Returns whether the validation has failed due to table was already dropped at the commit timestamp. + */ + public boolean isTableDropped() { + return !successful && toSchemaVersion == null; + } + /** * Returns ID of the table for which the validation has failed. Should only be called for a failed validation result, * otherwise an exception is thrown. 
@@ -95,6 +115,7 @@ public class CompatValidationResult { */ public int toSchemaVersion() { assert !successful : "Should not be called on a successful result"; + assert toSchemaVersion != null : "Should not be called when there is no toSchemaVersion"; return toSchemaVersion; } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index c7b238fcfa..4afcaa4dcf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -26,7 +26,6 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.tx.TxState.ABANDONED; import static org.apache.ignite.internal.tx.TxState.ABORTED; @@ -466,9 +465,9 @@ public class PartitionReplicaListener implements ReplicaListener { } } - CompletableFuture<Void> waitForSchemas = waitForSchemasBeforeReading(request); - - return waitForSchemas.thenCompose(unused -> processOperationRequest(request, isPrimary, senderId)); + return waitForSchemasBeforeReading(request) + .thenCompose(unused -> validateTableExistence(request)) + .thenCompose(opStartTimestamp -> processOperationRequest(request, isPrimary, senderId, opStartTimestamp)); } /** @@ -493,7 +492,40 @@ public class PartitionReplicaListener implements ReplicaListener { return tsToWaitForSchemas == null ? 
completedFuture(null) : schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchemas); } - private CompletableFuture<?> processOperationRequest(ReplicaRequest request, @Nullable Boolean isPrimary, String senderId) { + private CompletableFuture<HybridTimestamp> validateTableExistence(ReplicaRequest request) { + HybridTimestamp opStartTs; + + if (request instanceof ReadWriteScanCloseReplicaRequest) { + // We don't need to validate close request for table existence. + opStartTs = null; + } else if (request instanceof ReadWriteReplicaRequest) { + opStartTs = hybridClock.now(); + } else if (request instanceof ReadOnlyReplicaRequest) { + opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp(); + } else if (request instanceof ReadOnlyDirectReplicaRequest) { + opStartTs = hybridClock.now(); + } else { + opStartTs = null; + } + + if (opStartTs == null) { + return completedFuture(null); + } + + return schemaSyncService.waitForMetadataCompleteness(opStartTs) + .thenApply(unused -> { + schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()); + + return opStartTs; + }); + } + + private CompletableFuture<?> processOperationRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + String senderId, + HybridTimestamp opStartTimestamp + ) { if (request instanceof ReadWriteSingleRowReplicaRequest) { var req = (ReadWriteSingleRowReplicaRequest) request; @@ -523,7 +555,7 @@ public class PartitionReplicaListener implements ReplicaListener { if (allElementsAreNull(rows)) { return completedFuture(rows); } else { - return validateAtTimestamp(req.transactionId()) + return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId()) .thenApply(ignored -> rows); } }) @@ -557,9 +589,9 @@ public class PartitionReplicaListener implements ReplicaListener { } else if (request instanceof BuildIndexReplicaRequest) { return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request)); } else if (request instanceof 
ReadOnlyDirectSingleRowReplicaRequest) { - return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request); + return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTimestamp); } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) { - return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request); + return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTimestamp); } else { throw new UnsupportedReplicaRequestException(request.getClass()); } @@ -1289,10 +1321,16 @@ public class PartitionReplicaListener implements ReplicaListener { if (request.commit()) { HybridTimestamp commitTimestamp = request.commitTimestamp(); - return schemaCompatValidator.validateForward(txId, enlistedGroups, commitTimestamp) - .thenCompose(validationResult -> - finishAndCleanup(enlistedGroups, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId) - .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult))); + return schemaCompatValidator.validateCommit(txId, enlistedGroups, commitTimestamp) + .thenCompose(validationResult -> { + return finishAndCleanup( + enlistedGroups, + validationResult.isSuccessful(), + validationResult.isSuccessful() ? commitTimestamp : null, + txId, + txCoordinatorId + ).thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)); + }); } else { // Aborting. 
return finishAndCleanup(enlistedGroups, false, null, txId, txCoordinatorId); @@ -1301,9 +1339,15 @@ public class PartitionReplicaListener implements ReplicaListener { private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult) { if (!validationResult.isSuccessful()) { - throw new IncompatibleSchemaAbortException("Commit failed because schema " - + validationResult.fromSchemaVersion() + " is not forward-compatible with " - + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId()); + if (validationResult.isTableDropped()) { + throw new IncompatibleSchemaAbortException( + format("Commit failed because a table was already dropped [tableId={}]", validationResult.failedTableId()) + ); + } else { + throw new IncompatibleSchemaAbortException("Commit failed because schema " + + validationResult.fromSchemaVersion() + " is not forward-compatible with " + + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId()); + } } } @@ -1416,9 +1460,11 @@ public class PartitionReplicaListener implements ReplicaListener { @Nullable HybridTimestamp commitTimestamp, String txCoordinatorId ) { - HybridTimestamp currentTimestamp = hybridClock.now(); + assert !commit || (commitTimestamp != null); - return reliableCatalogVersionFor(currentTimestamp) + HybridTimestamp tsForCatalogVersion = commit ? 
commitTimestamp : hybridClock.now(); + + return reliableCatalogVersionFor(tsForCatalogVersion) .thenCompose(catalogVersion -> { synchronized (commandProcessingLinearizationMutex) { FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand() @@ -1507,9 +1553,9 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOffFuturesExceptionIgnored(txUpdateFutures, request).thenCompose(v -> { - long commandTimestamp = hybridClock.nowLong(); + HybridTimestamp commandTimestamp = hybridClock.now(); - return reliableCatalogVersionFor(hybridTimestamp(commandTimestamp)) + return reliableCatalogVersionFor(commandTimestamp) .thenCompose(catalogVersion -> { synchronized (commandProcessingLinearizationMutex) { TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand() @@ -1811,13 +1857,14 @@ public class PartitionReplicaListener implements ReplicaListener { * Processes multiple entries direct request for read only transaction. * * @param request Read only multiple entries request. + * @param opStartTimestamp Moment when the operation processing was started in this class. * @return Result future. 
*/ private CompletableFuture<List<BinaryRow>> processReadOnlyDirectMultiEntryAction( - ReadOnlyDirectMultiRowReplicaRequest request - ) { + ReadOnlyDirectMultiRowReplicaRequest request, + HybridTimestamp opStartTimestamp) { List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys()); - HybridTimestamp readTimestamp = hybridClock.now(); + HybridTimestamp readTimestamp = opStartTimestamp; if (request.requestType() != RequestType.RO_GET_ALL) { throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, @@ -1905,7 +1952,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(result, null)); } - return validateOperationAgainstSchema(request.transactionId()) + return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateAllCommand( @@ -1975,7 +2022,7 @@ public class PartitionReplicaListener implements ReplicaListener { return allOf(insertLockFuts) .thenCompose(ignored -> // We are inserting completely new rows - no need to cleanup anything in this case, hence empty times. 
- validateOperationAgainstSchema(request.transactionId()) + validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) ) .thenCompose(catalogVersion -> applyUpdateAllCommand( request, @@ -2036,7 +2083,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(null, null)); } - return validateOperationAgainstSchema(request.transactionId()) + return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateAllCommand( @@ -2107,7 +2154,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(result); } - return validateAtTimestamp(txId) + return validateRwReadAgainstSchemaAfterTakingLocks(txId) .thenApply(unused -> new ReplicaResult(result, null)); }); } @@ -2154,7 +2201,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(result, null)); } - return validateOperationAgainstSchema(request.transactionId()) + return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateAllCommand( @@ -2470,11 +2517,15 @@ public class PartitionReplicaListener implements ReplicaListener { * Processes single entry direct request for read only transaction. * * @param request Read only single entry request. + * @param opStartTimestamp Moment when the operation processing was started in this class. * @return Result future. 
*/ - private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest request) { + private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction( + ReadOnlyDirectSingleRowReplicaRequest request, + HybridTimestamp opStartTimestamp + ) { BinaryTuple primaryKey = resolvePk(request.primaryKey()); - HybridTimestamp readTimestamp = hybridClock.now(); + HybridTimestamp readTimestamp = opStartTimestamp; if (request.requestType() != RequestType.RO_GET) { throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, @@ -2511,7 +2562,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(false, request.full() ? null : completedFuture(null))); } - return validateOperationAgainstSchema(request.transactionId()) + return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(validatedRowId, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -2536,7 +2587,7 @@ public class PartitionReplicaListener implements ReplicaListener { RowId rowId0 = new RowId(partId(), UUID.randomUUID()); return takeLocksForInsert(searchRow, rowId0, txId) - .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId()) + .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose( catalogVersion -> applyUpdateCommand( request, @@ -2567,7 +2618,7 @@ public class PartitionReplicaListener implements ReplicaListener { : takeLocksForUpdate(searchRow, rowId0, txId); return lockFut - .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId()) + .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -2599,7 +2650,7 @@ public class PartitionReplicaListener implements 
ReplicaListener { : takeLocksForUpdate(searchRow, rowId0, txId); return lockFut - .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId()) + .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -2627,7 +2678,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForUpdate(searchRow, rowId, txId) - .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId()) + .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -2655,7 +2706,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForUpdate(searchRow, rowId, txId) - .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId()) + .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()) .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -2706,7 +2757,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForGet(rowId, txId) - .thenCompose(ignored -> validateAtTimestamp(txId)) + .thenCompose(ignored -> validateRwReadAgainstSchemaAfterTakingLocks(txId)) .thenApply(ignored -> new ReplicaResult(row, null)); }); } @@ -2717,7 +2768,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForDelete(row, rowId, txId) - .thenCompose(rowLock -> validateOperationAgainstSchema(request.transactionId())) + .thenCompose(rowLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())) .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion)) .thenCompose( catalogVersion -> 
applyUpdateCommand( @@ -2741,7 +2792,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForDelete(row, rowId, txId) - .thenCompose(ignored -> validateOperationAgainstSchema(request.transactionId())) + .thenCompose(ignored -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())) .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -2985,7 +3036,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(false, null)); } - return validateOperationAgainstSchema(txId) + return validateWriteAgainstSchemaAfterTakingLocks(txId) .thenCompose(catalogVersion -> awaitCleanup(rowIdLock.get1(), catalogVersion)) .thenCompose( catalogVersion -> applyUpdateCommand( @@ -3259,25 +3310,26 @@ public class PartitionReplicaListener implements ReplicaListener { return false; } - private CompletableFuture<Void> validateAtTimestamp(UUID txId) { + /** + * Takes current timestamp and makes schema related validations at this timestamp. + * + * @param txId Transaction ID. + * @return Future that will complete when validation completes. + */ + private CompletableFuture<Void> validateRwReadAgainstSchemaAfterTakingLocks(UUID txId) { HybridTimestamp operationTimestamp = hybridClock.now(); return schemaSyncService.waitForMetadataCompleteness(operationTimestamp) - .thenApply(unused -> { - failIfSchemaChangedSinceTxStart(txId, operationTimestamp); - - return null; - }); + .thenRun(() -> failIfSchemaChangedSinceTxStart(txId, operationTimestamp)); } /** - * Chooses operation timestamp and makes schema related validations. The operation timestamp is only used for validation, - * it is NOT sent as safeTime timestamp. + * Takes current timestamp and makes schema related validations at this timestamp. * * @param txId Transaction ID. 
* @return Future that will complete with catalog version associated with given operation through the operation timestamp. */ - private CompletableFuture<Integer> validateOperationAgainstSchema(UUID txId) { + private CompletableFuture<Integer> validateWriteAgainstSchemaAfterTakingLocks(UUID txId) { HybridTimestamp operationTimestamp = hybridClock.now(); return reliableCatalogVersionFor(operationTimestamp) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java index 9b37043388..90ca7d5d55 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.table.distributed.schema.FullTableSchema; import org.apache.ignite.internal.table.distributed.schema.Schemas; import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff; import org.apache.ignite.internal.tx.TransactionIds; -import org.jetbrains.annotations.Nullable; /** * Validates schema compatibility. @@ -48,19 +47,19 @@ class SchemaCompatValidator { } /** - * Performs commit forward compatibility validation. That is, for each table enlisted in the transaction, checks to see whether the - * initial schema (identified by the begin timestamp) is forward-compatible with the commit schema (identified by the commit - * timestamp). + * Performs commit validation. That is, checks that each table enlisted in the transaction still exists at the commit timestamp, + * and that the initial schema of the table (identified by the begin timestamp) is forward-compatible with the commit schema + * (identified by the commit timestamp). * * @param txId ID of the transaction that gets validated.
* @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction. - * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort). + * @param commitTimestamp Commit timestamp. * @return Future of validation result. */ - CompletableFuture<CompatValidationResult> validateForward( + CompletableFuture<CompatValidationResult> validateCommit( UUID txId, Collection<TablePartitionId> enlistedGroupIds, - @Nullable HybridTimestamp commitTimestamp + HybridTimestamp commitTimestamp ) { HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId); @@ -68,23 +67,18 @@ class SchemaCompatValidator { .map(TablePartitionId::tableId) .collect(toSet()); - assert commitTimestamp != null; // Using compareTo() instead of after()/begin() because the latter methods take clock skew into account // which only makes sense when comparing 'unrelated' timestamps. beginTs and commitTs have a causal relationship, // so we don't need to account for clock skew. assert commitTimestamp.compareTo(beginTimestamp) > 0; return schemas.waitForSchemasAvailability(commitTimestamp) - .thenApply(ignored -> validateForwardSchemasCompatibility(tableIds, commitTimestamp, beginTimestamp)); + .thenApply(ignored -> validateCommit(tableIds, commitTimestamp, beginTimestamp)); } - private CompatValidationResult validateForwardSchemasCompatibility( - Set<Integer> tableIds, - HybridTimestamp commitTimestamp, - HybridTimestamp beginTimestamp - ) { + private CompatValidationResult validateCommit(Set<Integer> tableIds, HybridTimestamp commitTimestamp, HybridTimestamp beginTimestamp) { for (int tableId : tableIds) { - CompatValidationResult validationResult = validateForwardSchemaCompatibility(beginTimestamp, commitTimestamp, tableId); + CompatValidationResult validationResult = validateCommit(beginTimestamp, commitTimestamp, tableId); if (!validationResult.isSuccessful()) { return validationResult; @@ -94,6 +88,29 @@ class SchemaCompatValidator { return 
CompatValidationResult.success(); } + private CompatValidationResult validateCommit(HybridTimestamp beginTimestamp, HybridTimestamp commitTimestamp, int tableId) { + CatalogTableDescriptor tableAtCommitTs = catalogService.table(tableId, commitTimestamp.longValue()); + + if (tableAtCommitTs == null) { + CatalogTableDescriptor tableAtTxStart = catalogService.table(tableId, beginTimestamp.longValue()); + assert tableAtTxStart != null : "No table " + tableId + " at ts " + beginTimestamp; + + return CompatValidationResult.tableDropped(tableId, tableAtTxStart.schemaId()); + } + + return validateForwardSchemaCompatibility(beginTimestamp, commitTimestamp, tableId); + } + + /** + * Performs forward compatibility validation. That is, for the given table, checks to see whether the + * initial schema (identified by the begin timestamp) is forward-compatible with the commit schema (identified by the commit + * timestamp). + * + * @param beginTimestamp Begin timestamp of a transaction. + * @param commitTimestamp Commit timestamp. + * @param tableId ID of the table that is under validation. + * @return Validation result. 
+ */ private CompatValidationResult validateForwardSchemaCompatibility( HybridTimestamp beginTimestamp, HybridTimestamp commitTimestamp, @@ -107,7 +124,7 @@ class SchemaCompatValidator { FullTableSchema oldSchema = tableSchemas.get(i); FullTableSchema newSchema = tableSchemas.get(i + 1); if (!isForwardCompatible(oldSchema, newSchema)) { - return CompatValidationResult.failure(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion()); + return CompatValidationResult.incompatibleChange(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion()); } } @@ -160,7 +177,7 @@ class SchemaCompatValidator { FullTableSchema oldSchema = tableSchemas.get(i); FullTableSchema newSchema = tableSchemas.get(i + 1); if (!isBackwardCompatible(oldSchema, newSchema)) { - return CompatValidationResult.failure(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion()); + return CompatValidationResult.incompatibleChange(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion()); } } @@ -178,7 +195,10 @@ class SchemaCompatValidator { CatalogTableDescriptor tableAtOpTs = catalogService.table(tableId, operationTimestamp.longValue()); assert tableAtBeginTs != null; - assert tableAtOpTs != null; + + if (tableAtOpTs == null) { + throw tableWasDroppedException(tableId); + } if (tableAtOpTs.tableVersion() != tableAtBeginTs.tableVersion()) { throw new IncompatibleSchemaException( @@ -189,4 +209,16 @@ class SchemaCompatValidator { ); } } + + private static IncompatibleSchemaException tableWasDroppedException(int tableId) { + return new IncompatibleSchemaException(String.format("Table was dropped [table=%d]", tableId)); + } + + void failIfTableDoesNotExistAt(HybridTimestamp operationTimestamp, int tableId) { + CatalogTableDescriptor tableAtOpTs = catalogService.table(tableId, operationTimestamp.longValue()); + + if (tableAtOpTs == null) { + throw tableWasDroppedException(tableId); + } + } } diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java index 83558e0f74..c6eb5f38c8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java @@ -52,13 +52,16 @@ import org.apache.ignite.internal.type.VarlenNativeType; public class NonHistoricSchemas implements Schemas { private final SchemaManager schemaManager; - public NonHistoricSchemas(SchemaManager schemaManager) { + private final SchemaSyncService schemaSyncService; + + public NonHistoricSchemas(SchemaManager schemaManager, SchemaSyncService schemaSyncService) { this.schemaManager = schemaManager; + this.schemaSyncService = schemaSyncService; } @Override public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) { - return completedFuture(null); + return schemaSyncService.waitForMetadataCompleteness(ts); } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java index aaaf4f47c0..b062860842 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; /** * Implements Schema Synchronization wait logic as defined in IEP-98. */ +@SuppressWarnings("InterfaceMayBeAnnotatedFunctional") public interface SchemaSyncService { /** * Waits till metadata (like table/index schemas) is complete for the given timestamp. 
The 'complete' here means diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 68b824d48a..eedada14c4 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; @@ -134,6 +135,10 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest; import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException; import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException; @@ -166,7 +171,6 @@ import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.lang.ErrorGroups.Transactions; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterNodeImpl; import org.apache.ignite.network.MessagingService; @@ -212,6 +216,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private static final int TABLE_ID = 1; + private static final int ANOTHER_TABLE_ID = 2; + private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>(); /** The storage stores partition data. */ @@ -576,18 +582,41 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { public void testReadOnlySingleRowReplicaRequestEmptyResult() throws Exception { BinaryRow testBinaryKey = nextBinaryKey(); - CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() - .groupId(grpId) - .readTimestampLong(clock.nowLong()) - .primaryKey(testBinaryKey.tupleSlice()) - .requestType(RequestType.RO_GET) - .build(), localNode.id()); + ByteBuffer pk = testBinaryKey.tupleSlice(); + + CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(pk); BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result(); assertNull(binaryRow); } + private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer pk) { + return doReadOnlySingleGet(pk, clock.now()); + } + + private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer pk, HybridTimestamp readTimestamp) { + ReadOnlySingleRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() + 
.groupId(grpId) + .readTimestampLong(readTimestamp.longValue()) + .primaryKey(pk) + .requestType(RequestType.RO_GET) + .build(); + + return partitionReplicaListener.invoke(request, localNode.id()); + } + + private CompletableFuture<ReplicaResult> doReadOnlyDirectSingleGet(ByteBuffer pk) { + ReadOnlyDirectSingleRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest() + .groupId(grpId) + .primaryKey(pk) + .requestType(RequestType.RO_GET) + .enlistmentConsistencyToken(1L) + .build(); + + return partitionReplicaListener.invoke(request, localNode.id()); + } + @Test public void testReadOnlySingleRowReplicaRequestCommittedResult() throws Exception { UUID txId = newTxId(); @@ -599,12 +628,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); testMvPartitionStorage.commitWrite(rowId, clock.now()); - CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() - .groupId(grpId) - .readTimestampLong(clock.nowLong()) - .primaryKey(testBinaryKey.tupleSlice()) - .requestType(RequestType.RO_GET) - .build(), localNode.id()); + CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice()); BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result(); @@ -623,12 +647,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, localNode.id(), clock.now())); - CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() - .groupId(grpId) - .readTimestampLong(clock.nowLong()) - .primaryKey(testBinaryKey.tupleSlice()) - .requestType(RequestType.RO_GET) - .build(), localNode.id()); + CompletableFuture<ReplicaResult> fut 
= doReadOnlySingleGet(testBinaryKey.tupleSlice()); BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result(); @@ -646,12 +665,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), null)); - CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() - .groupId(grpId) - .readTimestampLong(clock.nowLong()) - .primaryKey(testBinaryKey.tupleSlice()) - .requestType(RequestType.RO_GET) - .build(), localNode.id()); + CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice()); BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result(); @@ -670,12 +684,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, localNode.id(), null)); - CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() - .groupId(grpId) - .readTimestampLong(clock.nowLong()) - .primaryKey(testBinaryKey.tupleSlice()) - .requestType(RequestType.RO_GET) - .build(), localNode.id()); + CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice()); BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result(); @@ -981,11 +990,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @Test - public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() throws MarshallerException { + public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() { UUID txId = newTxId(); BinaryRow testRow = binaryRow(0); - BinaryRow testRowPk = kvMarshaller.marshal(new TestKey(0, "k0")); + BinaryRow testRowPk = marshalQuietly(new 
TestKey(0, "k0"), kvMarshaller); assertThat(doSingleRowRequest(txId, testRow, RequestType.RW_INSERT), willCompleteSuccessfully()); @@ -1032,8 +1041,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { cleanup(txId); } + private static <K, V> Row marshalQuietly(K key, KvMarshaller<K, V> marshaller) { + try { + return marshaller.marshal(key); + } catch (MarshallerException e) { + throw new RuntimeException(e); + } + } + @Test - public void testWriteIntentOnPrimaryReplicaMultiRowOps() throws MarshallerException { + public void testWriteIntentOnPrimaryReplicaMultiRowOps() { UUID txId = newTxId(); BinaryRow row0 = binaryRow(0); BinaryRow row1 = binaryRow(1); @@ -1056,8 +1073,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { checkRowInMvStorage(newRow1, true); Collection<BinaryRow> newRowPks = List.of( - kvMarshaller.marshal(new TestKey(0, "k0")), - kvMarshaller.marshal(new TestKey(1, "k1")) + marshalQuietly(new TestKey(0, "k0"), kvMarshaller), + marshalQuietly(new TestKey(1, "k1"), kvMarshaller) ); assertThat(doMultiRowPkRequest(txId, newRowPks, RequestType.RW_DELETE_ALL), willCompleteSuccessfully()); @@ -1099,6 +1116,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) { + return doSingleRowPkRequest(txId, binaryRow, requestType, false); + } + + private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) { return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest() .groupId(grpId) .transactionId(txId) @@ -1106,6 +1127,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .primaryKey(binaryRow.tupleSlice()) .term(1L) .commitPartitionId(commitPartitionId()) + .full(full) .build(), localNode.id() ); @@ -1306,14 +1328,14 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest { @Values(booleans = {false, true}) boolean upsertAfterDelete, @Values(booleans = {false, true}) boolean committed, @Values(booleans = {false, true}) boolean multiple - ) throws MarshallerException { + ) { BinaryRow br1 = binaryRow(1); - BinaryRow br1Pk = kvMarshaller.marshal(new TestKey(1, "k" + 1)); + BinaryRow br1Pk = marshalQuietly(new TestKey(1, "k" + 1), kvMarshaller); BinaryRow br2 = binaryRow(2); - BinaryRow br2Pk = kvMarshaller.marshal(new TestKey(2, "k" + 2)); + BinaryRow br2Pk = marshalQuietly(new TestKey(2, "k" + 2), kvMarshaller); if (insertFirst) { UUID tx0 = newTxId(); @@ -1369,7 +1391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { List<BinaryRow> expected = committed ? (upsertAfterDelete ? allRows : allRowsButModified) : (insertFirst ? allRows : singletonList((BinaryRow) null)); - List<BinaryRow> res = roGetAll(allRowsPks, clock.nowLong()); + List<BinaryRow> res = roGetAll(allRowsPks, clock.now()); assertEquals(allRows.size(), res.size()); @@ -1556,13 +1578,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private BinaryRow marshalKeyOrKeyValue(RequestType requestType, TestKey key) { try { - return RequestTypes.isKeyOnly(requestType) ? kvMarshaller.marshal(key) : kvMarshaller.marshal(key, someValue); + return RequestTypes.isKeyOnly(requestType) ? 
marshalQuietly(key, kvMarshaller) : kvMarshaller.marshal(key, someValue); } catch (MarshallerException e) { throw new AssertionError(e); } } - private void testFailsWhenReadingFromFutureIncompatibleSchema(ListenerInvocation listenerInvocation) { + private void testFailsWhenReadingFromFutureIncompatibleSchema(RwListenerInvocation listenerInvocation) { UUID targetTxId = transactionIdFor(clock.now()); TestKey key = simulateWriteWithSchemaVersionFromFuture(); @@ -1700,13 +1722,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @Test - public void failsWhenFullScanReadsTupleWithIncompatibleSchemaFromFuture() { + public void failsWhenScanReadsTupleWithIncompatibleSchemaFromFuture() { testFailsWhenReadingFromFutureIncompatibleSchema( - (targetTxId, key) -> doRwFullScanRetrieveBatchRequest(targetTxId, false) + (targetTxId, key) -> doRwScanRetrieveBatchRequest(targetTxId) ); } - private CompletableFuture<?> doRwFullScanRetrieveBatchRequest(UUID targetTxId, boolean full) { + private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId) { return partitionReplicaListener.invoke( TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(grpId) @@ -1714,7 +1736,32 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .term(1L) .scanId(1) .batchSize(100) - .full(full) + .full(false) + .build(), + localNode.id() + ); + } + + private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) { + return partitionReplicaListener.invoke( + TABLE_MESSAGES_FACTORY.readWriteScanCloseReplicaRequest() + .groupId(grpId) + .transactionId(targetTxId) + .term(1L) + .scanId(1) + .build(), + localNode.id() + ); + } + + private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId, HybridTimestamp readTimestamp) { + return partitionReplicaListener.invoke( + TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() + .groupId(grpId) + .transactionId(targetTxId) + .scanId(1) + .batchSize(100) + 
.readTimestampLong(readTimestamp.longValue()) .build(), localNode.id() ); @@ -1742,7 +1789,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .map(Arguments::of); } - private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, ListenerInvocation listenerInvocation) { + private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) { TestKey key = nextKey(); if (RequestTypes.looksUpFirst(requestType)) { @@ -1755,16 +1802,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { when(catalogService.activeCatalogVersion(anyLong())).thenReturn(42); UUID targetTxId = newTxId(); - HybridTimestamp beginTs = TransactionIds.beginTimestamp(targetTxId); CompletableFuture<?> future = listenerInvocation.invoke(targetTxId, key); assertThat(future, willCompleteSuccessfully()); - // Make sure metadata completeness is awaited for (at operation timestamp, that is, later than beginTs). - //noinspection ConstantConditions - verify(schemaSyncService).waitForMetadataCompleteness(gt(beginTs)); - // Make sure catalog required version is filled in the executed update command. 
verify(mockRaftClient, atLeast(1)).run(commandCaptor.capture()); @@ -1822,21 +1864,21 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { boolean onExistingRow, boolean full ) { - ListenerInvocation invocation = null; + RwListenerInvocation invocation = null; if (RequestTypes.isSingleRowRwPkOnly(requestType)) { invocation = (targetTxId, key) -> { - return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType); + return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); }; } else if (RequestTypes.isSingleRowRwFullRow(requestType)) { invocation = (targetTxId, key) -> { - return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType); + return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); }; } else { fail("Uncovered type: " + requestType); } - testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation); + testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation); } @SuppressWarnings("unused") @@ -1851,10 +1893,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .filter(RequestTypes::isSingleRowRw); } - private void testRwOperationsFailIfTableAlteredAfterTxStart( + private void testRwOperationFailsIfTableWasAlteredAfterTxStart( RequestType requestType, boolean onExistingRow, - ListenerInvocation listenerInvocation + RwListenerInvocation listenerInvocation ) { TestKey key = nextKey(); @@ -1865,13 +1907,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { UUID txId = newTxId(); HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId); - CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class); - CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class); - when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION); - 
when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION); - - when(catalogService.table(TABLE_ID, txBeginTs.longValue())).thenReturn(tableVersion1); - when(catalogService.table(eq(TABLE_ID), gt(txBeginTs.longValue()))).thenReturn(tableVersion2); + makeSchemaChangeAfter(txBeginTs); CompletableFuture<?> future = listenerInvocation.invoke(txId, key); @@ -1894,12 +1930,22 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } } + private void makeSchemaChangeAfter(HybridTimestamp txBeginTs) { + CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class); + CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class); + when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION); + when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION); + + when(catalogService.table(TABLE_ID, txBeginTs.longValue())).thenReturn(tableVersion1); + when(catalogService.table(eq(TABLE_ID), gt(txBeginTs.longValue()))).thenReturn(tableVersion2); + } + @CartesianTest @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory") void multiRowRwOperationsFailIfTableAlteredAfterTxStart( RequestType requestType, boolean onExistingRow, boolean full ) { - ListenerInvocation invocation = null; + RwListenerInvocation invocation = null; if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) { invocation = (targetTxId, key) -> { @@ -1913,7 +1959,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { fail("Uncovered type: " + requestType); } - testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation); + testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation); } @SuppressWarnings("unused") @@ -1933,7 +1979,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @Values(booleans = {false, true}) boolean onExistingRow, @Values(booleans = {false, true}) boolean full ) { - 
testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> { + testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> { return doReplaceRequest( targetTxId, marshalKeyOrKeyValue(RequestType.RW_REPLACE, key), @@ -1944,15 +1990,232 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest - void rwScanRequestFailsIfTableAlteredAfterTxStart( + void rwScanRequestFailsIfTableAlteredAfterTxStart(@Values(booleans = {false, true}) boolean onExistingRow) { + testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> { + return doRwScanRetrieveBatchRequest(targetTxId); + }); + } + + @Test + void rwScanCloseRequestSucceedsIfTableAlteredAfterTxStart() { + UUID txId = newTxId(); + HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId); + + makeSchemaChangeAfter(txBeginTs); + + CompletableFuture<?> future = doRwScanCloseRequest(txId); + + assertThat(future, willCompleteSuccessfully()); + } + + @CartesianTest + @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory") + void singleRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) { + RwListenerInvocation invocation = null; + + if (RequestTypes.isSingleRowRwPkOnly(requestType)) { + invocation = (targetTxId, key) -> { + return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); + }; + } else if (RequestTypes.isSingleRowRwFullRow(requestType)) { + invocation = (targetTxId, key) -> { + return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); + }; + } else { + fail("Uncovered type: " + requestType); + } + + testRwOperationFailsIfTableWasDropped(onExistingRow, invocation); + } + + private void testRwOperationFailsIfTableWasDropped(boolean onExistingRow, RwListenerInvocation listenerInvocation) { + TestKey key = 
nextKey(); + + if (onExistingRow) { + upsertInNewTxFor(key); + } + + UUID txId = newTxId(); + HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId); + + makeTableBeDroppedAfter(txBeginTs); + + CompletableFuture<?> future = listenerInvocation.invoke(txId, key); + + IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class); + assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR)); + assertThat(ex.getMessage(), is("Table was dropped [table=1]")); + } + + private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs) { + makeTableBeDroppedAfter(txBeginTs, TABLE_ID); + } + + private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs, int tableId) { + CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class); + when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION); + + when(catalogService.table(tableId, txBeginTs.longValue())).thenReturn(tableVersion1); + when(catalogService.table(eq(tableId), gt(txBeginTs.longValue()))).thenReturn(null); + } + + @CartesianTest + @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory") + void multiRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) { + RwListenerInvocation invocation = null; + + if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) { + invocation = (targetTxId, key) -> { + return doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); + }; + } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) { + invocation = (targetTxId, key) -> { + return doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); + }; + } else { + fail("Uncovered type: " + requestType); + } + + testRwOperationFailsIfTableWasDropped(onExistingRow, invocation); + } + + @CartesianTest + void replaceRequestFailsIfTableWasDropped( @Values(booleans = {false, true}) boolean onExistingRow, @Values(booleans = {false, 
true}) boolean full ) { - testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> { - return doRwFullScanRetrieveBatchRequest(targetTxId, full); + testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> { + return doReplaceRequest( + targetTxId, + marshalKeyOrKeyValue(RequestType.RW_REPLACE, key), + marshalKeyOrKeyValue(RequestType.RW_REPLACE, key), + full + ); + }); + } + + @CartesianTest + void rwScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) { + testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> { + return doRwScanRetrieveBatchRequest(targetTxId); + }); + } + + @Test + void rwScanCloseRequestSucceedsIfTableWasDropped() { + UUID txId = newTxId(); + HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId); + + makeTableBeDroppedAfter(txBeginTs); + + CompletableFuture<?> future = doRwScanCloseRequest(txId); + + assertThat(future, willCompleteSuccessfully()); + } + + @CartesianTest + void singleRowRoGetFailsIfTableWasDropped( + @Values(booleans = {false, true}) boolean direct, + @Values(booleans = {false, true}) boolean onExistingRow + ) { + testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { + if (direct) { + return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller).tupleSlice()); + } else { + return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller).tupleSlice(), readTimestamp); + } + }); + } + + private void testRoOperationFailsIfTableWasDropped(boolean onExistingRow, RoListenerInvocation listenerInvocation) { + TestKey key = nextKey(); + + if (onExistingRow) { + upsertInNewTxFor(key); + } + + UUID txId = newTxId(); + HybridTimestamp readTs = clock.now(); + + when(catalogService.table(eq(TABLE_ID), anyLong())).thenReturn(null); + + CompletableFuture<?> future = listenerInvocation.invoke(txId, readTs, key); + + IncompatibleSchemaException ex = assertWillThrowFast(future, 
IncompatibleSchemaException.class); + assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR)); + assertThat(ex.getMessage(), is("Table was dropped [table=1]")); + } + + @CartesianTest + void multiRowRoGetFailsIfTableWasDropped( + @Values(booleans = {false, true}) boolean direct, + @Values(booleans = {false, true}) boolean onExistingRow + ) { + testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { + if (direct) { + return doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, kvMarshaller))); + } else { + return doReadOnlyMultiGet(List.of(marshalQuietly(key, kvMarshaller)), readTimestamp); + } }); } + @CartesianTest + void roScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) { + testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { + return doRoScanRetrieveBatchRequest(targetTxId, readTimestamp); + }); + } + + @Test + void commitRequestFailsIfCommitPartitionTableWasDropped() { + testCommitRequestIfTableWasDropped(grpId, Set.of(grpId), grpId.tableId()); + } + + @Test + void commitRequestFailsIfNonCommitPartitionTableWasDropped() { + TablePartitionId anotherPartitionId = new TablePartitionId(ANOTHER_TABLE_ID, 0); + + testCommitRequestIfTableWasDropped(grpId, Set.of(grpId, anotherPartitionId), anotherPartitionId.tableId()); + } + + private void testCommitRequestIfTableWasDropped( + TablePartitionId commitPartitionId, + Set<ReplicationGroupId> groups, + int tableToBeDroppedId + ) { + when(schemas.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class))) + .thenReturn(List.of( + tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col"))) + )); + when(txManager.cleanup(any(), any(), any(), anyBoolean(), any())).thenReturn(completedFuture(null)); + + AtomicReference<Boolean> committed = interceptFinishTxCommand(); + + UUID txId = newTxId(); + HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId); + + 
makeTableBeDroppedAfter(txBeginTs, tableToBeDroppedId); + + CompletableFuture<?> future = partitionReplicaListener.invoke( + TX_MESSAGES_FACTORY.txFinishReplicaRequest() + .groupId(commitPartitionId) + .groups(groups) + .txId(txId) + .term(1L) + .commit(true) + .commitTimestampLong(clock.nowLong()) + .build(), + localNode.id() + ); + + IncompatibleSchemaAbortException ex = assertWillThrowFast(future, IncompatibleSchemaAbortException.class); + assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR)); + assertThat(ex.getMessage(), is("Commit failed because a table was already dropped [tableId=" + tableToBeDroppedId + "]")); + + assertThat("The transaction must have been aborted", committed.get(), is(false)); + } + private UUID newTxId() { return transactionIdFor(clock.now()); } @@ -1996,19 +2259,36 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { return (BinaryRow) future.join().result(); } - private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, long readTimestamp) { - CompletableFuture<ReplicaResult> future = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest() - .groupId(grpId) - .requestType(RequestType.RO_GET_ALL) - .readTimestampLong(readTimestamp) - .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList())) - .build(), - localNode.id() - ); + private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) { + CompletableFuture<ReplicaResult> future = doReadOnlyMultiGet(rows, readTimestamp); + + assertThat(future, willCompleteSuccessfully()); return (List<BinaryRow>) future.join().result(); } + private CompletableFuture<ReplicaResult> doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) { + ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest() + .groupId(grpId) + .requestType(RequestType.RO_GET_ALL) + .readTimestampLong(readTimestamp.longValue()) + 
.primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList())) + .build(); + + return partitionReplicaListener.invoke(request, localNode.id()); + } + + private CompletableFuture<ReplicaResult> doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) { + ReadOnlyDirectMultiRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest() + .groupId(grpId) + .requestType(RequestType.RO_GET_ALL) + .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList())) + .enlistmentConsistencyToken(1L) + .build(); + + return partitionReplicaListener.invoke(request, localNode.id()); + } + private void cleanup(UUID txId) { HybridTimestamp commitTs = clock.now(); @@ -2046,11 +2326,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } private BinaryRow nextBinaryKey() { - try { - return kvMarshaller.marshal(nextKey()); - } catch (MarshallerException e) { - throw new IgniteException(e); - } + return marshalQuietly(nextKey(), kvMarshaller); } private static TestKey nextKey() { @@ -2085,14 +2361,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } } - private TestValue value(BinaryRow binaryRow) { - try { - return kvMarshaller.unmarshalValue(Row.wrapBinaryRow(schemaDescriptor, binaryRow)); - } catch (MarshallerException e) { - throw new IgniteException(e); - } - } - private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) { return TABLE_MESSAGES_FACTORY.binaryRowMessage() .binaryTuple(binaryRow.tupleSlice()) @@ -2190,7 +2458,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @FunctionalInterface - private interface ListenerInvocation { + private interface RwListenerInvocation { CompletableFuture<?> invoke(UUID targetTxId, TestKey key); } + + @FunctionalInterface + private interface RoListenerInvocation { + CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key); + } } diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 359e9da571..cd3fd04aa8 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.tx; import static java.lang.Math.abs; -import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.replicator.ReplicaManager.idleSafeTimePropagationPeriodMs;