This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f10ab88c85 IGNITE-20416 Retry when schema change is detected during 
implicit transaction (#3584)
f10ab88c85 is described below

commit f10ab88c85266990e29b64b713c2076be51e6dcd
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Apr 11 12:07:41 2024 +0400

    IGNITE-20416 Retry when schema change is detected during implicit 
transaction (#3584)
---
 .../Table/SchemaSynchronizationTest.cs             |   5 -
 .../ItSchemaSyncAndImplicitTransactionsTest.java   | 290 +++++++++++++++++++++
 .../schemasync/ItSchemaSyncSingleNodeTest.java     |   8 +-
 .../ignite/internal/table/AbstractTableView.java   |  29 ++-
 .../replicator/IncompatibleSchemaException.java    |   3 +-
 5 files changed, 319 insertions(+), 16 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 7bc9cb63c4..13e09dff51 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -339,7 +339,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
         [Values(true, false)] bool insertNewColumn,
         [Values(true, false)] bool withRemove)
     {
-        using var metricListener = new MetricsTests.Listener();
         await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} 
(KEY bigint PRIMARY KEY)");
 
         var table = await Client.Tables.GetTableAsync(TestTableName);
@@ -369,10 +368,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
                 yield return DataStreamerItem.Create(GetTuple(i));
             }
 
-            // Wait for background streaming to complete.
-            // TODO: Remove this workaround when IGNITE-20416 is fixed.
-            metricListener.AssertMetricGreaterOrEqual("streamer-items-sent", 
6, 3000);
-
             // Update schema.
             // New schema has a new column with a default value, so it is not 
required to provide it in the streamed data.
             await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} 
