rpuch commented on code in PR #3177: URL: https://github.com/apache/ignite-3/pull/3177#discussion_r1481523264
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static org.apache.ignite.internal.util.CollectionUtils.view; + +import java.util.List; +import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.tx.TransactionIds; + +/** Auxiliary class. */ +public class TableUtils { + /** + * Returns index IDs for the table of interest from the catalog for the active catalog version at the beginning timestamp of read-write + * transaction. 
+ * + * <p>NOTE: To avoid races and errors, it is important to call the this method after schema sync at beginTs or to be sure that the Review Comment: ```suggestion * <p>NOTE: To avoid races and errors, it is important to call this method after schema sync at beginTs or to be sure that the ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -2528,7 +2534,9 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand( true, null, null, - null); + null, + indexIdsAtRwTxBeginTs(catalogService, txId, tableId()) Review Comment: It looks like all `handleUpdate`/`handleUpdateAll` invocations get the same last argument: `indexIdsAtRwTxBeginTs(catalogService, txId, tableId())`. I suggest calculating this inside them and removing it as an argument from these 2 methods. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed; + +import static org.apache.ignite.internal.util.CollectionUtils.view; + +import java.util.List; +import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.tx.TransactionIds; + +/** Auxiliary class. */ Review Comment: It would be nice to have a better javadoc. ########## modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.index; + +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; +import static org.apache.ignite.internal.storage.impl.TestStorageEngine.ENGINE_NAME; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** Testing RW transactions and indexes. 
*/ +public class ItRwTransactionAndIndexesTest extends ClusterPerClassIntegrationTest { + private static final String ZONE_NAME = "TEST_ZONE"; + + private static final String TABLE_NAME = "TEST_TABLE"; + + private static final String INDEX_NAME = "TEST_INDEX"; + + private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME); + + private static final String COLUMN_NAME = "SALARY"; + + @Override + protected int initialNodes() { + return 1; + } + + @AfterEach + void tearDown() { + sql("DROP TABLE IF EXISTS " + TABLE_NAME); + sql("DROP ZONE IF EXISTS " + ZONE_NAME); + + CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages); + } + + @Test + void testCreateIndexInsideRwTransaction() { + TableImpl table = (TableImpl) createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1, ENGINE_NAME); + + setAwaitIndexAvailability(false); + dropBuildAllIndexMessages(); + + Transaction rwTx = beginRwTransaction(); + + createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME); + + IndexStorage pkIndexStorage = indexStorage(table, PK_INDEX_NAME); + IndexStorage newIndexStorage = indexStorage(table, INDEX_NAME); + + clearInvocations(pkIndexStorage, newIndexStorage); + + insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(1)); + + verify(pkIndexStorage).put(any()); + verify(newIndexStorage, never()).put(any()); + + assertDoesNotThrow(rwTx::commit); + } + + @Test + void testDropIndexInsideRwTransaction() { + TableImpl table = (TableImpl) createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1, ENGINE_NAME); + + createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME); + + Transaction rwTx = beginRwTransaction(); + + IndexStorage pkIndexStorage = indexStorage(table, PK_INDEX_NAME); + IndexStorage droppedIndexStorage = indexStorage(table, INDEX_NAME); + + clearInvocations(pkIndexStorage, droppedIndexStorage); + + dropIndex(INDEX_NAME); + + insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(0)); + + verify(pkIndexStorage).put(any()); + verify(droppedIndexStorage).put(any()); + + 
assertDoesNotThrow(rwTx::commit); + } + + private static IgniteImpl node() { + return CLUSTER.node(0); + } + + private static void dropBuildAllIndexMessages() { + node().dropMessages((s, networkMessage) -> networkMessage instanceof BuildIndexReplicaRequest); + } + + private static Transaction beginRwTransaction() { + Transaction tx = node().transactions().begin(); + + assertFalse(tx.isReadOnly()); + + return tx; + } + + private static Person newPerson(int id) { + return new Person(id, "person" + id, 100.0 + id); + } + + private static IndexStorage indexStorage(TableImpl table, String indexName) { + IndexStorage indexStorage = table.internalTable().storage().getIndex(0, indexId(indexName)); + + assertNotNull(indexStorage, indexName); + + return indexStorage; + } + + private static int indexId(String indexName) { + IgniteImpl node = node(); + + return TableTestUtils.getIndexIdStrict(node.catalogManager(), indexName, node.clock().nowLong()); + } +} Review Comment: Please add a trailing new line ########## modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.index; + +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; +import static org.apache.ignite.internal.storage.impl.TestStorageEngine.ENGINE_NAME; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** Testing RW transactions and indexes. 
*/ +public class ItRwTransactionAndIndexesTest extends ClusterPerClassIntegrationTest { + private static final String ZONE_NAME = "TEST_ZONE"; + + private static final String TABLE_NAME = "TEST_TABLE"; + + private static final String INDEX_NAME = "TEST_INDEX"; + + private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME); + + private static final String COLUMN_NAME = "SALARY"; + + @Override + protected int initialNodes() { + return 1; + } + + @AfterEach + void tearDown() { + sql("DROP TABLE IF EXISTS " + TABLE_NAME); + sql("DROP ZONE IF EXISTS " + ZONE_NAME); + + CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages); + } + + @Test + void testCreateIndexInsideRwTransaction() { Review Comment: This tests that an index (in some state, who knows in which one) created after a transaction had started will not be written to by this transaction. Don't we want to also verify that a transaction that starts on an index in each state will write to the index? Second test actually makes sure that a transaction started on some index state (we don't know on which) writes to the index, but, according to the name, it's about how we deal with dropping, so it's a bit different. ########## modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.index; + +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; +import static org.apache.ignite.internal.storage.impl.TestStorageEngine.ENGINE_NAME; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** Testing RW transactions and indexes. 
*/ +public class ItRwTransactionAndIndexesTest extends ClusterPerClassIntegrationTest { + private static final String ZONE_NAME = "TEST_ZONE"; + + private static final String TABLE_NAME = "TEST_TABLE"; + + private static final String INDEX_NAME = "TEST_INDEX"; + + private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME); + + private static final String COLUMN_NAME = "SALARY"; + + @Override + protected int initialNodes() { + return 1; + } + + @AfterEach + void tearDown() { + sql("DROP TABLE IF EXISTS " + TABLE_NAME); + sql("DROP ZONE IF EXISTS " + ZONE_NAME); + + CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages); + } + + @Test + void testCreateIndexInsideRwTransaction() { + TableImpl table = (TableImpl) createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1, ENGINE_NAME); + + setAwaitIndexAvailability(false); + dropBuildAllIndexMessages(); + + Transaction rwTx = beginRwTransaction(); + + createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME); + + IndexStorage pkIndexStorage = indexStorage(table, PK_INDEX_NAME); + IndexStorage newIndexStorage = indexStorage(table, INDEX_NAME); + + clearInvocations(pkIndexStorage, newIndexStorage); + + insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(1)); + + verify(pkIndexStorage).put(any()); + verify(newIndexStorage, never()).put(any()); + + assertDoesNotThrow(rwTx::commit); + } + + @Test + void testDropIndexInsideRwTransaction() { + TableImpl table = (TableImpl) createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1, ENGINE_NAME); + + createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME); + + Transaction rwTx = beginRwTransaction(); + + IndexStorage pkIndexStorage = indexStorage(table, PK_INDEX_NAME); + IndexStorage droppedIndexStorage = indexStorage(table, INDEX_NAME); + + clearInvocations(pkIndexStorage, droppedIndexStorage); + + dropIndex(INDEX_NAME); + + insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(0)); + + verify(pkIndexStorage).put(any()); + verify(droppedIndexStorage).put(any()); + + 
assertDoesNotThrow(rwTx::commit); + } + + private static IgniteImpl node() { + return CLUSTER.node(0); + } + + private static void dropBuildAllIndexMessages() { Review Comment: The name seems weird, how about `dropAnyBuildIndexMessage()`? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandler.java: ########## @@ -143,20 +154,58 @@ public void buildIndex(int indexId, Stream<BinaryRowAndRowId> rowStream, @Nullab index.storage().setNextRowIdToBuild(nextRowIdToBuild); } - /** - * Waits for indexes to be created. - */ + /** Waits for indexes to be created. */ // TODO: IGNITE-19513 Fix it, we should have already waited for the indexes to be created public void waitIndexes() { indexes.get(); } - /** - * Waits for the specific index to be created. - */ + /** Waits for the specific index to be created. */ public void waitForIndex(int indexId) { indexes.addIndexToWaitIfAbsent(indexId); waitIndexes(); } + + private Iterable<TableSchemaAwareIndexStorage> indexes(@Nullable List<Integer> indexIds) { + Map<Integer, TableSchemaAwareIndexStorage> indexStorageById = indexes.get(); + + assert !indexStorageById.isEmpty(); + + if (indexIds == null) { + return indexStorageById.values(); + } + + return mapIterable(indexIds, indexId -> { Review Comment: `mapIterable` seems to be a misleading name as it does map+filter... but it's not for this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
