This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new cd7d979901 IGNITE-18835 Get rid of skipping safe time waiting on a primary node. (#1688) cd7d979901 is described below commit cd7d97990125cb02b2401c6140ea98da3ca646e4 Author: Mirza Aliev <alievmi...@gmail.com> AuthorDate: Tue Mar 7 21:12:18 2023 +0400 IGNITE-18835 Get rid of skipping safe time waiting on a primary node. (#1688) --- .../ItRaftCommandLeftInLogUntilRestartTest.java | 7 +- .../internal/table/ItReadOnlyTransactionTest.java | 211 ++++++++ .../ignite/internal/table/ItRoReadsTest.java | 543 +++++++++++++++++++++ .../org/apache/ignite/internal/app/IgniteImpl.java | 10 + .../replicator/PartitionReplicaListener.java | 20 +- .../table/impl/DummyInternalTableImpl.java | 55 ++- 6 files changed, 835 insertions(+), 11 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java index 75688ae3e2..775a423dd7 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java @@ -32,7 +32,6 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; @@ -175,9 +174,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends ClusterPerClassInteg BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 42)); if (isNode0Leader) { 
- assertNull(table.internalTable().get(key, new HybridClockImpl().now(), node1.node()).get()); + assertNull(table.internalTable().get(key, node1.clock().now(), node1.node()).get()); } else { - assertNull(table.internalTable().get(key, new HybridClockImpl().now(), node0.node()).get()); + assertNull(table.internalTable().get(key, node1.clock().now(), node0.node()).get()); } var tx = node0.transactions().begin(); @@ -275,7 +274,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends ClusterPerClassInteg BinaryRowEx testKey = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("ID", row[0])); - BinaryRow readOnlyRow = table.internalTable().get(testKey, new HybridClockImpl().now(), ignite.node()).get(); + BinaryRow readOnlyRow = table.internalTable().get(testKey, ignite.clock().now(), ignite.node()).get(); assertNotNull(readOnlyRow); assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2)); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java new file mode 100644 index 0000000000..dfc77c4e62 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.row.Row; +import org.apache.ignite.internal.schema.row.RowAssembler; +import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Test reads with specific timestamp. + */ +public class ItReadOnlyTransactionTest extends AbstractBasicIntegrationTest { + /** Table name. */ + public static final String TABLE_NAME = "tbl"; + /** Gap in future to request a data. 
*/ + public static final int FUTURE_GAP = 200; + + @BeforeAll + public void beforeTestStart() { + sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR) WITH REPLICAS=" + nodes() + ", PARTITIONS=10"); + + Ignite ignite = CLUSTER_NODES.get(0); + + ignite.transactions().runInTransaction(tx -> { + for (int i = 0; i < 100; i++) { + sql(tx, "INSERT INTO " + TABLE_NAME + " VALUES (?, ?)", i, "str " + i); + } + + assertEquals(100, checkData(tx, id -> "str " + id)); + }); + + assertEquals(100, checkData(null, id -> "str " + id)); + } + + /** + * Check rows in the table {@link ItReadOnlyTransactionTest#TABLE_NAME}. + * + * @param tx Transaction. The parameter might be {@code null} for implicit transaction. + * @param valueMapper Function to map a primary key to a column. + * @return Count of rows in the table. + */ + private static int checkData(Transaction tx, Function<Integer, String> valueMapper) { + List<List<Object>> rows = sql(tx, "SELECT id, val FROM " + TABLE_NAME + " ORDER BY id"); + + for (List<Object> row : rows) { + var id = (Integer) row.get(0); + + assertEquals(valueMapper.apply(id), row.get(1)); + } + + return rows.size(); + } + + @Test + public void testFutureRead() throws Exception { + for (int i = 0; i < nodes(); i++) { + Ignite ignite = CLUSTER_NODES.get(i); + + InternalTable internalTable = ((TableImpl) ignite.tables().table(TABLE_NAME)).internalTable(); + SchemaDescriptor schema = ((TableImpl) ignite.tables().table(TABLE_NAME)).schemaView().schema(); + HybridClock clock = ((IgniteImpl) ignite).clock(); + + Collection<ClusterNode> nodes = ignite.clusterNodes(); + + for (ClusterNode clusterNode : nodes) { + CompletableFuture<BinaryRow> getFut = internalTable.get(createRowKey(schema, 100 + i), clock.now(), clusterNode); + + assertNull(getFut.join()); + } + + ArrayList<CompletableFuture<BinaryRow>> futs = new ArrayList<>(nodes.size()); + + long startTime = System.currentTimeMillis(); + + for (ClusterNode clusterNode : nodes) { + 
CompletableFuture<BinaryRow> getFut = internalTable.get( + createRowKey(schema, 100 + i), + new HybridTimestamp(clock.now().getPhysical() + FUTURE_GAP, 0), + clusterNode + ); + assertFalse(getFut.isDone()); + + futs.add(getFut); + } + + internalTable.insert(createRow(schema, 100 + i), null).get(); + + log.info("Delay to create a new data record [node={}, delay={}]", ignite.name(), (System.currentTimeMillis() - startTime)); + + assertTrue(System.currentTimeMillis() - startTime < FUTURE_GAP, + "Too long to execute [delay=" + (System.currentTimeMillis() - startTime) + ']'); + + for (var getFut : futs) { + assertNotNull(getFut.get(10, TimeUnit.SECONDS)); + } + } + + assertEquals(100 + nodes(), checkData(null, id -> id < 100 ? ("str " + id) : ("new str " + id))); + + Ignite ignite = CLUSTER_NODES.get(0); + + ignite.transactions().runInTransaction(tx -> { + for (int i = 100; i < 100 + nodes(); i++) { + sql(tx, "DELETE FROM " + TABLE_NAME + " WHERE id = ?", i); + } + }); + } + + @Test + public void testPastRead() throws Exception { + for (int i = 0; i < nodes(); i++) { + Ignite ignite = CLUSTER_NODES.get(i); + + InternalTable internalTable = ((TableImpl) ignite.tables().table(TABLE_NAME)).internalTable(); + SchemaDescriptor schema = ((TableImpl) ignite.tables().table(TABLE_NAME)).schemaView().schema(); + HybridClock clock = ((IgniteImpl) ignite).clock(); + + Collection<ClusterNode> nodes = ignite.clusterNodes(); + + for (ClusterNode clusterNode : nodes) { + CompletableFuture<BinaryRow> getFut = internalTable.get(createRowKey(schema, i), clock.now(), clusterNode); + + assertNotNull(getFut.join()); + } + + var pastTs = clock.now(); + + long startTime = System.currentTimeMillis(); + + internalTable.delete(createRowKey(schema, i), null).get(); + + for (ClusterNode clusterNode : nodes) { + CompletableFuture<BinaryRow> getFut = internalTable.get(createRowKey(schema, i), clock.now(), clusterNode); + + assertNull(getFut.join()); + } + + log.info("Delay to remove a data record 
[node={}, delay={}]", ignite.name(), (System.currentTimeMillis() - startTime)); + + for (ClusterNode clusterNode : nodes) { + CompletableFuture<BinaryRow> getFut = internalTable.get(createRowKey(schema, i), pastTs, clusterNode); + + assertNotNull(getFut.join()); + } + } + + assertEquals(100 - nodes(), checkData(null, id -> "str " + id)); + + Ignite ignite = CLUSTER_NODES.get(0); + + ignite.transactions().runInTransaction(tx -> { + for (int i = 0; i < nodes(); i++) { + sql(tx, "INSERT INTO " + TABLE_NAME + " VALUES (?, ?)", i, "str " + i); + } + }); + } + + private static Row createRow(SchemaDescriptor schema, int id) { + RowAssembler rowBuilder = new RowAssembler(schema); + + rowBuilder.appendInt(id); + rowBuilder.appendString("new str " + id); + + return new Row(schema, rowBuilder.build()); + } + + private static Row createRowKey(SchemaDescriptor schema, int id) { + RowAssembler rowBuilder = RowAssembler.keyAssembler(schema); + + rowBuilder.appendInt(id); + + return new Row(schema, rowBuilder.build()); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java new file mode 100644 index 0000000000..e7ab9eef57 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.apache.ignite.internal.runner.app.ItTablesApiTest.SCHEMA; +import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.stream.Collectors; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgnitionManager; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.schema.BinaryRow; +import 
org.apache.ignite.internal.schema.BinaryRowEx; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.row.Row; +import org.apache.ignite.internal.schema.row.RowAssembler; +import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders; +import org.apache.ignite.internal.schema.testutils.definition.ColumnDefinition; +import org.apache.ignite.internal.schema.testutils.definition.ColumnType; +import org.apache.ignite.internal.table.distributed.TableManager; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteStringFormatter; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.Table; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for the read-only API. 
+ */ +@ExtendWith(WorkDirectoryExtension.class) +public class ItRoReadsTest extends BaseIgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(ItRoReadsTest.class); + + private static final String TABLE_NAME = "some-table"; + + private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor( + 1, + new Column[]{new Column("key", NativeTypes.INT64, false)}, + new Column[]{ + new Column("valInt", NativeTypes.INT32, false), + new Column("valStr", NativeTypes.STRING, false) + } + ); + + private static final int BASE_PORT = 3344; + + private static final String NODE_BOOTSTRAP_CFG = "{\n" + + " \"network\": {\n" + + " \"port\":{},\n" + + " \"nodeFinder\":{\n" + + " \"netClusterNodes\": [ {} ]\n" + + " }\n" + + " }\n" + + "}"; + + private static Ignite NODE; + + @WorkDirectory + private static Path WORK_DIR; + + private Table table; + + @BeforeAll + static void startNode(TestInfo testInfo) { + String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"'; + + String nodeName = testNodeName(testInfo, 0); + + String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT, connectNodeAddr); + + CompletableFuture<Ignite> future = IgnitionManager.start(nodeName, config, WORK_DIR.resolve(nodeName)); + + String metaStorageNodeName = testNodeName(testInfo, nodes() - 1); + + IgnitionManager.init(metaStorageNodeName, List.of(metaStorageNodeName), "cluster"); + + assertThat(future, willCompleteSuccessfully()); + + NODE = future.join(); + } + + @AfterAll + static void stopNode(TestInfo testInfo) throws Exception { + LOG.info("Start tearDown()"); + + NODE = null; + + IgniteUtils.closeAll(() -> IgnitionManager.stop(testNodeName(testInfo, 0))); + + LOG.info("End tearDown()"); + } + + @BeforeEach + void createTable() { + table = startTable(node(), TABLE_NAME); + } + + @AfterEach + void dropTable() { + stopTable(node(), TABLE_NAME); + + table = null; + } + + @Test + public void testRoGet() throws Exception { + IgniteImpl node = node(); + + 
InternalTable internalTable = ((TableImpl) table).internalTable(); + + Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1); + + BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); + + assertNull(res); + + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + populateData(node, keyValueView, false); + + res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); + + assertEquals(res.byteBuffer(), keyValueRow.byteBuffer()); + } + + @Test + public void testRoGetWithSeveralInserts() throws Exception { + IgniteImpl node = node(); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + + Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1); + + Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2); + + Row keyRow = createKeyRow(1); + + assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get()); + assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get()); + + Transaction tx1 = node.transactions().begin(); + + internalTable.upsert(keyValueRow, (InternalTransaction) tx1).get(); + + tx1.commit(); + + Transaction tx2 = node.transactions().begin(); + + internalTable.upsert(keyValueRow2, (InternalTransaction) tx2).get(); + + tx2.commit(); + + BinaryRow res = internalTable.get(keyRow, node.clock().now(), node.node()).get(); + + assertEquals(res.byteBuffer(), keyValueRow2.byteBuffer()); + } + + @Test + public void testRoScanWithSeveralInserts() throws Exception { + IgniteImpl node = node(); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + + Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1); + + Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2); + + Row keyRow = createKeyRow(1); + + assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get()); + assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get()); + + Transaction tx1 = 
node.transactions().begin(); + + internalTable.insert(keyValueRow, (InternalTransaction) tx1).get(); + + tx1.commit(); + + Transaction tx2 = node.transactions().begin(); + + internalTable.upsert(keyValueRow2, (InternalTransaction) tx2).get(); + + tx2.commit(); + + Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), node.node()); + + CountDownLatch latch = new CountDownLatch(1); + + List<ByteBuffer> list = new ArrayList<>(); + + res.subscribe(new Subscriber<BinaryRow>() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(100); + } + + @Override + public void onNext(BinaryRow item) { + list.add(item.byteBuffer()); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onComplete() { + latch.countDown(); + } + }); + + latch.await(); + + assertEquals(1, list.size()); + + assertEquals(list.get(0), keyValueRow2.byteBuffer()); + } + + @Test + public void testRoGetOngoingCommitIsNotVisible() throws Exception { + IgniteImpl node = node(); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + + Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1); + + Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2); + + assertNull(internalTable.get(keyValueRow, node.clock().now(), node.node()).get()); + assertNull(internalTable.get(keyValueRow2, node.clock().now(), node.node()).get()); + + Transaction tx1 = node.transactions().begin(); + + internalTable.insert(keyValueRow, (InternalTransaction) tx1).get(); + + tx1.commit(); + + Transaction tx2 = node.transactions().begin(); + + internalTable.upsert(keyValueRow2, (InternalTransaction) tx2); + + BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); + + assertEquals(res.byteBuffer(), keyValueRow.byteBuffer()); + + tx2.commit(); + + res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); + + assertEquals(res.byteBuffer(), keyValueRow2.byteBuffer()); 
+ } + + @Test + public void testRoGetAll() throws Exception { + IgniteImpl node = node(); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + + Row keyValueRow1 = createKeyValueRow(1, 1, "some string row" + 1); + Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2); + Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3); + + Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3); + + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + + assertEquals(res.size(), 0); + + node.transactions().runInTransaction(txs -> { + for (int i = 0; i < 15; i++) { + putValue(keyValueView, i, txs); + } + }); + + res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + + assertEquals(res.size(), 3); + + Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet()); + + assertTrue(resultKeys.contains(keyValueRow1.byteBuffer())); + assertTrue(resultKeys.contains(keyValueRow2.byteBuffer())); + assertTrue(resultKeys.contains(keyValueRow3.byteBuffer())); + } + + @Test + public void testRoGetAllWithSeveralInserts() throws ExecutionException, InterruptedException { + IgniteImpl node = node(); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + + Row keyValueRow1 = createKeyValueRow(1, 1, "some string row" + 1); + Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2); + Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3); + + Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3); + + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + + assertEquals(res.size(), 0); + + populateData(node(), keyValueView, false); + + res = 
internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + + assertEquals(res.size(), 3); + + Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet()); + + assertTrue(resultKeys.contains(keyValueRow1.byteBuffer())); + assertTrue(resultKeys.contains(keyValueRow2.byteBuffer())); + assertTrue(resultKeys.contains(keyValueRow3.byteBuffer())); + + node.transactions().runInTransaction(txs -> { + for (int i = 0; i < 15; i++) { + putValue(keyValueView, i + 100, txs); + } + }); + + Row newKeyValueRow1 = createKeyValueRow(1, 101, "some string row" + 101); + Row newKeyValueRow2 = createKeyValueRow(2, 102, "some string row" + 102); + Row newKeyValueRow3 = createKeyValueRow(3, 103, "some string row" + 103); + + res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + + assertEquals(res.size(), 3); + + resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet()); + + assertTrue(resultKeys.contains(newKeyValueRow1.byteBuffer())); + assertTrue(resultKeys.contains(newKeyValueRow2.byteBuffer())); + assertTrue(resultKeys.contains(newKeyValueRow3.byteBuffer())); + } + + @Test + public void testRoScanAllImplicitPopulatingData() throws InterruptedException { + roScanAll(true); + } + + @Test + public void testRoScanAllExplicitPopulatingData() throws InterruptedException { + roScanAll(false); + } + + private void roScanAll(boolean implicit) throws InterruptedException { + IgniteImpl node = node(); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), node.node()); + + var subscriberAllDataAwaitLatch = new CountDownLatch(1); + + var retrievedItems = new ArrayList<BinaryRow>(); + + res.subscribe(new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(10000); + } + + @Override + 
public void onNext(BinaryRow item) { + retrievedItems.add(item); + } + + @Override + public void onError(Throwable throwable) { + fail("onError call is not expected."); + } + + @Override + public void onComplete() { + subscriberAllDataAwaitLatch.countDown(); + } + }); + + subscriberAllDataAwaitLatch.await(); + + assertEquals(0, retrievedItems.size()); + + populateData(node, keyValueView, implicit); + + res = internalTable.scan(0, node.clock().now(), node.node()); + + var subscriberAllDataAwaitLatch2 = new CountDownLatch(1); + + res.subscribe(new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(10000); + } + + @Override + public void onNext(BinaryRow item) { + retrievedItems.add(item); + } + + @Override + public void onError(Throwable throwable) { + fail("onError call is not expected."); + } + + @Override + public void onComplete() { + subscriberAllDataAwaitLatch2.countDown(); + } + }); + + subscriberAllDataAwaitLatch2.await(); + + assertEquals(15, retrievedItems.size()); + } + + private static Row createKeyValueRow(long id, int value, String str) { + RowAssembler rowBuilder = new RowAssembler(SCHEMA_1, false, -1); + + rowBuilder.appendLong(id); + rowBuilder.appendInt(value); + rowBuilder.appendString(str); + + return new Row(SCHEMA_1, rowBuilder.build()); + } + + private static Row createKeyRow(long id) { + RowAssembler rowBuilder = RowAssembler.keyAssembler(SCHEMA_1); + + rowBuilder.appendLong(id); + + return new Row(SCHEMA_1, rowBuilder.build()); + } + + private static void putValue(KeyValueView<Tuple, Tuple> kv, int val) { + putValue(kv, val, null); + } + + private static void putValue(KeyValueView<Tuple, Tuple> kv, int val, Transaction tx) { + Tuple tableKey = Tuple.create().set("key", Long.valueOf(val % 100)); + + Tuple value = Tuple.create().set("valInt", Integer.valueOf(val)).set("valStr", "some string row" + val); + + kv.put(tx, tableKey, value); + } + + private static void populateData(Ignite node, 
KeyValueView<Tuple, Tuple> keyValueView, boolean implicit) { + if (implicit) { + for (int i = 0; i < 15; i++) { + putValue(keyValueView, i); + } + } else { + Transaction tx1 = node.transactions().begin(); + + for (int i = 0; i < 15; i++) { + putValue(keyValueView, i, tx1); + } + + tx1.commit(); + } + } + + private static Table startTable(Ignite node, String tableName) { + List<ColumnDefinition> cols = new ArrayList<>(); + cols.add(SchemaBuilders.column("key", ColumnType.INT64).build()); + cols.add(SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build()); + cols.add(SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build()); + + return await(((TableManager) node.tables()).createTableAsync( + tableName, + tblCh -> convert(SchemaBuilders.tableBuilder(SCHEMA, tableName).columns( + cols).withPrimaryKey("key").build(), tblCh) + .changePartitions(1) + )); + } + + private static void stopTable(Ignite node, String tableName) { + await(((TableManager) node.tables()).dropTableAsync(tableName)); + } + + protected static int nodes() { + return 1; + } + + protected static IgniteImpl node() { + return (IgniteImpl) NODE; + } +} diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index f579c56f2f..714198a2eb 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -971,4 +971,14 @@ public class IgniteImpl implements Ignite { public void stopDroppingMessages() { ((DefaultMessagingService) clusterSvc.messagingService()).stopDroppingMessages(); } + + /** + * Returns the node's hybrid clock. + * + * @return Hybrid clock. 
+ */ + @TestOnly + public HybridClock clock() { + return clock; + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index eb15ddb1a1..fe50db631e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -396,7 +396,8 @@ public class PartitionReplicaListener implements ReplicaListener { IgniteUuid cursorId = new IgniteUuid(txId, request.scanId()); - CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp); + CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null) + : safeTime.waitFor(readTimestamp); if (request.indexToUse() != null) { TableSchemaAwareIndexStorage indexStorage = secondaryIndexStorages.get().get(request.indexToUse()); @@ -486,11 +487,23 @@ public class PartitionReplicaListener implements ReplicaListener { format("Unknown single request [actionType={}]", request.requestType())); } - CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp()); + CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null) + : safeTime.waitFor(request.readTimestamp()); return safeReadFuture.thenCompose(unused -> resolveRowByPkForReadOnly(searchRow, readTimestamp)); } + /** + * Checks that the node is primary and {@code timestamp} is already passed in the reference system of the current node. + * + * @param isPrimary True if the node is primary, false otherwise. + * @param timestamp Timestamp to check. 
+ * @return True if the timestamp is already passed in the reference system of the current node and node is primary, false otherwise. + */ + private boolean isPrimaryInTimestamp(Boolean isPrimary, HybridTimestamp timestamp) { + return isPrimary && hybridClock.now().compareTo(timestamp) > 0; + } + /** * Processes multiple entries request for read only transaction. * @@ -510,7 +523,8 @@ public class PartitionReplicaListener implements ReplicaListener { format("Unknown single request [actionType={}]", request.requestType())); } - CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp()); + CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null) + : safeTime.waitFor(request.readTimestamp()); return safeReadFuture.thenCompose(unused -> { ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(searchRows.size()); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 71cbce1d0f..271a3a73bd 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -36,6 +36,8 @@ import org.apache.ignite.distributed.TestPartitionDataStorage; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.WriteCommand; @@ -85,6 +87,8 @@ import org.jetbrains.annotations.Nullable; * Dummy table 
storage implementation. */ public class DummyInternalTableImpl extends InternalTableImpl { + private static final IgniteLogger LOG = Loggers.forClass(DummyInternalTableImpl.class); + public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1", 2004); private static final int PART_ID = 0; @@ -95,6 +99,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { new Column[]{new Column("value", NativeTypes.INT64, false)} ); + private static final HybridClock CLOCK = new HybridClockImpl(); + private static final ReplicationGroupId crossTableGroupId = new TablePartitionId(UUID.randomUUID(), 0); private PartitionListener partitionListener; @@ -103,6 +109,9 @@ public class DummyInternalTableImpl extends InternalTableImpl { private ReplicationGroupId groupId; + /** The thread updates safe time on the dummy replica. */ + private Thread safeTimeUpdaterThread; + /** * Creates a new local table. * @@ -172,11 +181,11 @@ public class DummyInternalTableImpl extends InternalTableImpl { Int2ObjectMaps.singleton(PART_ID, mock(RaftGroupService.class)), 1, name -> mock(ClusterNode.class), - txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()) : txManager, + txManager == null ? 
new TxManagerImpl(replicaSvc, new HeapLockManager(), CLOCK) : txManager, mock(MvTableStorage.class), new TestTxStateTableStorage(), replicaSvc, - new HybridClockImpl() + CLOCK ); RaftGroupService svc = partitionMap.get(0); @@ -256,7 +265,6 @@ public class DummyInternalTableImpl extends InternalTableImpl { IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2Tuple); - HybridClock clock = new HybridClockImpl(); PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)); PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage); Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.get().id(), pkStorage.get()); @@ -281,7 +289,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { () -> Map.of(pkLocker.id(), pkLocker), pkStorage, () -> Map.of(), - clock, + CLOCK, safeTime, txStateStorage().getOrCreateTxStateStorage(PART_ID), placementDriver, @@ -296,6 +304,34 @@ public class DummyInternalTableImpl extends InternalTableImpl { txStateStorage().getOrCreateTxStateStorage(PART_ID), safeTime ); + + safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime), "safe-time-updater"); + + safeTimeUpdaterThread.start(); + } + + /** + * A process to update safe time periodically. 
+ */ + private static class SafeTimeUpdater implements Runnable { + PendingComparableValuesTracker<HybridTimestamp> safeTime; + + public SafeTimeUpdater(PendingComparableValuesTracker<HybridTimestamp> safeTime) { + this.safeTime = safeTime; + } + + @Override + public void run() { + while (true) { + safeTime.update(CLOCK.now()); + try { + Thread.sleep(1_000); + } catch (InterruptedException e) { + LOG.warn("The safe time updater thread is interrupted"); + return; /* Interrupt is the stop signal sent by close(): exit instead of looping forever. */ + } + } + } } /** @@ -354,4 +390,15 @@ public class DummyInternalTableImpl extends InternalTableImpl { public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int partId) { return CompletableFuture.completedFuture(mock(ClusterNode.class)); } + + @Override + public void close() { + super.close(); + + if (safeTimeUpdaterThread != null) { + safeTimeUpdaterThread.interrupt(); + + safeTimeUpdaterThread = null; + } + } }