ADD COLUMN VAL varchar DEFAULT 'FOO'");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java
new file mode 100644
index 0000000000..061d5a0b20
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about Schema Sync interaction with implicit transactions.
+ */
+@SuppressWarnings("resource")
+class ItSchemaSyncAndImplicitTransactionsTest extends 
ClusterPerClassIntegrationTest {
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+
+    private static final int ITERATIONS = 100;
+
+    private static final int BATCH_SIZE = 10;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void createTable() {
+        executeUpdate(
+                "CREATE TABLE " + TABLE_NAME + " (id int, val varchar NOT 
NULL, PRIMARY KEY USING HASH (id))",
+                CLUSTER.aliveNode().sql()
+        );
+    }
+
+    @AfterEach
+    void dropTable() {
+        executeUpdate("DROP TABLE IF EXISTS " + TABLE_NAME, 
CLUSTER.aliveNode().sql());
+    }
+
+    private static void makeCompatibleChangeTo(String tableName) {
+        executeUpdate("ALTER TABLE " + tableName + " ALTER COLUMN val DROP NOT 
NULL", CLUSTER.aliveNode().sql());
+    }
+
+    @ParameterizedTest
+    @EnumSource(BinaryRecordViewOperation.class)
+    @DisplayName("Concurrent schema changes are transparent for implicit 
transactions via binary record views")
+    public void 
schemaChangesTransparencyForBinaryRecordView(BinaryRecordViewOperation 
operation) {
+        RecordView<Tuple> view = 
CLUSTER.aliveNode().tables().table(TABLE_NAME).recordView();
+
+        CompletableFuture<Void> operationsFuture = runAsync(() -> {
+            for (int i = 0; i < ITERATIONS; i++) {
+                operation.execute(i, view);
+            }
+        });
+
+        makeCompatibleChangeTo(TABLE_NAME);
+
+        assertThat(operationsFuture, willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @EnumSource(PlainRecordViewOperation.class)
+    @DisplayName("Concurrent schema changes are transparent for implicit 
transactions via plain record views")
+    public void 
schemaChangesTransparencyForPlainRecordView(PlainRecordViewOperation operation) 
{
+        RecordView<Record> view = 
CLUSTER.aliveNode().tables().table(TABLE_NAME).recordView(Record.class);
+
+        CompletableFuture<Void> operationsFuture = runAsync(() -> {
+            for (int i = 0; i < ITERATIONS; i++) {
+                operation.execute(i, view);
+            }
+        });
+
+        makeCompatibleChangeTo(TABLE_NAME);
+
+        assertThat(operationsFuture, willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @EnumSource(BinaryKvViewOperation.class)
+    @DisplayName("Concurrent schema changes are transparent for implicit 
transactions via binary KV views")
+    public void schemaChangesTransparencyForBinaryKvView(BinaryKvViewOperation 
operation) {
+        KeyValueView<Tuple, Tuple> view = 
CLUSTER.aliveNode().tables().table(TABLE_NAME).keyValueView();
+
+        CompletableFuture<Void> operationsFuture = runAsync(() -> {
+            for (int i = 0; i < ITERATIONS; i++) {
+                operation.execute(i, view);
+            }
+        });
+
+        makeCompatibleChangeTo(TABLE_NAME);
+
+        assertThat(operationsFuture, willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @EnumSource(PlainKvViewOperation.class)
+    @DisplayName("Concurrent schema changes are transparent for implicit 
transactions via plain KV views")
+    public void schemaChangesTransparencyForPlainKvView(PlainKvViewOperation 
operation) {
+        KeyValueView<Integer, String> view = 
CLUSTER.aliveNode().tables().table(TABLE_NAME).keyValueView(Integer.class, 
String.class);
+
+        CompletableFuture<Void> operationsFuture = runAsync(() -> {
+            for (int i = 0; i < ITERATIONS; i++) {
+                operation.execute(i, view);
+            }
+        });
+
+        makeCompatibleChangeTo(TABLE_NAME);
+
+        assertThat(operationsFuture, willCompleteSuccessfully());
+    }
+
+    private static Tuple keyTuple(int id) {
+        return Tuple.create().set("id", id);
+    }
+
+    private static List<Tuple> keyTuples(int base) {
+        return IntStream.range(base, base + BATCH_SIZE)
+                .mapToObj(ItSchemaSyncAndImplicitTransactionsTest::keyTuple)
+                .collect(toList());
+    }
+
+    private enum BinaryRecordViewOperation {
+        SINGLE_READ((view, base) -> view.get(null, keyTuple(base))),
+        MULTI_PARTITION_READ((view, base) -> view.getAll(null, 
keyTuples(base))),
+        SINGLE_WRITE((view, base) -> view.upsert(null, fullTuple(base))),
+        MULTI_PARTITION_WRITE((view, base) -> view.upsertAll(null, 
fullTuples(base)));
+
+        private final BiConsumer<RecordView<Tuple>, Integer> action;
+
+        BinaryRecordViewOperation(BiConsumer<RecordView<Tuple>, Integer> 
action) {
+            this.action = action;
+        }
+
+        void execute(int base, RecordView<Tuple> view) {
+            action.accept(view, base);
+        }
+
+        private static Tuple fullTuple(int id) {
+            return Tuple.create().set("id", id).set("val", 
Integer.toString(id));
+        }
+
+        private static List<Tuple> fullTuples(int base) {
+            return IntStream.range(base, base + BATCH_SIZE)
+                    .mapToObj(BinaryRecordViewOperation::fullTuple)
+                    .collect(toList());
+        }
+    }
+
+    @SuppressWarnings({"FieldCanBeLocal", "unused"})
+    private static class Record {
+        private int id;
+        private @Nullable String val;
+
+        private Record() {
+        }
+
+        private Record(int id, @Nullable String val) {
+            this.id = id;
+            this.val = val;
+        }
+    }
+
+    private enum PlainRecordViewOperation {
+        SINGLE_READ((view, base) -> view.get(null, keyRecord(base))),
+        MULTI_PARTITION_READ((view, base) -> view.getAll(null, 
keyRecords(base))),
+        SINGLE_WRITE((view, base) -> view.upsert(null, fullRecord(base))),
+        MULTI_PARTITION_WRITE((view, base) -> view.upsertAll(null, 
fullRecords(base)));
+
+        private final BiConsumer<RecordView<Record>, Integer> action;
+
+        PlainRecordViewOperation(BiConsumer<RecordView<Record>, Integer> 
action) {
+            this.action = action;
+        }
+
+        void execute(int base, RecordView<Record> view) {
+            action.accept(view, base);
+        }
+
+        private static Record keyRecord(int id) {
+            return new Record(id, null);
+        }
+
+        private static List<Record> keyRecords(int base) {
+            return IntStream.range(base, base + BATCH_SIZE)
+                    .mapToObj(PlainRecordViewOperation::keyRecord)
+                    .collect(toList());
+        }
+
+        private static Record fullRecord(int id) {
+            return new Record(id, Integer.toString(id));
+        }
+
+        private static List<Record> fullRecords(int base) {
+            return IntStream.range(base, base + BATCH_SIZE)
+                    .mapToObj(PlainRecordViewOperation::fullRecord)
+                    .collect(toList());
+        }
+    }
+
+    private enum BinaryKvViewOperation {
+        SINGLE_READ((view, base) -> view.get(null, keyTuple(base))),
+        MULTI_PARTITION_READ((view, base) -> view.getAll(null, 
keyTuples(base))),
+        SINGLE_WRITE((view, base) -> view.put(null, keyTuple(base), 
valueTuple(base))),
+        MULTI_PARTITION_WRITE((view, base) -> view.putAll(null, 
mapOfTuples(base)));
+
+        private final BiConsumer<KeyValueView<Tuple, Tuple>, Integer> action;
+
+        BinaryKvViewOperation(BiConsumer<KeyValueView<Tuple, Tuple>, Integer> 
action) {
+            this.action = action;
+        }
+
+        void execute(int base, KeyValueView<Tuple, Tuple> view) {
+            action.accept(view, base);
+        }
+
+        private static Tuple valueTuple(int id) {
+            return Tuple.create().set("val", Integer.toString(id));
+        }
+
+        private static Map<Tuple, Tuple> mapOfTuples(int base) {
+            return IntStream.range(base, base + BATCH_SIZE)
+                    .boxed()
+                    
.collect(toMap(ItSchemaSyncAndImplicitTransactionsTest::keyTuple, 
BinaryKvViewOperation::valueTuple));
+        }
+    }
+
+    private enum PlainKvViewOperation {
+        SINGLE_READ((view, base) -> view.get(null, base)),
+        MULTI_PARTITION_READ((view, base) -> view.getAll(null, keys(base))),
+        SINGLE_WRITE((view, base) -> view.put(null, base, 
Integer.toString(base))),
+        MULTI_PARTITION_WRITE((view, base) -> view.putAll(null, 
mapToValues(base)));
+
+        private final BiConsumer<KeyValueView<Integer, String>, Integer> 
action;
+
+        PlainKvViewOperation(BiConsumer<KeyValueView<Integer, String>, 
Integer> action) {
+            this.action = action;
+        }
+
+        void execute(int base, KeyValueView<Integer, String> view) {
+            action.accept(view, base);
+        }
+
+        private static List<Integer> keys(int base) {
+            return IntStream.range(base, base + BATCH_SIZE)
+                    .boxed()
+                    .collect(toList());
+        }
+
+        private static Map<Integer, String> mapToValues(int base) {
+            return IntStream.range(base, base + BATCH_SIZE)
+                    .boxed()
+                    .collect(toMap(identity(), n -> Integer.toString(n)));
+        }
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
index 206670eced..eef367cfcf 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
@@ -120,15 +120,11 @@ class ItSchemaSyncSingleNodeTest extends 
ClusterPerTestIntegrationTest {
     }
 
     private void createTable() {
-        cluster.doInSession(0, session -> {
-            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int, val 
varchar, PRIMARY KEY USING HASH (id))", session);
-        });
+        executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int, val varchar, 
PRIMARY KEY USING HASH (id))", node.sql());
     }
 
     private void alterTable(String tableName) {
-        cluster.doInSession(0, session -> {
-            executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added 
int", session);
-        });
+        executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", 
node.sql());
     }
 
     private static void putPreExistingValueTo(Table table) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 9954734794..bfffdf5956 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.table.criteria.CursorAdapter;
 import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
 import org.apache.ignite.internal.table.criteria.SqlSerializer;
 import org.apache.ignite.internal.table.criteria.SqlSerializer.Builder;
+import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
 import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -161,21 +162,41 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
         return schemaVersionFuture
                 .thenCompose(schemaVersion -> action.act(schemaVersion)
                         .handle((res, ex) -> {
+                            if (ex == null) {
+                                return completedFuture(res);
+                            }
+
                             if 
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
+                                // There is no transaction, and table version 
was changed between taking the table version (that was used
+                                // to marshal inputs and would be used to 
unmarshal outputs) and starting an implicit transaction
+                                // in InternalTable. A transaction must always 
work with binary rows of the same table version matching the
+                                // version corresponding to the transaction 
creation moment, so this mismatch is not tolerable: we need
+                                // to retry the operation here.
+
                                 assert tx == null : "Only for implicit 
transactions a retry might be requested";
-                                assert previousSchemaVersion == null || 
!Objects.equals(schemaVersion, previousSchemaVersion)
-                                        : "Same schema version (" + 
schemaVersion
-                                        + ") on a retry: something is wrong, 
is this caused by the test setup?";
+                                
assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion);
 
                                 // Repeat.
+                                return withSchemaSync(tx, schemaVersion, 
action);
+                            } else if (tx == null && 
isOrCausedBy(IncompatibleSchemaException.class, ex)) {
+                                // Table version was changed while we were 
executing an implicit transaction (between the moment it was created
+                                // and the moment when the operation actually 
touched the partition), let's retry.
+                                
assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion);
+
                                 return withSchemaSync(tx, schemaVersion, 
action);
                             } else {
-                                return ex == null ? completedFuture(res) : 
CompletableFuture.<T>failedFuture(ex);
+                                return CompletableFuture.<T>failedFuture(ex);
                             }
                         }))
                 .thenCompose(identity());
     }
 
+    private static void assertSchemaVersionIncreased(@Nullable Integer 
previousSchemaVersion, Integer schemaVersion) {
+        assert previousSchemaVersion == null || !Objects.equals(schemaVersion, 
previousSchemaVersion)
+                : "Same schema version (" + schemaVersion
+                + ") on a retry: something is wrong, is this caused by the 
test setup?";
+    }
+
     /**
      * Map columns to their names.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
index f553110951..04480f784c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
+import 
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.tx.TransactionException;
 
@@ -24,7 +25,7 @@ import org.apache.ignite.tx.TransactionException;
  * Thrown when, during an attempt to execute a transactional operation, it 
turns out that the operation cannot be executed
  * because an incompatible schema change has happened.
  */
-public class IncompatibleSchemaException extends TransactionException {
+public class IncompatibleSchemaException extends TransactionException 
implements ExpectedReplicationException {
     public IncompatibleSchemaException(String message) {
         super(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR, message);
     }

Reply via email to