This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new dc637b77e7 IGNITE-21591 Add tuple updrades during index backfill process (#3280) dc637b77e7 is described below commit dc637b77e715fb20f1595e004feec85a08db36e2 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Feb 27 09:32:22 2024 +0300 IGNITE-21591 Add tuple updrades during index backfill process (#3280) --- .../internal/index/ItBuildIndexOneNodeTest.java | 87 +++++++++++++- .../apache/ignite/internal/index/IndexManager.java | 2 +- .../ignite/internal/schema/BinaryRowUpgrader.java | 61 ++++++++++ .../internal/schema/BinaryRowUpgraderTest.java | 133 +++++++++++++++++++++ .../ReplicasSafeTimePropagationTest.java | 4 +- .../internal/table/distributed/TableManager.java | 9 +- .../table/distributed/raft/PartitionListener.java | 50 +++++++- .../replicator/PartitionReplicaListener.java | 1 + .../raft/PartitionCommandListenerTest.java | 22 +++- .../apache/ignite/distributed/ItTxTestCluster.java | 7 +- .../table/impl/DummyInternalTableImpl.java | 3 +- 11 files changed, 356 insertions(+), 23 deletions(-) diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java index 4873a7210b..8cf4826abd 100644 --- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java +++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.index; import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; @@ -42,8 +43,11 @@ import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParamete import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; import org.apache.ignite.internal.sql.engine.util.QueryChecker; import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionOptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** Integration test for testing the building of an index in a single node cluster. */ @@ -113,7 +117,7 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest { // Now let's check the data itself. assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME)) - .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME)) .returns(0, "0", 10.0) .check(); } @@ -146,7 +150,7 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest { // Now let's check the data itself. assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME)) - .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME)) .returnRowCount(nextPersonId.get()) .check(); } @@ -186,7 +190,7 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest { // Now let's check the data itself. QueryChecker queryChecker = assertQuery(format("SELECT NAME FROM {} WHERE salary > 0.0 ORDER BY ID ASC", TABLE_NAME)) - .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME)) .ordered(); int updatedRowCount = updateIntoTableFuture.join(); @@ -232,11 +236,78 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest { // Now let's check the data itself. assertQuery(format("SELECT NAME FROM {} WHERE salary > 0.0", TABLE_NAME)) - .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME)) .returnRowCount(nextPersonId.get() - deleteFromTableFuture.join()) .check(); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606") + @Test + void testBuildingIndexWithUpdateSchema() throws Exception { + createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1); + + insertPeople(TABLE_NAME, new Person(0, "0", 10.0)); + + sql(format("ALTER TABLE {} ADD COLUMN SURNAME VARCHAR DEFAULT 'foo'", TABLE_NAME)); + + String indexName0 = INDEX_NAME + 0; + String indexName1 = INDEX_NAME + 1; + + createIndex(TABLE_NAME, indexName0, "SALARY"); + createIndex(TABLE_NAME, indexName1, "SURNAME"); + + awaitIndexesBecomeAvailable(node(), indexName0); + awaitIndexesBecomeAvailable(node(), indexName1); + + // Hack so that we can wait for the index to be added to the sql planner. + waitForReadTimestampThatObservesMostRecentCatalog(); + + assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, indexName0)) + .returns(0, "0", 10.0, "foo") + .check(); + + assertQuery(format("SELECT * FROM {} WHERE SURNAME = 'foo'", TABLE_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, indexName1)) + .returns(0, "0", 10.0, "foo") + .check(); + } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606") + @Test + void testBuildingIndexWithUpdateSchemaAfterCreateIndex() throws Exception { + createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1); + + insertPeople(TABLE_NAME, new Person(0, "0", 10.0)); + + String columName = "SURNAME"; + + sql(format("ALTER TABLE {} ADD COLUMN {} VARCHAR DEFAULT 'foo'", TABLE_NAME, columName)); + + // Hack to prevent the index from going into status BUILDING until we update the default value for the column. + Transaction rwTx = node().transactions().begin(new TransactionOptions().readOnly(false)); + + try { + setAwaitIndexAvailability(false); + + createIndex(TABLE_NAME, INDEX_NAME, columName); + + sql(format("ALTER TABLE {} ALTER COLUMN {} SET DEFAULT 'bar'", TABLE_NAME, columName)); + } finally { + setAwaitIndexAvailability(true); + rwTx.commit(); + } + + // Hack so that we can wait for the index to be added to the sql planner. + awaitIndexesBecomeAvailable(node(), INDEX_NAME); + waitForReadTimestampThatObservesMostRecentCatalog(); + + assertQuery(format("SELECT * FROM {} WHERE SURNAME = 'foo'", TABLE_NAME)) + .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME)) + .returns(0, "0", 10.0, "foo") + .check(); + } + private static IgniteImpl node() { return CLUSTER.node(0); } @@ -279,10 +350,14 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest { } private static void createIndexForSalaryFieldAndWaitBecomeAvailable() throws Exception { - createIndex(TABLE_NAME, INDEX_NAME, "SALARY"); + createIndexAndWaitBecomeAvailable(INDEX_NAME, "SALARY"); + } + + private static void createIndexAndWaitBecomeAvailable(String indexName, String columnName) throws Exception { + createIndex(TABLE_NAME, indexName, columnName); // Hack so that we can wait for the index to be added to the sql planner. - awaitIndexesBecomeAvailable(node(), INDEX_NAME); + awaitIndexesBecomeAvailable(node(), indexName); waitForReadTimestampThatObservesMostRecentCatalog(); } diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 360b2a4a75..192a121f97 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -286,7 +286,7 @@ public class IndexManager implements IgniteComponent { for (int i = 0; i < indexedColumns.length; i++) { Column column = descriptor.column(indexedColumns[i]); - assert column != null : indexedColumns[i]; + assert column != null : "schemaVersion=" + descriptor.version() + ", column=" + indexedColumns[i]; result[i] = column.schemaIndex(); } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowUpgrader.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowUpgrader.java new file mode 100644 index 0000000000..ad3ad4ad2e --- /dev/null +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowUpgrader.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schema; + +import org.apache.ignite.internal.schema.row.Row; +import org.apache.ignite.internal.schema.row.RowAssembler; + +/** {@link BinaryRow} upgrader to the required schema version. */ +public class BinaryRowUpgrader { + private final SchemaRegistry schemaRegistry; + + private final SchemaDescriptor targetSchema; + + /** Constructor. */ + public BinaryRowUpgrader(SchemaRegistry schemaRegistry, int targetSchemaVersion) { + this(schemaRegistry, schemaRegistry.schema(targetSchemaVersion)); + } + + /** Constructor. */ + public BinaryRowUpgrader(SchemaRegistry schemaRegistry, SchemaDescriptor targetSchema) { + this.schemaRegistry = schemaRegistry; + this.targetSchema = targetSchema; + } + + /** + * Returns an upgraded {@link BinaryRow} to the required schema version, or the {@code source} if its schema version is greater than or + * equal to to the required schema version. + * + * @param source Source binary row. + */ + public BinaryRow upgrade(BinaryRow source) { + if (source.schemaVersion() >= targetSchema.version()) { + return source; + } + + Row upgradedRow = schemaRegistry.resolve(source, targetSchema); + + var rowAssembler = new RowAssembler(targetSchema, -1); + + for (int i = 0; i < targetSchema.length(); i++) { + rowAssembler.appendValue(upgradedRow.value(i)); + } + + return new BinaryRowImpl(targetSchema.version(), rowAssembler.build().tupleSlice()); + } +} diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowUpgraderTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowUpgraderTest.java new file mode 100644 index 0000000000..0fbeebf2de --- /dev/null +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowUpgraderTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schema; + +import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow; +import static org.apache.ignite.internal.type.NativeTypes.INT32; +import static org.apache.ignite.internal.type.NativeTypes.INT64; +import static org.apache.ignite.internal.type.NativeTypes.STRING; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.sameInstance; + +import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl; +import org.apache.ignite.internal.schema.row.RowAssembler; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** For {@link BinaryRowUpgrader} testing. */ +public class BinaryRowUpgraderTest { + private static final SchemaDescriptor ORIGINAL_SCHEMA = new SchemaDescriptor( + 1, + new Column[]{new Column("k", INT32, false)}, + new Column[]{new Column("v", INT32, true)} + ); + + private static final SchemaDescriptor NULLABLE_COLUMN_ADDED = new SchemaDescriptor( + 2, + new Column[]{new Column("k", INT32, false)}, + new Column[]{ + new Column("v", INT32, true), + new Column("vLong", INT64, true) + } + ); + + private static final SchemaDescriptor DEFAULTED_COLUMN_ADDED = new SchemaDescriptor( + 3, + new Column[]{new Column("k", INT32, false)}, + new Column[]{ + new Column("v", INT32, true), + new Column("vLong", INT64, true), + new Column("s", STRING, true, DefaultValueProvider.constantProvider("foo")) + } + ); + + private static final SchemaDescriptor DEFAULT_COLUMN_CHANGED = new SchemaDescriptor( + 4, + new Column[]{new Column("k", INT32, false)}, + new Column[]{ + new Column("v", INT32, true), + new Column("vLong", INT64, true), + new Column("s", STRING, true, DefaultValueProvider.constantProvider("bar")) + } + ); + + private static final SchemaRegistry SCHEMA_REGISTRY = new SchemaRegistryImpl(schemaVersion -> { + if (ORIGINAL_SCHEMA.version() == schemaVersion) { + return ORIGINAL_SCHEMA; + } else if (NULLABLE_COLUMN_ADDED.version() == schemaVersion) { + return NULLABLE_COLUMN_ADDED; + } else if (DEFAULTED_COLUMN_ADDED.version() == schemaVersion) { + return DEFAULTED_COLUMN_ADDED; + } else if (DEFAULT_COLUMN_CHANGED.version() == schemaVersion) { + return DEFAULT_COLUMN_CHANGED; + } + + return null; + }, ORIGINAL_SCHEMA); + + @BeforeAll + static void beforeAll() { + NULLABLE_COLUMN_ADDED.columnMapping(SchemaUtils.columnMapper(ORIGINAL_SCHEMA, NULLABLE_COLUMN_ADDED)); + DEFAULTED_COLUMN_ADDED.columnMapping(SchemaUtils.columnMapper(NULLABLE_COLUMN_ADDED, DEFAULTED_COLUMN_ADDED)); + DEFAULT_COLUMN_CHANGED.columnMapping(SchemaUtils.columnMapper(DEFAULTED_COLUMN_ADDED, DEFAULT_COLUMN_CHANGED)); + } + + @Test + void testNoUpgradeRow() { + BinaryRow source = binaryRow(ORIGINAL_SCHEMA, 1, 2); + + assertThat(upgradeRow(ORIGINAL_SCHEMA, source), sameInstance(source)); + } + + @Test + void testUpgradeRow() { + BinaryRow source = binaryRow(ORIGINAL_SCHEMA, 1, 2); + BinaryRow expected = binaryRow(NULLABLE_COLUMN_ADDED, 1, 2, null); + + assertThat(upgradeRow(NULLABLE_COLUMN_ADDED, source), equalToRow(expected)); + } + + @Test + void testUpgradeRowWithStringDefault() { + BinaryRow source = binaryRow(NULLABLE_COLUMN_ADDED, 1, 2, 10L); + BinaryRow expected = binaryRow(DEFAULTED_COLUMN_ADDED, 1, 2, 10L, "foo"); + + assertThat(upgradeRow(DEFAULTED_COLUMN_ADDED, source), equalToRow(expected)); + } + + @Test + void testDontDowngradeSourceBinaryRow() { + BinaryRow source = binaryRow(DEFAULT_COLUMN_CHANGED, 1, 2, 10L, "bar"); + + assertThat(upgradeRow(DEFAULTED_COLUMN_ADDED, source), sameInstance(source)); + } + + private static BinaryRow binaryRow(SchemaDescriptor schema, Object... values) { + var rowAssembler = new RowAssembler(schema, -1); + + for (Object value : values) { + rowAssembler.appendValue(value); + } + + return new BinaryRowImpl(schema.version(), rowAssembler.build().tupleSlice()); + } + + private static BinaryRow upgradeRow(SchemaDescriptor targetSchema, BinaryRow source) { + return new BinaryRowUpgrader(SCHEMA_REGISTRY, targetSchema).upgrade(source); + } +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index ed0768455f..8086c595d7 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; @@ -282,7 +283,8 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { mock(TxStateStorage.class), mock(PendingComparableValuesTracker.class), mock(PendingComparableValuesTracker.class), - mock(CatalogService.class) + mock(CatalogService.class), + mock(SchemaRegistry.class) ), RaftGroupEventsListener.noopLsnr, RaftGroupOptions.defaults() diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 2a5a7b4c72..f64a71559c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -873,7 +873,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { newConfiguration, safeTimeTracker, storageIndexTracker, - internalTbl, + table, partitionStorages.getTxStateStorage(), partitionDataStorage, partitionUpdateHandlers @@ -1915,11 +1915,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { PeersAndLearners stableConfiguration, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, PendingComparableValuesTracker<Long, Void> storageIndexTracker, - InternalTable internalTable, + TableImpl table, TxStateStorage txStatePartitionStorage, PartitionDataStorage partitionDataStorage, PartitionUpdateHandlers partitionUpdateHandlers ) throws NodeStoppingException { + InternalTable internalTable = table.internalTable(); + RaftGroupOptions groupOptions = groupOptionsForPartition( internalTable.storage(), internalTable.txStateStorage(), @@ -1934,7 +1936,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { txStatePartitionStorage, safeTimeTracker, storageIndexTracker, - catalogService + catalogService, + table.schemaView() ); RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener( diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index b3d81f7552..69bd2e2cd9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -40,6 +40,8 @@ import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.SafeTimeReorderException; @@ -54,6 +56,10 @@ import org.apache.ignite.internal.raft.service.CommittedConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand; import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowUpgrader; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.Locker; @@ -108,6 +114,8 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler private final CatalogService catalogService; + private final SchemaRegistry schemaRegistry; + /** * The constructor. * @@ -124,7 +132,8 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler TxStateStorage txStateStorage, PendingComparableValuesTracker<HybridTimestamp, Void> safeTime, PendingComparableValuesTracker<Long, Void> storageIndexTracker, - CatalogService catalogService + CatalogService catalogService, + SchemaRegistry schemaRegistry ) { this.txManager = txManager; this.storage = partitionDataStorage; @@ -133,6 +142,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler this.safeTime = safeTime; this.storageIndexTracker = storageIndexTracker; this.catalogService = catalogService; + this.schemaRegistry = schemaRegistry; } @Override @@ -517,13 +527,20 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler BuildIndexRowVersionChooser rowVersionChooser = createBuildIndexRowVersionChooser(cmd); + BinaryRowUpgrader binaryRowUpgrader = createBinaryRowUpgrader(cmd); + storage.runConsistently(locker -> { List<UUID> rowUuids = new ArrayList<>(cmd.rowIds()); // Natural UUID order matches RowId order within the same partition. Collections.sort(rowUuids); - Stream<BinaryRowAndRowId> buildIndexRowStream = createBuildIndexRowStream(rowUuids, locker, rowVersionChooser); + Stream<BinaryRowAndRowId> buildIndexRowStream = createBuildIndexRowStream( + rowUuids, + locker, + rowVersionChooser, + binaryRowUpgrader + ); RowId nextRowIdToBuild = cmd.finish() ? null : toRowId(requireNonNull(last(rowUuids))).increment(); @@ -576,13 +593,15 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler private Stream<BinaryRowAndRowId> createBuildIndexRowStream( List<UUID> rowUuids, Locker locker, - BuildIndexRowVersionChooser rowVersionChooser + BuildIndexRowVersionChooser rowVersionChooser, + BinaryRowUpgrader binaryRowUpgrader ) { return rowUuids.stream() .map(this::toRowId) .peek(locker::lock) .map(rowVersionChooser::chooseForBuildIndex) - .flatMap(Collection::stream); + .flatMap(Collection::stream) + .map(binaryRowAndRowId -> upgradeBinaryRow(binaryRowUpgrader, binaryRowAndRowId)); } private RowId toRowId(UUID rowUuid) { @@ -621,4 +640,27 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler return new BuildIndexRowVersionChooser(storage, indexCreationCatalog.time(), startBuildingIndexCatalog.time()); } + + private BinaryRowUpgrader createBinaryRowUpgrader(BuildIndexCommand command) { + int indexCreationCatalogVersion = command.creationCatalogVersion(); + + CatalogIndexDescriptor indexDescriptor = catalogService.index(command.indexId(), indexCreationCatalogVersion); + + assert indexDescriptor != null : "indexId=" + command.indexId() + ", catalogVersion=" + indexCreationCatalogVersion; + + CatalogTableDescriptor tableDescriptor = catalogService.table(indexDescriptor.tableId(), indexCreationCatalogVersion); + + assert tableDescriptor != null : "tableId=" + indexDescriptor.tableId() + ", catalogVersion=" + indexCreationCatalogVersion; + + SchemaDescriptor schema = schemaRegistry.schema(tableDescriptor.tableVersion()); + + return new BinaryRowUpgrader(schemaRegistry, schema); + } + + private static BinaryRowAndRowId upgradeBinaryRow(BinaryRowUpgrader upgrader, BinaryRowAndRowId source) { + BinaryRow sourceBinaryRow = source.binaryRow(); + BinaryRow upgradedBinaryRow = upgrader.upgrade(sourceBinaryRow); + + return upgradedBinaryRow == sourceBinaryRow ? source : new BinaryRowAndRowId(upgradedBinaryRow, source.rowId()); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index d9b5fbbea9..02bd232158 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1355,6 +1355,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @return {@code true} if index row matches the binary row, {@code false} otherwise. */ private static boolean indexRowMatches(IndexRow indexRow, BinaryRow binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) { + // TODO: IGNITE-21606 It is necessary to upgrade the tuple to the required schema version BinaryTuple actualIndexRow = schemaAwareIndexStorage.indexRowResolver().extractColumns(binaryRow); return indexRow.indexColumns().byteBuffer().equals(actualIndexRow.byteBuffer()); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index bd534b2861..656b8d4326 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; @@ -79,6 +80,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.schema.row.RowAssembler; @@ -101,6 +103,7 @@ import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCom import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; +import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -142,6 +145,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { new Column[]{new Column("value", NativeTypes.INT32, false)} ); + private static final SchemaRegistry SCHEMA_REGISTRY = new DummySchemaManagerImpl(SCHEMA); + private PartitionListener commandListener; private final AtomicLong raftIndex = new AtomicLong(); @@ -216,12 +221,19 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { Catalog catalog = mock(Catalog.class); + lenient().when(catalog.index(indexId)).thenReturn(indexDescriptor); + lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog); + indexDescriptor = mock(CatalogIndexDescriptor.class); lenient().when(indexDescriptor.id()).thenReturn(indexId); - lenient().when(catalog.index(indexId)).thenReturn(indexDescriptor); lenient().when(catalogService.indexes(anyInt(), anyInt())).thenReturn(List.of(indexDescriptor)); - lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog); + lenient().when(catalogService.index(anyInt(), anyInt())).thenReturn(indexDescriptor); + + CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class); + + lenient().when(tableDescriptor.tableVersion()).thenReturn(SCHEMA.version()); + lenient().when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor); commandListener = new PartitionListener( mock(TxManager.class), @@ -230,7 +242,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { txStateStorage, safeTimeTracker, new PendingComparableValuesTracker<>(0L), - catalogService + catalogService, + SCHEMA_REGISTRY ); } @@ -330,7 +343,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { txStateStorage, safeTimeTracker, new PendingComparableValuesTracker<>(0L), - catalogService + catalogService, + SCHEMA_REGISTRY ); txStateStorage.lastApplied(3L, 1L); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index e7dae8485a..e4c6da5eed 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -572,6 +572,8 @@ public class ItTxTestCluster { new RaftGroupEventsClientListener() ); + DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor); + PartitionListener partitionListener = new PartitionListener( txManagers.get(assignment), partitionDataStorage, @@ -579,7 +581,8 @@ public class ItTxTestCluster { txStateStorage, safeTime, storageIndexTracker, - catalogService + catalogService, + schemaManager ); CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode( @@ -591,8 +594,6 @@ public class ItTxTestCluster { ).thenAccept( raftSvc -> { try { - DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor); - PartitionReplicaListener listener = newReplicaListener( mvPartStorage, raftSvc, diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index d1f46cba88..1ae5e6cc71 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -403,7 +403,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { txStateStorage().getOrCreateTxStateStorage(PART_ID), safeTime, new PendingComparableValuesTracker<>(0L), - catalogService + catalogService, + schemaManager ); }