This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dc637b77e7 IGNITE-21591 Add tuple updrades during index backfill 
process (#3280)
dc637b77e7 is described below

commit dc637b77e715fb20f1595e004feec85a08db36e2
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Feb 27 09:32:22 2024 +0300

    IGNITE-21591 Add tuple updrades during index backfill process (#3280)
---
 .../internal/index/ItBuildIndexOneNodeTest.java    |  87 +++++++++++++-
 .../apache/ignite/internal/index/IndexManager.java |   2 +-
 .../ignite/internal/schema/BinaryRowUpgrader.java  |  61 ++++++++++
 .../internal/schema/BinaryRowUpgraderTest.java     | 133 +++++++++++++++++++++
 .../ReplicasSafeTimePropagationTest.java           |   4 +-
 .../internal/table/distributed/TableManager.java   |   9 +-
 .../table/distributed/raft/PartitionListener.java  |  50 +++++++-
 .../replicator/PartitionReplicaListener.java       |   1 +
 .../raft/PartitionCommandListenerTest.java         |  22 +++-
 .../apache/ignite/distributed/ItTxTestCluster.java |   7 +-
 .../table/impl/DummyInternalTableImpl.java         |   3 +-
 11 files changed, 356 insertions(+), 23 deletions(-)

diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index 4873a7210b..8cf4826abd 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -42,8 +43,11 @@ import 
org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParamete
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /** Integration test for testing the building of an index in a single node 
cluster. */
@@ -113,7 +117,7 @@ public class ItBuildIndexOneNodeTest extends 
BaseSqlIntegrationTest {
 
         // Now let's check the data itself.
         assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME))
-                .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
INDEX_NAME))
                 .returns(0, "0", 10.0)
                 .check();
     }
@@ -146,7 +150,7 @@ public class ItBuildIndexOneNodeTest extends 
BaseSqlIntegrationTest {
 
         // Now let's check the data itself.
         assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME))
-                .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
INDEX_NAME))
                 .returnRowCount(nextPersonId.get())
                 .check();
     }
@@ -186,7 +190,7 @@ public class ItBuildIndexOneNodeTest extends 
BaseSqlIntegrationTest {
 
         // Now let's check the data itself.
         QueryChecker queryChecker = assertQuery(format("SELECT NAME FROM {} 
WHERE salary > 0.0 ORDER BY ID ASC", TABLE_NAME))
-                .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
INDEX_NAME))
                 .ordered();
 
         int updatedRowCount = updateIntoTableFuture.join();
@@ -232,11 +236,78 @@ public class ItBuildIndexOneNodeTest extends 
BaseSqlIntegrationTest {
 
         // Now let's check the data itself.
         assertQuery(format("SELECT NAME FROM {} WHERE salary > 0.0", 
TABLE_NAME))
-                .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
INDEX_NAME))
                 .returnRowCount(nextPersonId.get() - 
deleteFromTableFuture.join())
                 .check();
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606";)
+    @Test
+    void testBuildingIndexWithUpdateSchema() throws Exception {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
+
+        insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+        sql(format("ALTER TABLE {} ADD COLUMN SURNAME VARCHAR DEFAULT 'foo'", 
TABLE_NAME));
+
+        String indexName0 = INDEX_NAME + 0;
+        String indexName1 = INDEX_NAME + 1;
+
+        createIndex(TABLE_NAME, indexName0, "SALARY");
+        createIndex(TABLE_NAME, indexName1, "SURNAME");
+
+        awaitIndexesBecomeAvailable(node(), indexName0);
+        awaitIndexesBecomeAvailable(node(), indexName1);
+
+        // Hack so that we can wait for the index to be added to the sql 
planner.
+        waitForReadTimestampThatObservesMostRecentCatalog();
+
+        assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
indexName0))
+                .returns(0, "0", 10.0, "foo")
+                .check();
+
+        assertQuery(format("SELECT * FROM {} WHERE SURNAME = 'foo'", 
TABLE_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
indexName1))
+                .returns(0, "0", 10.0, "foo")
+                .check();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606";)
+    @Test
+    void testBuildingIndexWithUpdateSchemaAfterCreateIndex() throws Exception {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
+
+        insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+        String columName = "SURNAME";
+
+        sql(format("ALTER TABLE {} ADD COLUMN {} VARCHAR DEFAULT 'foo'", 
TABLE_NAME, columName));
+
+        // Hack to prevent the index from going into status BUILDING until we 
update the default value for the column.
+        Transaction rwTx = node().transactions().begin(new 
TransactionOptions().readOnly(false));
+
+        try {
+            setAwaitIndexAvailability(false);
+
+            createIndex(TABLE_NAME, INDEX_NAME, columName);
+
+            sql(format("ALTER TABLE {} ALTER COLUMN {} SET DEFAULT 'bar'", 
TABLE_NAME, columName));
+        } finally {
+            setAwaitIndexAvailability(true);
+            rwTx.commit();
+        }
+
+        // Hack so that we can wait for the index to be added to the sql 
planner.
+        awaitIndexesBecomeAvailable(node(), INDEX_NAME);
+        waitForReadTimestampThatObservesMostRecentCatalog();
+
+        assertQuery(format("SELECT * FROM {} WHERE SURNAME = 'foo'", 
TABLE_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, 
INDEX_NAME))
+                .returns(0, "0", 10.0, "foo")
+                .check();
+    }
+
     private static IgniteImpl node() {
         return CLUSTER.node(0);
     }
