This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f10ab88c85 IGNITE-20416 Retry when schema change is detected during implicit transaction (#3584) f10ab88c85 is described below commit f10ab88c85266990e29b64b713c2076be51e6dcd Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Apr 11 12:07:41 2024 +0400 IGNITE-20416 Retry when schema change is detected during implicit transaction (#3584) --- .../Table/SchemaSynchronizationTest.cs | 5 - .../ItSchemaSyncAndImplicitTransactionsTest.java | 290 +++++++++++++++++++++ .../schemasync/ItSchemaSyncSingleNodeTest.java | 8 +- .../ignite/internal/table/AbstractTableView.java | 29 ++- .../replicator/IncompatibleSchemaException.java | 3 +- 5 files changed, 319 insertions(+), 16 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs index 7bc9cb63c4..13e09dff51 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs @@ -339,7 +339,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase [Values(true, false)] bool insertNewColumn, [Values(true, false)] bool withRemove) { - using var metricListener = new MetricsTests.Listener(); await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (KEY bigint PRIMARY KEY)"); var table = await Client.Tables.GetTableAsync(TestTableName); @@ -369,10 +368,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase yield return DataStreamerItem.Create(GetTuple(i)); } - // Wait for background streaming to complete. - // TODO: Remove this workaround when IGNITE-20416 is fixed. - metricListener.AssertMetricGreaterOrEqual("streamer-items-sent", 6, 3000); - // Update schema. // New schema has a new column with a default value, so it is not required to provide it in the streamed data. await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN VAL varchar DEFAULT 'FOO'"); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java new file mode 100644 index 0000000000..061d5a0b20 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schemasync; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.internal.SessionUtils.executeUpdate; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.stream.IntStream; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.Tuple; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +/** + * Tests about Schema Sync interaction with implicit transactions. + */ +@SuppressWarnings("resource") +class ItSchemaSyncAndImplicitTransactionsTest extends ClusterPerClassIntegrationTest { + private static final int NODES_TO_START = 1; + + private static final String TABLE_NAME = "test"; + + private static final int ITERATIONS = 100; + + private static final int BATCH_SIZE = 10; + + @Override + protected int initialNodes() { + return NODES_TO_START; + } + + @BeforeEach + void createTable() { + executeUpdate( + "CREATE TABLE " + TABLE_NAME + " (id int, val varchar NOT NULL, PRIMARY KEY USING HASH (id))", + CLUSTER.aliveNode().sql() + ); + } + + @AfterEach + void dropTable() { + executeUpdate("DROP TABLE IF EXISTS " + TABLE_NAME, CLUSTER.aliveNode().sql()); + } + + private static void makeCompatibleChangeTo(String tableName) { + executeUpdate("ALTER TABLE " + tableName + " ALTER COLUMN val DROP NOT NULL", CLUSTER.aliveNode().sql()); + } + + @ParameterizedTest + @EnumSource(BinaryRecordViewOperation.class) + @DisplayName("Concurrent schema changes are transparent for implicit transactions via binary record views") + public void schemaChangesTransparencyForBinaryRecordView(BinaryRecordViewOperation operation) { + RecordView<Tuple> view = CLUSTER.aliveNode().tables().table(TABLE_NAME).recordView(); + + CompletableFuture<Void> operationsFuture = runAsync(() -> { + for (int i = 0; i < ITERATIONS; i++) { + operation.execute(i, view); + } + }); + + makeCompatibleChangeTo(TABLE_NAME); + + assertThat(operationsFuture, willCompleteSuccessfully()); + } + + @ParameterizedTest + @EnumSource(PlainRecordViewOperation.class) + @DisplayName("Concurrent schema changes are transparent for implicit transactions via plain record views") + public void schemaChangesTransparencyForPlainRecordView(PlainRecordViewOperation operation) { + RecordView<Record> view = CLUSTER.aliveNode().tables().table(TABLE_NAME).recordView(Record.class); + + CompletableFuture<Void> operationsFuture = runAsync(() -> { + for (int i = 0; i < ITERATIONS; i++) { + operation.execute(i, view); + } + }); + + makeCompatibleChangeTo(TABLE_NAME); + + assertThat(operationsFuture, willCompleteSuccessfully()); + } + + @ParameterizedTest + @EnumSource(BinaryKvViewOperation.class) + @DisplayName("Concurrent schema changes are transparent for implicit transactions via binary KV views") + public void schemaChangesTransparencyForBinaryKvView(BinaryKvViewOperation operation) { + KeyValueView<Tuple, Tuple> view = CLUSTER.aliveNode().tables().table(TABLE_NAME).keyValueView(); + + CompletableFuture<Void> operationsFuture = runAsync(() -> { + for (int i = 0; i < ITERATIONS; i++) { + operation.execute(i, view); + } + }); + + makeCompatibleChangeTo(TABLE_NAME); + + assertThat(operationsFuture, willCompleteSuccessfully()); + } + + @ParameterizedTest + @EnumSource(PlainKvViewOperation.class) + @DisplayName("Concurrent schema changes are transparent for implicit transactions via plain KV views") + public void schemaChangesTransparencyForPlainKvView(PlainKvViewOperation operation) { + KeyValueView<Integer, String> view = CLUSTER.aliveNode().tables().table(TABLE_NAME).keyValueView(Integer.class, String.class); + + CompletableFuture<Void> operationsFuture = runAsync(() -> { + for (int i = 0; i < ITERATIONS; i++) { + operation.execute(i, view); + } + }); + + makeCompatibleChangeTo(TABLE_NAME); + + assertThat(operationsFuture, willCompleteSuccessfully()); + } + + private static Tuple keyTuple(int id) { + return Tuple.create().set("id", id); + } + + private static List<Tuple> keyTuples(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .mapToObj(ItSchemaSyncAndImplicitTransactionsTest::keyTuple) + .collect(toList()); + } + + private enum BinaryRecordViewOperation { + SINGLE_READ((view, base) -> view.get(null, keyTuple(base))), + MULTI_PARTITION_READ((view, base) -> view.getAll(null, keyTuples(base))), + SINGLE_WRITE((view, base) -> view.upsert(null, fullTuple(base))), + MULTI_PARTITION_WRITE((view, base) -> view.upsertAll(null, fullTuples(base))); + + private final BiConsumer<RecordView<Tuple>, Integer> action; + + BinaryRecordViewOperation(BiConsumer<RecordView<Tuple>, Integer> action) { + this.action = action; + } + + void execute(int base, RecordView<Tuple> view) { + action.accept(view, base); + } + + private static Tuple fullTuple(int id) { + return Tuple.create().set("id", id).set("val", Integer.toString(id)); + } + + private static List<Tuple> fullTuples(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .mapToObj(BinaryRecordViewOperation::fullTuple) + .collect(toList()); + } + } + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + private static class Record { + private int id; + private @Nullable String val; + + private Record() { + } + + private Record(int id, @Nullable String val) { + this.id = id; + this.val = val; + } + } + + private enum PlainRecordViewOperation { + SINGLE_READ((view, base) -> view.get(null, keyRecord(base))), + MULTI_PARTITION_READ((view, base) -> view.getAll(null, keyRecords(base))), + SINGLE_WRITE((view, base) -> view.upsert(null, fullRecord(base))), + MULTI_PARTITION_WRITE((view, base) -> view.upsertAll(null, fullRecords(base))); + + private final BiConsumer<RecordView<Record>, Integer> action; + + PlainRecordViewOperation(BiConsumer<RecordView<Record>, Integer> action) { + this.action = action; + } + + void execute(int base, RecordView<Record> view) { + action.accept(view, base); + } + + private static Record keyRecord(int id) { + return new Record(id, null); + } + + private static List<Record> keyRecords(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .mapToObj(PlainRecordViewOperation::keyRecord) + .collect(toList()); + } + + private static Record fullRecord(int id) { + return new Record(id, Integer.toString(id)); + } + + private static List<Record> fullRecords(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .mapToObj(PlainRecordViewOperation::fullRecord) + .collect(toList()); + } + } + + private enum BinaryKvViewOperation { + SINGLE_READ((view, base) -> view.get(null, keyTuple(base))), + MULTI_PARTITION_READ((view, base) -> view.getAll(null, keyTuples(base))), + SINGLE_WRITE((view, base) -> view.put(null, keyTuple(base), valueTuple(base))), + MULTI_PARTITION_WRITE((view, base) -> view.putAll(null, mapOfTuples(base))); + + private final BiConsumer<KeyValueView<Tuple, Tuple>, Integer> action; + + BinaryKvViewOperation(BiConsumer<KeyValueView<Tuple, Tuple>, Integer> action) { + this.action = action; + } + + void execute(int base, KeyValueView<Tuple, Tuple> view) { + action.accept(view, base); + } + + private static Tuple valueTuple(int id) { + return Tuple.create().set("val", Integer.toString(id)); + } + + private static Map<Tuple, Tuple> mapOfTuples(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .boxed() + .collect(toMap(ItSchemaSyncAndImplicitTransactionsTest::keyTuple, BinaryKvViewOperation::valueTuple)); + } + } + + private enum PlainKvViewOperation { + SINGLE_READ((view, base) -> view.get(null, base)), + MULTI_PARTITION_READ((view, base) -> view.getAll(null, keys(base))), + SINGLE_WRITE((view, base) -> view.put(null, base, Integer.toString(base))), + MULTI_PARTITION_WRITE((view, base) -> view.putAll(null, mapToValues(base))); + + private final BiConsumer<KeyValueView<Integer, String>, Integer> action; + + PlainKvViewOperation(BiConsumer<KeyValueView<Integer, String>, Integer> action) { + this.action = action; + } + + void execute(int base, KeyValueView<Integer, String> view) { + action.accept(view, base); + } + + private static List<Integer> keys(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .boxed() + .collect(toList()); + } + + private static Map<Integer, String> mapToValues(int base) { + return IntStream.range(base, base + BATCH_SIZE) + .boxed() + .collect(toMap(identity(), n -> Integer.toString(n))); + } + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java index 206670eced..eef367cfcf 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java @@ -120,15 +120,11 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest { } private void createTable() { - cluster.doInSession(0, session -> { - executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int, val varchar, PRIMARY KEY USING HASH (id))", session); - }); + executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int, val varchar, PRIMARY KEY USING HASH (id))", node.sql()); } private void alterTable(String tableName) { - cluster.doInSession(0, session -> { - executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", session); - }); + executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", node.sql()); } private static void putPreExistingValueTo(Table table) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java index 9954734794..bfffdf5956 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.table.criteria.CursorAdapter; import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor; import org.apache.ignite.internal.table.criteria.SqlSerializer; import org.apache.ignite.internal.table.criteria.SqlSerializer.Builder; +import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException; import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.tx.InternalTransaction; @@ -161,21 +162,41 @@ abstract class AbstractTableView<R> implements CriteriaQuerySource<R> { return schemaVersionFuture .thenCompose(schemaVersion -> action.act(schemaVersion) .handle((res, ex) -> { + if (ex == null) { + return completedFuture(res); + } + if (isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) { + // There is no transaction, and table version was changed between taking the table version (that was used + // to marshal inputs and would be used to unmarshal outputs) and starting an implicit transaction + // in InternalTable. A transaction must always work with binary rows of the same table version matching the + // version corresponding to the transaction creation moment, so this mismatch is not tolerable: we need + // to retry the operation here. + assert tx == null : "Only for implicit transactions a retry might be requested"; - assert previousSchemaVersion == null || !Objects.equals(schemaVersion, previousSchemaVersion) - : "Same schema version (" + schemaVersion - + ") on a retry: something is wrong, is this caused by the test setup?"; + assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion); // Repeat. + return withSchemaSync(tx, schemaVersion, action); + } else if (tx == null && isOrCausedBy(IncompatibleSchemaException.class, ex)) { + // Table version was changed while we were executing an implicit transaction (between it had been created + // and the moment when the operation actually touched the partition), let's retry. + assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion); + return withSchemaSync(tx, schemaVersion, action); } else { - return ex == null ? completedFuture(res) : CompletableFuture.<T>failedFuture(ex); + return CompletableFuture.<T>failedFuture(ex); } })) .thenCompose(identity()); } + private static void assertSchemaVersionIncreased(@Nullable Integer previousSchemaVersion, Integer schemaVersion) { + assert previousSchemaVersion == null || !Objects.equals(schemaVersion, previousSchemaVersion) + : "Same schema version (" + schemaVersion + + ") on a retry: something is wrong, is this caused by the test setup?"; + } + /** * Map columns to it's names. * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java index f553110951..04480f784c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.replicator; +import org.apache.ignite.internal.replicator.exception.ExpectedReplicationException; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.tx.TransactionException; @@ -24,7 +25,7 @@ import org.apache.ignite.tx.TransactionException; * Thrown when, during an attempt to execute a transactional operation, it turns out that the operation cannot be executed * because an incompatible schema change has happened. */ -public class IncompatibleSchemaException extends TransactionException { +public class IncompatibleSchemaException extends TransactionException implements ExpectedReplicationException { public IncompatibleSchemaException(String message) { super(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR, message); }