@@ -279,10 +350,14 @@ public class ItBuildIndexOneNodeTest extends 
BaseSqlIntegrationTest {
     }
 
     private static void createIndexForSalaryFieldAndWaitBecomeAvailable() 
throws Exception {
-        createIndex(TABLE_NAME, INDEX_NAME, "SALARY");
+        createIndexAndWaitBecomeAvailable(INDEX_NAME, "SALARY");
+    }
+
+    private static void createIndexAndWaitBecomeAvailable(String indexName, 
String columnName) throws Exception {
+        createIndex(TABLE_NAME, indexName, columnName);
 
         // Hack so that we can wait for the index to be added to the sql 
planner.
-        awaitIndexesBecomeAvailable(node(), INDEX_NAME);
+        awaitIndexesBecomeAvailable(node(), indexName);
         waitForReadTimestampThatObservesMostRecentCatalog();
     }
 
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 360b2a4a75..192a121f97 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -286,7 +286,7 @@ public class IndexManager implements IgniteComponent {
             for (int i = 0; i < indexedColumns.length; i++) {
                 Column column = descriptor.column(indexedColumns[i]);
 
-                assert column != null : indexedColumns[i];
+                assert column != null : "schemaVersion=" + 
descriptor.version() + ", column=" + indexedColumns[i];
 
                 result[i] = column.schemaIndex();
             }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowUpgrader.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowUpgrader.java
new file mode 100644
index 0000000000..ad3ad4ad2e
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowUpgrader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+
+/** {@link BinaryRow} upgrader to the required schema version. */
+public class BinaryRowUpgrader {
+    private final SchemaRegistry schemaRegistry;
+
+    private final SchemaDescriptor targetSchema;
+
+    /** Constructor. */
+    public BinaryRowUpgrader(SchemaRegistry schemaRegistry, int 
targetSchemaVersion) {
+        this(schemaRegistry, schemaRegistry.schema(targetSchemaVersion));
+    }
+
+    /** Constructor. */
+    public BinaryRowUpgrader(SchemaRegistry schemaRegistry, SchemaDescriptor 
targetSchema) {
+        this.schemaRegistry = schemaRegistry;
+        this.targetSchema = targetSchema;
+    }
+
+    /**
+     * Returns an upgraded {@link BinaryRow} to the required schema version, 
or the {@code source} if its schema version is greater than or
+     * equal to to the required schema version.
+     *
+     * @param source Source binary row.
+     */
+    public BinaryRow upgrade(BinaryRow source) {
+        if (source.schemaVersion() >= targetSchema.version()) {
+            return source;
+        }
+
+        Row upgradedRow = schemaRegistry.resolve(source, targetSchema);
+
+        var rowAssembler = new RowAssembler(targetSchema, -1);
+
+        for (int i = 0; i < targetSchema.length(); i++) {
+            rowAssembler.appendValue(upgradedRow.value(i));
+        }
+
+        return new BinaryRowImpl(targetSchema.version(), 
rowAssembler.build().tupleSlice());
+    }
+}
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowUpgraderTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowUpgraderTest.java
new file mode 100644
index 0000000000..0fbeebf2de
--- /dev/null
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowUpgraderTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
+import static org.apache.ignite.internal.type.NativeTypes.INT32;
+import static org.apache.ignite.internal.type.NativeTypes.INT64;
+import static org.apache.ignite.internal.type.NativeTypes.STRING;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.sameInstance;
+
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/** For {@link BinaryRowUpgrader} testing. */
+public class BinaryRowUpgraderTest {
+    private static final SchemaDescriptor ORIGINAL_SCHEMA = new 
SchemaDescriptor(
+            1,
+            new Column[]{new Column("k", INT32, false)},
+            new Column[]{new Column("v", INT32, true)}
+    );
+
+    private static final SchemaDescriptor NULLABLE_COLUMN_ADDED = new 
SchemaDescriptor(
+            2,
+            new Column[]{new Column("k", INT32, false)},
+            new Column[]{
+                    new Column("v", INT32, true),
+                    new Column("vLong", INT64, true)
+            }
+    );
+
+    private static final SchemaDescriptor DEFAULTED_COLUMN_ADDED = new 
SchemaDescriptor(
+            3,
+            new Column[]{new Column("k", INT32, false)},
+            new Column[]{
+                    new Column("v", INT32, true),
+                    new Column("vLong", INT64, true),
+                    new Column("s", STRING, true, 
DefaultValueProvider.constantProvider("foo"))
+            }
+    );
+
+    private static final SchemaDescriptor DEFAULT_COLUMN_CHANGED = new 
SchemaDescriptor(
+            4,
+            new Column[]{new Column("k", INT32, false)},
+            new Column[]{
+                    new Column("v", INT32, true),
+                    new Column("vLong", INT64, true),
+                    new Column("s", STRING, true, 
DefaultValueProvider.constantProvider("bar"))
+            }
+    );
+
+    private static final SchemaRegistry SCHEMA_REGISTRY = new 
SchemaRegistryImpl(schemaVersion -> {
+        if (ORIGINAL_SCHEMA.version() == schemaVersion) {
+            return ORIGINAL_SCHEMA;
+        } else if (NULLABLE_COLUMN_ADDED.version() == schemaVersion) {
+            return NULLABLE_COLUMN_ADDED;
+        } else if (DEFAULTED_COLUMN_ADDED.version() == schemaVersion) {
+            return DEFAULTED_COLUMN_ADDED;
+        } else if (DEFAULT_COLUMN_CHANGED.version() == schemaVersion) {
+            return DEFAULT_COLUMN_CHANGED;
+        }
+
+        return null;
+    }, ORIGINAL_SCHEMA);
+
+    @BeforeAll
+    static void beforeAll() {
+        
NULLABLE_COLUMN_ADDED.columnMapping(SchemaUtils.columnMapper(ORIGINAL_SCHEMA, 
NULLABLE_COLUMN_ADDED));
+        
DEFAULTED_COLUMN_ADDED.columnMapping(SchemaUtils.columnMapper(NULLABLE_COLUMN_ADDED,
 DEFAULTED_COLUMN_ADDED));
+        
DEFAULT_COLUMN_CHANGED.columnMapping(SchemaUtils.columnMapper(DEFAULTED_COLUMN_ADDED,
 DEFAULT_COLUMN_CHANGED));
+    }
+
+    @Test
+    void testNoUpgradeRow() {
+        BinaryRow source = binaryRow(ORIGINAL_SCHEMA, 1, 2);
+
+        assertThat(upgradeRow(ORIGINAL_SCHEMA, source), sameInstance(source));
+    }
+
+    @Test
+    void testUpgradeRow() {
+        BinaryRow source = binaryRow(ORIGINAL_SCHEMA, 1, 2);
+        BinaryRow expected = binaryRow(NULLABLE_COLUMN_ADDED, 1, 2, null);
+
+        assertThat(upgradeRow(NULLABLE_COLUMN_ADDED, source), 
equalToRow(expected));
+    }
+
+    @Test
+    void testUpgradeRowWithStringDefault() {
+        BinaryRow source = binaryRow(NULLABLE_COLUMN_ADDED, 1, 2, 10L);
+        BinaryRow expected = binaryRow(DEFAULTED_COLUMN_ADDED, 1, 2, 10L, 
"foo");
+
+        assertThat(upgradeRow(DEFAULTED_COLUMN_ADDED, source), 
equalToRow(expected));
+    }
+
+    @Test
+    void testDontDowngradeSourceBinaryRow() {
+        BinaryRow source = binaryRow(DEFAULT_COLUMN_CHANGED, 1, 2, 10L, "bar");
+
+        assertThat(upgradeRow(DEFAULTED_COLUMN_ADDED, source), 
sameInstance(source));
+    }
+
+    private static BinaryRow binaryRow(SchemaDescriptor schema, Object... 
values) {
+        var rowAssembler = new RowAssembler(schema, -1);
+
+        for (Object value : values) {
+            rowAssembler.appendValue(value);
+        }
+
+        return new BinaryRowImpl(schema.version(), 
rowAssembler.build().tupleSlice());
+    }
+
+    private static BinaryRow upgradeRow(SchemaDescriptor targetSchema, 
BinaryRow source) {
+        return new BinaryRowUpgrader(SCHEMA_REGISTRY, 
targetSchema).upgrade(source);
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index ed0768455f..8086c595d7 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -282,7 +283,8 @@ public class ReplicasSafeTimePropagationTest extends 
IgniteAbstractTest {
                                     mock(TxStateStorage.class),
                                     mock(PendingComparableValuesTracker.class),
                                     mock(PendingComparableValuesTracker.class),
-                                    mock(CatalogService.class)
+                                    mock(CatalogService.class),
+                                    mock(SchemaRegistry.class)
                             ),
                             RaftGroupEventsListener.noopLsnr,
                             RaftGroupOptions.defaults()
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2a5a7b4c72..f64a71559c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -873,7 +873,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                             newConfiguration,
                             safeTimeTracker,
                             storageIndexTracker,
-                            internalTbl,
+                            table,
                             partitionStorages.getTxStateStorage(),
                             partitionDataStorage,
                             partitionUpdateHandlers
@@ -1915,11 +1915,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             PeersAndLearners stableConfiguration,
             PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
-            InternalTable internalTable,
+            TableImpl table,
             TxStateStorage txStatePartitionStorage,
             PartitionDataStorage partitionDataStorage,
             PartitionUpdateHandlers partitionUpdateHandlers
     ) throws NodeStoppingException {
+        InternalTable internalTable = table.internalTable();
+
         RaftGroupOptions groupOptions = groupOptionsForPartition(
                 internalTable.storage(),
                 internalTable.txStateStorage(),
@@ -1934,7 +1936,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 txStatePartitionStorage,
                 safeTimeTracker,
                 storageIndexTracker,
-                catalogService
+                catalogService,
+                table.schemaView()
         );
 
         RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index b3d81f7552..69bd2e2cd9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -40,6 +40,8 @@ import java.util.function.Consumer;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.SafeTimeReorderException;
@@ -54,6 +56,10 @@ import 
org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowUpgrader;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
@@ -108,6 +114,8 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
     private final CatalogService catalogService;
 
+    private final SchemaRegistry schemaRegistry;
+
     /**
      * The constructor.
      *
@@ -124,7 +132,8 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
             TxStateStorage txStateStorage,
             PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
-            CatalogService catalogService
+            CatalogService catalogService,
+            SchemaRegistry schemaRegistry
     ) {
         this.txManager = txManager;
         this.storage = partitionDataStorage;
@@ -133,6 +142,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         this.safeTime = safeTime;
         this.storageIndexTracker = storageIndexTracker;
         this.catalogService = catalogService;
+        this.schemaRegistry = schemaRegistry;
     }
 
     @Override
@@ -517,13 +527,20 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
         BuildIndexRowVersionChooser rowVersionChooser = 
createBuildIndexRowVersionChooser(cmd);
 
+        BinaryRowUpgrader binaryRowUpgrader = createBinaryRowUpgrader(cmd);
+
         storage.runConsistently(locker -> {
             List<UUID> rowUuids = new ArrayList<>(cmd.rowIds());
 
             // Natural UUID order matches RowId order within the same 
partition.
             Collections.sort(rowUuids);
 
-            Stream<BinaryRowAndRowId> buildIndexRowStream = 
createBuildIndexRowStream(rowUuids, locker, rowVersionChooser);
+            Stream<BinaryRowAndRowId> buildIndexRowStream = 
createBuildIndexRowStream(
+                    rowUuids,
+                    locker,
+                    rowVersionChooser,
+                    binaryRowUpgrader
+            );
 
             RowId nextRowIdToBuild = cmd.finish() ? null : 
toRowId(requireNonNull(last(rowUuids))).increment();
 
@@ -576,13 +593,15 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
     private Stream<BinaryRowAndRowId> createBuildIndexRowStream(
             List<UUID> rowUuids,
             Locker locker,
-            BuildIndexRowVersionChooser rowVersionChooser
+            BuildIndexRowVersionChooser rowVersionChooser,
+            BinaryRowUpgrader binaryRowUpgrader
     ) {
         return rowUuids.stream()
                 .map(this::toRowId)
                 .peek(locker::lock)
                 .map(rowVersionChooser::chooseForBuildIndex)
-                .flatMap(Collection::stream);
+                .flatMap(Collection::stream)
+                .map(binaryRowAndRowId -> upgradeBinaryRow(binaryRowUpgrader, 
binaryRowAndRowId));
     }
 
     private RowId toRowId(UUID rowUuid) {
@@ -621,4 +640,27 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
         return new BuildIndexRowVersionChooser(storage, 
indexCreationCatalog.time(), startBuildingIndexCatalog.time());
     }
+
+    private BinaryRowUpgrader createBinaryRowUpgrader(BuildIndexCommand 
command) {
+        int indexCreationCatalogVersion = command.creationCatalogVersion();
+
+        CatalogIndexDescriptor indexDescriptor = 
catalogService.index(command.indexId(), indexCreationCatalogVersion);
+
+        assert indexDescriptor != null : "indexId=" + command.indexId() + ", 
catalogVersion=" + indexCreationCatalogVersion;
+
+        CatalogTableDescriptor tableDescriptor = 
catalogService.table(indexDescriptor.tableId(), indexCreationCatalogVersion);
+
+        assert tableDescriptor != null : "tableId=" + 
indexDescriptor.tableId() + ", catalogVersion=" + indexCreationCatalogVersion;
+
+        SchemaDescriptor schema = 
schemaRegistry.schema(tableDescriptor.tableVersion());
+
+        return new BinaryRowUpgrader(schemaRegistry, schema);
+    }
+
+    private static BinaryRowAndRowId upgradeBinaryRow(BinaryRowUpgrader 
upgrader, BinaryRowAndRowId source) {
+        BinaryRow sourceBinaryRow = source.binaryRow();
+        BinaryRow upgradedBinaryRow = upgrader.upgrade(sourceBinaryRow);
+
+        return upgradedBinaryRow == sourceBinaryRow ? source : new 
BinaryRowAndRowId(upgradedBinaryRow, source.rowId());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index d9b5fbbea9..02bd232158 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1355,6 +1355,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return {@code true} if index row matches the binary row, {@code false} 
otherwise.
      */
     private static boolean indexRowMatches(IndexRow indexRow, BinaryRow 
binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
+        // TODO: IGNITE-21606 It is necessary to upgrade the tuple to the 
required schema version
         BinaryTuple actualIndexRow = 
schemaAwareIndexStorage.indexRowResolver().extractColumns(binaryRow);
 
         return 
indexRow.indexColumns().byteBuffer().equals(actualIndexRow.byteBuffer());
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index bd534b2861..656b8d4326 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -59,6 +59,7 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -79,6 +80,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
@@ -101,6 +103,7 @@ import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCom
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -142,6 +145,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
             new Column[]{new Column("value", NativeTypes.INT32, false)}
     );
 
+    private static final SchemaRegistry SCHEMA_REGISTRY = new 
DummySchemaManagerImpl(SCHEMA);
+
     private PartitionListener commandListener;
 
     private final AtomicLong raftIndex = new AtomicLong();
@@ -216,12 +221,19 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
         Catalog catalog = mock(Catalog.class);
 
+        lenient().when(catalog.index(indexId)).thenReturn(indexDescriptor);
+        lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
+
         indexDescriptor = mock(CatalogIndexDescriptor.class);
 
         lenient().when(indexDescriptor.id()).thenReturn(indexId);
-        lenient().when(catalog.index(indexId)).thenReturn(indexDescriptor);
         lenient().when(catalogService.indexes(anyInt(), 
anyInt())).thenReturn(List.of(indexDescriptor));
-        lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
+        lenient().when(catalogService.index(anyInt(), 
anyInt())).thenReturn(indexDescriptor);
+
+        CatalogTableDescriptor tableDescriptor = 
mock(CatalogTableDescriptor.class);
+
+        
lenient().when(tableDescriptor.tableVersion()).thenReturn(SCHEMA.version());
+        lenient().when(catalogService.table(anyInt(), 
anyInt())).thenReturn(tableDescriptor);
 
         commandListener = new PartitionListener(
                 mock(TxManager.class),
@@ -230,7 +242,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 txStateStorage,
                 safeTimeTracker,
                 new PendingComparableValuesTracker<>(0L),
-                catalogService
+                catalogService,
+                SCHEMA_REGISTRY
         );
     }
 
@@ -330,7 +343,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 txStateStorage,
                 safeTimeTracker,
                 new PendingComparableValuesTracker<>(0L),
-                catalogService
+                catalogService,
+                SCHEMA_REGISTRY
         );
 
         txStateStorage.lastApplied(3L, 1L);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e7dae8485a..e4c6da5eed 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -572,6 +572,8 @@ public class ItTxTestCluster {
                         new RaftGroupEventsClientListener()
                 );
 
+                DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
+
                 PartitionListener partitionListener = new PartitionListener(
                         txManagers.get(assignment),
                         partitionDataStorage,
@@ -579,7 +581,8 @@ public class ItTxTestCluster {
                         txStateStorage,
                         safeTime,
                         storageIndexTracker,
-                        catalogService
+                        catalogService,
+                        schemaManager
                 );
 
                 CompletableFuture<Void> partitionReadyFuture = 
raftServers.get(assignment).startRaftGroupNode(
@@ -591,8 +594,6 @@ public class ItTxTestCluster {
                 ).thenAccept(
                         raftSvc -> {
                             try {
-                                DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
-
                                 PartitionReplicaListener listener = 
newReplicaListener(
                                         mvPartStorage,
                                         raftSvc,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index d1f46cba88..1ae5e6cc71 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -403,7 +403,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 safeTime,
                 new PendingComparableValuesTracker<>(0L),
-                catalogService
+                catalogService,
+                schemaManager
         );
     }
 


Reply via email to