This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cd7d979901 IGNITE-18835 Get rid of skipping safe time waiting on a 
primary node. (#1688)
cd7d979901 is described below

commit cd7d97990125cb02b2401c6140ea98da3ca646e4
Author: Mirza Aliev <alievmi...@gmail.com>
AuthorDate: Tue Mar 7 21:12:18 2023 +0400

    IGNITE-18835 Get rid of skipping safe time waiting on a primary node. 
(#1688)
---
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |   7 +-
 .../internal/table/ItReadOnlyTransactionTest.java  | 211 ++++++++
 .../ignite/internal/table/ItRoReadsTest.java       | 543 +++++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  10 +
 .../replicator/PartitionReplicaListener.java       |  20 +-
 .../table/impl/DummyInternalTableImpl.java         |  55 ++-
 6 files changed, 835 insertions(+), 11 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 75688ae3e2..775a423dd7 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -32,7 +32,6 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -175,9 +174,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
         BinaryRowEx key = new 
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 42));
 
         if (isNode0Leader) {
-            assertNull(table.internalTable().get(key, new 
HybridClockImpl().now(), node1.node()).get());
+            assertNull(table.internalTable().get(key, node1.clock().now(), 
node1.node()).get());
         } else {
-            assertNull(table.internalTable().get(key, new 
HybridClockImpl().now(), node0.node()).get());
+            assertNull(table.internalTable().get(key, node1.clock().now(), 
node0.node()).get());
         }
 
         var tx = node0.transactions().begin();
@@ -275,7 +274,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
 
                 BinaryRowEx testKey = new 
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("ID", 
row[0]));
 
-                BinaryRow readOnlyRow = table.internalTable().get(testKey, new 
HybridClockImpl().now(), ignite.node()).get();
+                BinaryRow readOnlyRow = table.internalTable().get(testKey, 
ignite.clock().now(), ignite.node()).get();
 
                 assertNotNull(readOnlyRow);
                 assertEquals(row[1], new Row(table.schemaView().schema(), 
readOnlyRow).stringValue(2));
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
new file mode 100644
index 0000000000..dfc77c4e62
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test reads with specific timestamp.
+ */
+public class ItReadOnlyTransactionTest extends AbstractBasicIntegrationTest {
+    /** Name of the table that is created once in {@code beforeTestStart()} and shared by all tests of this class. */
+    public static final String TABLE_NAME = "tbl";
+    /**
+     * Offset added to the physical part of the current clock time to build a read timestamp in the future.
+     * It is also compared against a {@code System.currentTimeMillis()} delta, so it is treated as milliseconds.
+     */
+    public static final int FUTURE_GAP = 200;
+
+    /**
+     * Creates the shared test table and loads rows with ids 0..99 in one explicit transaction. The row count is verified
+     * both inside that transaction and afterwards with a {@code null} (implicit) transaction.
+     */
+    @BeforeAll
+    public void beforeTestStart() {
+        String createTableDdl = "CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR) WITH REPLICAS=" + nodes()
+                + ", PARTITIONS=10";
+
+        sql(createTableDdl);
+
+        Function<Integer, String> initialValue = id -> "str " + id;
+
+        CLUSTER_NODES.get(0).transactions().runInTransaction(tx -> {
+            int rowId = 0;
+
+            while (rowId < 100) {
+                sql(tx, "INSERT INTO " + TABLE_NAME + " VALUES (?, ?)", rowId, "str " + rowId);
+
+                rowId++;
+            }
+
+            assertEquals(100, checkData(tx, initialValue));
+        });
+
+        assertEquals(100, checkData(null, initialValue));
+    }
+
+    /**
+     * Selects every row of {@link ItReadOnlyTransactionTest#TABLE_NAME} ordered by id and asserts that the {@code val} column
+     * of each row equals the value produced by the mapper for that row's id.
+     *
+     * @param tx Transaction to run the query in, or {@code null} for an implicit transaction.
+     * @param valueMapper Function that maps a primary key to the expected column value.
+     * @return Number of rows returned by the query.
+     */
+    private static int checkData(Transaction tx, Function<Integer, String> valueMapper) {
+        String query = "SELECT id, val FROM " + TABLE_NAME + " ORDER BY id";
+
+        List<List<Object>> result = sql(tx, query);
+
+        result.forEach(columns -> {
+            Integer key = (Integer) columns.get(0);
+
+            assertEquals(valueMapper.apply(key), columns.get(1));
+        });
+
+        return result.size();
+    }
+
+    /**
+     * Checks that a read-only get with a timestamp ahead of the current clock is still pending right after it is issued and
+     * later returns the row that was inserted after the request was sent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFutureRead() throws Exception {
+        for (int i = 0; i < nodes(); i++) {
+            Ignite ignite = CLUSTER_NODES.get(i);
+
+            // Resolve the table once; the internal table and the schema are both taken from the same instance.
+            TableImpl table = (TableImpl) ignite.tables().table(TABLE_NAME);
+
+            InternalTable internalTable = table.internalTable();
+            SchemaDescriptor schema = table.schemaView().schema();
+            HybridClock clock = ((IgniteImpl) ignite).clock();
+
+            Collection<ClusterNode> nodes = ignite.clusterNodes();
+
+            // The row with id 100 + i has not been inserted yet, so a read at the current time must return nothing.
+            for (ClusterNode clusterNode : nodes) {
+                CompletableFuture<BinaryRow> getFut = internalTable.get(createRowKey(schema, 100 + i), clock.now(), clusterNode);
+
+                assertNull(getFut.join());
+            }
+
+            List<CompletableFuture<BinaryRow>> futs = new ArrayList<>(nodes.size());
+
+            long startTime = System.currentTimeMillis();
+
+            for (ClusterNode clusterNode : nodes) {
+                CompletableFuture<BinaryRow> getFut = internalTable.get(
+                        createRowKey(schema, 100 + i),
+                        new HybridTimestamp(clock.now().getPhysical() + FUTURE_GAP, 0),
+                        clusterNode
+                );
+
+                // The read timestamp lies in the future, so the request is expected to be still pending here.
+                assertFalse(getFut.isDone());
+
+                futs.add(getFut);
+            }
+
+            internalTable.insert(createRow(schema, 100 + i), null).get();
+
+            // Sample the elapsed time once so that the logged, asserted and reported values are the same number.
+            long delay = System.currentTimeMillis() - startTime;
+
+            log.info("Delay to create a new data record [node={}, delay={}]", ignite.name(), delay);
+
+            assertTrue(delay < FUTURE_GAP, "Too long to execute [delay=" + delay + ']');
+
+            for (var getFut : futs) {
+                assertNotNull(getFut.get(10, TimeUnit.SECONDS));
+            }
+        }
+
+        assertEquals(100 + nodes(), checkData(null, id -> id < 100 ? ("str " + id) : ("new str " + id)));
+
+        Ignite ignite = CLUSTER_NODES.get(0);
+
+        // Remove the rows added above so that the table again holds only the initial 100 rows.
+        ignite.transactions().runInTransaction(tx -> {
+            for (int i = 100; i < 100 + nodes(); i++) {
+                sql(tx, "DELETE FROM " + TABLE_NAME + " WHERE id = ?", i);
+            }
+        });
+    }
+
+    /**
+     * Checks that, after a row is deleted, a read at the current time returns nothing while a read with a timestamp taken
+     * before the deletion still returns the row.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPastRead() throws Exception {
+        for (int nodeIdx = 0; nodeIdx < nodes(); nodeIdx++) {
+            Ignite ignite = CLUSTER_NODES.get(nodeIdx);
+
+            InternalTable internalTable = ((TableImpl) ignite.tables().table(TABLE_NAME)).internalTable();
+            SchemaDescriptor schema = ((TableImpl) ignite.tables().table(TABLE_NAME)).schemaView().schema();
+            HybridClock clock = ((IgniteImpl) ignite).clock();
+
+            Collection<ClusterNode> nodes = ignite.clusterNodes();
+
+            // The row is present before the deletion.
+            for (ClusterNode target : nodes) {
+                assertNotNull(internalTable.get(createRowKey(schema, nodeIdx), clock.now(), target).join());
+            }
+
+            var pastTs = clock.now();
+
+            long startTime = System.currentTimeMillis();
+
+            internalTable.delete(createRowKey(schema, nodeIdx), null).get();
+
+            // A read at the current time no longer sees the row.
+            for (ClusterNode target : nodes) {
+                assertNull(internalTable.get(createRowKey(schema, nodeIdx), clock.now(), target).join());
+            }
+
+            long elapsed = System.currentTimeMillis() - startTime;
+
+            log.info("Delay to remove a data record [node={}, delay={}]", ignite.name(), elapsed);
+
+            // A read with the timestamp captured before the deletion still sees it.
+            for (ClusterNode target : nodes) {
+                assertNotNull(internalTable.get(createRowKey(schema, nodeIdx), pastTs, target).join());
+            }
+        }
+
+        assertEquals(100 - nodes(), checkData(null, id -> "str " + id));
+
+        // Re-insert the deleted rows.
+        CLUSTER_NODES.get(0).transactions().runInTransaction(tx -> {
+            for (int rowId = 0; rowId < nodes(); rowId++) {
+                sql(tx, "INSERT INTO " + TABLE_NAME + " VALUES (?, ?)", rowId, "str " + rowId);
+            }
+        });
+    }
+
+    /**
+     * Assembles a full row whose integer column is {@code id} and whose string column is {@code "new str " + id}.
+     *
+     * @param schema Schema to assemble the row with.
+     * @param id Value of the integer column.
+     * @return Assembled row.
+     */
+    private static Row createRow(SchemaDescriptor schema, int id) {
+        var assembler = new RowAssembler(schema);
+
+        assembler.appendInt(id);
+        assembler.appendString("new str " + id);
+
+        var binaryRow = assembler.build();
+
+        return new Row(schema, binaryRow);
+    }
+
+    /**
+     * Assembles a key-only row with the given id using the schema's key assembler.
+     *
+     * @param schema Schema to assemble the key with.
+     * @param id Key value.
+     * @return Assembled key row.
+     */
+    private static Row createRowKey(SchemaDescriptor schema, int id) {
+        var keyAssembler = RowAssembler.keyAssembler(schema);
+
+        keyAssembler.appendInt(id);
+
+        var binaryKey = keyAssembler.build();
+
+        return new Row(schema, binaryKey);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
new file mode 100644
index 0000000000..e7ab9eef57
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.apache.ignite.internal.runner.app.ItTablesApiTest.SCHEMA;
+import static 
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
+import org.apache.ignite.internal.schema.testutils.definition.ColumnDefinition;
+import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for the read-only API.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItRoReadsTest extends BaseIgniteAbstractTest {
+    /** Logger used for the tear-down progress messages. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(ItRoReadsTest.class);
+
+    /** Name of the table that is created before and dropped after every test method. */
+    private static final String TABLE_NAME = "some-table";
+
+    /**
+     * Schema used by the row-assembling helpers of this test: an INT64 {@code key} column plus INT32 {@code valInt}
+     * and STRING {@code valStr} value columns.
+     */
+    private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("key", NativeTypes.INT64, false)},
+            new Column[]{
+                    new Column("valInt", NativeTypes.INT32, false),
+                    new Column("valStr", NativeTypes.STRING, false)
+            }
+    );
+
+    /** Network port the test node is configured with; also used to build the node-finder address. */
+    private static final int BASE_PORT = 3344;
+
+    /** Node bootstrap configuration template; the two {@code {}} placeholders take the port and the node-finder address list. */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  }\n"
+            + "}";
+
+    /** The node started in {@code startNode} and cleared in {@code stopNode}. */
+    private static Ignite NODE;
+
+    /** Work directory; presumably injected by {@link WorkDirectoryExtension} - TODO confirm. */
+    @WorkDirectory
+    private static Path WORK_DIR;
+
+    /** Table under test; assigned in {@code createTable} and reset to {@code null} in {@code dropTable}. */
+    private Table table;
+
+    /**
+     * Starts the test node on {@link #BASE_PORT}, initializes the cluster with the last node name as the meta storage node
+     * and stores the started node in {@link #NODE}.
+     *
+     * @param testInfo Test information used to derive node names.
+     */
+    @BeforeAll
+    static void startNode(TestInfo testInfo) {
+        String seedAddress = "\"localhost:" + BASE_PORT + '\"';
+
+        String name = testNodeName(testInfo, 0);
+
+        CompletableFuture<Ignite> startFuture = IgnitionManager.start(
+                name,
+                IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT, seedAddress),
+                WORK_DIR.resolve(name)
+        );
+
+        String metaStorageNode = testNodeName(testInfo, nodes() - 1);
+
+        IgnitionManager.init(metaStorageNode, List.of(metaStorageNode), "cluster");
+
+        assertThat(startFuture, willCompleteSuccessfully());
+
+        NODE = startFuture.join();
+    }
+
+    /**
+     * Clears the {@link #NODE} reference and stops the node that was started for this test class.
+     *
+     * @param testInfo Test information used to derive the node name.
+     * @throws Exception If stopping the node failed.
+     */
+    @AfterAll
+    static void stopNode(TestInfo testInfo) throws Exception {
+        LOG.info("Start tearDown()");
+
+        NODE = null;
+
+        AutoCloseable nodeStopper = () -> IgnitionManager.stop(testNodeName(testInfo, 0));
+
+        IgniteUtils.closeAll(nodeStopper);
+
+        LOG.info("End tearDown()");
+    }
+
+    /** Creates the table under test on the test node before each test method. */
+    @BeforeEach
+    void createTable() {
+        Table created = startTable(node(), TABLE_NAME);
+
+        this.table = created;
+    }
+
+    /** Drops the table under test after each test method and clears the reference to it. */
+    @AfterEach
+    void dropTable() {
+        IgniteImpl igniteNode = node();
+
+        stopTable(igniteNode, TABLE_NAME);
+
+        this.table = null;
+    }
+
+    /**
+     * Checks that a read-only get returns {@code null} before the row is written and returns the committed row afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRoGet() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        assertNull(res);
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        populateData(node, keyValueView, false);
+
+        res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        // assertEquals takes (expected, actual); the expected row goes first so that failure messages read correctly.
+        assertEquals(keyValueRow.byteBuffer(), res.byteBuffer());
+    }
+
+    /**
+     * Checks that, after two committed transactions wrote the same key one after another, a read-only get returns the row
+     * written by the second transaction.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRoGetWithSeveralInserts() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2);
+
+        Row keyRow = createKeyRow(1);
+
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+
+        Transaction tx1 = node.transactions().begin();
+
+        internalTable.upsert(keyValueRow, (InternalTransaction) tx1).get();
+
+        tx1.commit();
+
+        Transaction tx2 = node.transactions().begin();
+
+        internalTable.upsert(keyValueRow2, (InternalTransaction) tx2).get();
+
+        tx2.commit();
+
+        BinaryRow res = internalTable.get(keyRow, node.clock().now(), node.node()).get();
+
+        // assertEquals takes (expected, actual); the expected row goes first so that failure messages read correctly.
+        assertEquals(keyValueRow2.byteBuffer(), res.byteBuffer());
+    }
+
+    /**
+     * Checks that, after two committed transactions wrote the same key one after another, a read-only scan of partition 0
+     * returns exactly one row and that it is the row written by the second transaction.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRoScanWithSeveralInserts() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2);
+
+        Row keyRow = createKeyRow(1);
+
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+
+        Transaction tx1 = node.transactions().begin();
+
+        internalTable.insert(keyValueRow, (InternalTransaction) tx1).get();
+
+        tx1.commit();
+
+        Transaction tx2 = node.transactions().begin();
+
+        internalTable.upsert(keyValueRow2, (InternalTransaction) tx2).get();
+
+        tx2.commit();
+
+        Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), node.node());
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        List<ByteBuffer> list = new ArrayList<>();
+
+        List<Throwable> errors = new ArrayList<>();
+
+        res.subscribe(new Subscriber<BinaryRow>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(100);
+            }
+
+            @Override
+            public void onNext(BinaryRow item) {
+                list.add(item.byteBuffer());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Record the failure and release the latch; otherwise a failed scan would block the test forever.
+                errors.add(throwable);
+
+                latch.countDown();
+            }
+
+            @Override
+            public void onComplete() {
+                latch.countDown();
+            }
+        });
+
+        latch.await();
+
+        // Report a scan failure on the test thread, where an assertion error actually fails the test.
+        if (!errors.isEmpty()) {
+            fail("onError call is not expected.", errors.get(0));
+        }
+
+        assertEquals(1, list.size());
+
+        // assertEquals takes (expected, actual); the expected row goes first so that failure messages read correctly.
+        assertEquals(keyValueRow2.byteBuffer(), list.get(0));
+    }
+
+    /**
+     * Checks that a read-only get keeps returning the previously committed row while a second transaction has an
+     * uncommitted upsert for the same key, and returns the new row once that transaction is committed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRoGetOngoingCommitIsNotVisible() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2);
+
+        assertNull(internalTable.get(keyValueRow, node.clock().now(), node.node()).get());
+        assertNull(internalTable.get(keyValueRow2, node.clock().now(), node.node()).get());
+
+        Transaction tx1 = node.transactions().begin();
+
+        internalTable.insert(keyValueRow, (InternalTransaction) tx1).get();
+
+        tx1.commit();
+
+        Transaction tx2 = node.transactions().begin();
+
+        // NOTE(review): the future returned by this upsert is not awaited, so the write may still be in flight when the
+        // read below runs - confirm that this is intended.
+        internalTable.upsert(keyValueRow2, (InternalTransaction) tx2);
+
+        BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        // assertEquals takes (expected, actual); the expected row goes first so that failure messages read correctly.
+        assertEquals(keyValueRow.byteBuffer(), res.byteBuffer());
+
+        tx2.commit();
+
+        res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        assertEquals(keyValueRow2.byteBuffer(), res.byteBuffer());
+    }
+
+    /**
+     * Checks that a read-only getAll returns nothing before the rows are written and returns all three requested rows after
+     * they were put in a single transaction.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRoGetAll() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow1 = createKeyValueRow(1, 1, "some string row" + 1);
+        Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2);
+        Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3);
+
+        Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3);
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        // assertEquals takes (expected, actual); the expected size goes first so that failure messages read correctly.
+        assertEquals(0, res.size());
+
+        node.transactions().runInTransaction(txs -> {
+            for (int i = 0; i < 15; i++) {
+                putValue(keyValueView, i, txs);
+            }
+        });
+
+        res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        assertEquals(3, res.size());
+
+        Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+
+        assertTrue(resultKeys.contains(keyValueRow1.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow2.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow3.byteBuffer()));
+    }
+
+    /**
+     * Checks that a read-only getAll returns the rows of the first committed write and, after a second committed write to
+     * the same keys, returns the rows of the second write.
+     *
+     * @throws ExecutionException If a read failed.
+     * @throws InterruptedException If the test thread was interrupted while waiting for a read.
+     */
+    @Test
+    public void testRoGetAllWithSeveralInserts() throws ExecutionException, InterruptedException {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow1 = createKeyValueRow(1, 1, "some string row" + 1);
+        Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2);
+        Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3);
+
+        Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3);
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        // assertEquals takes (expected, actual); the expected size goes first so that failure messages read correctly.
+        assertEquals(0, res.size());
+
+        populateData(node, keyValueView, false);
+
+        res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        assertEquals(3, res.size());
+
+        Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+
+        assertTrue(resultKeys.contains(keyValueRow1.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow2.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow3.byteBuffer()));
+
+        // putValue() derives the key as val % 100, so values 100..114 overwrite the rows with keys 0..14.
+        node.transactions().runInTransaction(txs -> {
+            for (int i = 0; i < 15; i++) {
+                putValue(keyValueView, i + 100, txs);
+            }
+        });
+
+        Row newKeyValueRow1 = createKeyValueRow(1, 101, "some string row" + 101);
+        Row newKeyValueRow2 = createKeyValueRow(2, 102, "some string row" + 102);
+        Row newKeyValueRow3 = createKeyValueRow(3, 103, "some string row" + 103);
+
+        res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        assertEquals(3, res.size());
+
+        resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+
+        assertTrue(resultKeys.contains(newKeyValueRow1.byteBuffer()));
+        assertTrue(resultKeys.contains(newKeyValueRow2.byteBuffer()));
+        assertTrue(resultKeys.contains(newKeyValueRow3.byteBuffer()));
+    }
+
+    /**
+     * Runs the read-only full scan scenario with the data populated without an explicit transaction.
+     *
+     * @throws InterruptedException If the test thread was interrupted while waiting for the scan.
+     */
+    @Test
+    public void testRoScanAllImplicitPopulatingData() throws InterruptedException {
+        boolean implicit = true;
+
+        roScanAll(implicit);
+    }
+
+    /**
+     * Runs the read-only full scan scenario with the data populated inside an explicit transaction.
+     *
+     * @throws InterruptedException If the test thread was interrupted while waiting for the scan.
+     */
+    @Test
+    public void testRoScanAllExplicitPopulatingData() throws InterruptedException {
+        boolean implicit = false;
+
+        roScanAll(implicit);
+    }
+
+    /**
+     * Scans partition 0 with a read-only timestamp before and after populating the table and checks that the scan returns
+     * no rows at first and 15 rows afterwards.
+     *
+     * @param implicit Passed to {@code populateData}: {@code true} to put the data without an explicit transaction,
+     *      {@code false} to put it inside one.
+     * @throws InterruptedException If the test thread was interrupted while waiting for a scan to finish.
+     */
+    private void roScanAll(boolean implicit) throws InterruptedException {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        // Nothing has been written yet, so the first scan must be empty.
+        List<BinaryRow> retrievedItems = collectAllRows(internalTable.scan(0, node.clock().now(), node.node()));
+
+        assertEquals(0, retrievedItems.size());
+
+        populateData(node, keyValueView, implicit);
+
+        retrievedItems = collectAllRows(internalTable.scan(0, node.clock().now(), node.node()));
+
+        assertEquals(15, retrievedItems.size());
+    }
+
+    /**
+     * Subscribes to the publisher, requests up to 10000 rows and waits until the publisher signals completion or an error.
+     * An error is reported on the calling thread, so it fails the test instead of leaving it blocked.
+     *
+     * @param publisher Publisher of scanned rows.
+     * @return Rows received before the publisher completed.
+     * @throws InterruptedException If the calling thread was interrupted while waiting.
+     */
+    private static List<BinaryRow> collectAllRows(Publisher<BinaryRow> publisher) throws InterruptedException {
+        List<BinaryRow> rows = new ArrayList<>();
+
+        List<Throwable> errors = new ArrayList<>();
+
+        CountDownLatch finished = new CountDownLatch(1);
+
+        publisher.subscribe(new Subscriber<>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(10000);
+            }
+
+            @Override
+            public void onNext(BinaryRow item) {
+                rows.add(item);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Throwing from here would not fail the test and would leave the latch closed; record and release instead.
+                errors.add(throwable);
+
+                finished.countDown();
+            }
+
+            @Override
+            public void onComplete() {
+                finished.countDown();
+            }
+        });
+
+        finished.await();
+
+        if (!errors.isEmpty()) {
+            fail("onError call is not expected.", errors.get(0));
+        }
+
+        return rows;
+    }
+
+    /**
+     * Assembles a full row for {@link #SCHEMA_1} from the given key and value columns.
+     *
+     * @param id Value of the {@code key} column.
+     * @param value Value of the {@code valInt} column.
+     * @param str Value of the {@code valStr} column.
+     * @return Assembled row.
+     */
+    private static Row createKeyValueRow(long id, int value, String str) {
+        var assembler = new RowAssembler(SCHEMA_1, false, -1);
+
+        assembler.appendLong(id);
+        assembler.appendInt(value);
+        assembler.appendString(str);
+
+        var binaryRow = assembler.build();
+
+        return new Row(SCHEMA_1, binaryRow);
+    }
+
+    /**
+     * Assembles a key-only row for {@link #SCHEMA_1}.
+     *
+     * @param id Value of the {@code key} column.
+     * @return Assembled key row.
+     */
+    private static Row createKeyRow(long id) {
+        var keyAssembler = RowAssembler.keyAssembler(SCHEMA_1);
+
+        keyAssembler.appendLong(id);
+
+        var binaryKey = keyAssembler.build();
+
+        return new Row(SCHEMA_1, binaryKey);
+    }
+
+    /**
+     * Puts a value derived from {@code val} by delegating to the three-argument overload with a {@code null} transaction.
+     *
+     * @param kv Key-value view to put into.
+     * @param val Number the key and the value columns are derived from.
+     */
+    private static void putValue(KeyValueView<Tuple, Tuple> kv, int val) {
+        Transaction noTx = null;
+
+        putValue(kv, val, noTx);
+    }
+
+    /**
+     * Puts a row whose key is {@code val % 100}, whose {@code valInt} is {@code val} and whose {@code valStr} is
+     * {@code "some string row" + val}.
+     *
+     * @param kv Key-value view to put into.
+     * @param val Number the key and the value columns are derived from.
+     * @param tx Transaction passed to the put; may be {@code null}.
+     */
+    private static void putValue(KeyValueView<Tuple, Tuple> kv, int val, Transaction tx) {
+        long keyColumn = val % 100;
+        String strColumn = "some string row" + val;
+
+        Tuple tableKey = Tuple.create().set("key", keyColumn);
+
+        Tuple value = Tuple.create()
+                .set("valInt", val)
+                .set("valStr", strColumn);
+
+        kv.put(tx, tableKey, value);
+    }
+
+    /**
+     * Puts 15 rows (values 0..14) into the view, either with a {@code null} transaction for every put or inside one explicit
+     * transaction that is committed at the end.
+     *
+     * @param node Node used to begin the explicit transaction.
+     * @param keyValueView View to put the rows into.
+     * @param implicit {@code true} to put with a {@code null} transaction, {@code false} to use an explicit one.
+     */
+    private static void populateData(Ignite node, KeyValueView<Tuple, Tuple> keyValueView, boolean implicit) {
+        Transaction explicitTx = implicit ? null : node.transactions().begin();
+
+        int val = 0;
+
+        while (val < 15) {
+            putValue(keyValueView, val, explicitTx);
+
+            val++;
+        }
+
+        if (!implicit) {
+            explicitTx.commit();
+        }
+    }
+
+    /**
+     * Creates a table with the columns {@code key} (INT64, primary key), {@code valInt} (INT32, nullable) and {@code valStr}
+     * (string with default value {@code "default"}) and a single partition, and waits for the creation to finish.
+     *
+     * @param node Node whose table manager creates the table.
+     * @param tableName Name of the table to create.
+     * @return Created table.
+     */
+    private static Table startTable(Ignite node, String tableName) {
+        ColumnDefinition keyColumn = SchemaBuilders.column("key", ColumnType.INT64).build();
+        ColumnDefinition intColumn = SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build();
+        ColumnDefinition strColumn = SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build();
+
+        List<ColumnDefinition> cols = new ArrayList<>();
+        cols.add(keyColumn);
+        cols.add(intColumn);
+        cols.add(strColumn);
+
+        TableManager tableManager = (TableManager) node.tables();
+
+        return await(tableManager.createTableAsync(
+                tableName,
+                tblCh -> convert(SchemaBuilders.tableBuilder(SCHEMA, tableName).columns(cols).withPrimaryKey("key").build(), tblCh)
+                        .changePartitions(1)
+        ));
+    }
+
+    /**
+     * Drops the table through the node's table manager and waits for the drop to finish.
+     *
+     * @param node Node whose table manager drops the table.
+     * @param tableName Name of the table to drop.
+     */
+    private static void stopTable(Ignite node, String tableName) {
+        TableManager tableManager = (TableManager) node.tables();
+
+        await(tableManager.dropTableAsync(tableName));
+    }
+
+    /**
+     * Returns the number of nodes used by this test.
+     *
+     * @return Always {@code 1}.
+     */
+    protected static int nodes() {
+        final int nodeCount = 1;
+
+        return nodeCount;
+    }
+
+    /**
+     * Returns the node stored in {@link #NODE}, cast to its implementation type.
+     *
+     * @return Test node.
+     */
+    protected static IgniteImpl node() {
+        Ignite started = NODE;
+
+        return (IgniteImpl) started;
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f579c56f2f..714198a2eb 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -971,4 +971,14 @@ public class IgniteImpl implements Ignite {
     public void stopDroppingMessages() {
         ((DefaultMessagingService) 
clusterSvc.messagingService()).stopDroppingMessages();
     }
+
+    /**
+     * Returns the node's hybrid clock.
+     *
+     * <p>The method is annotated with {@code @TestOnly}: it exposes the {@code clock} field to tests.
+     *
+     * @return Hybrid clock.
+     */
+    @TestOnly
+    public HybridClock clock() {
+        return clock;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index eb15ddb1a1..fe50db631e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -396,7 +396,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(readTimestamp);
+        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null)
+                : safeTime.waitFor(readTimestamp);
 
         if (request.indexToUse() != null) {
             TableSchemaAwareIndexStorage indexStorage = 
secondaryIndexStorages.get().get(request.indexToUse());
@@ -486,11 +487,23 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     format("Unknown single request [actionType={}]", 
request.requestType()));
         }
 
-        CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(request.readTimestamp());
+        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null)
+                : safeTime.waitFor(request.readTimestamp());
 
         return safeReadFuture.thenCompose(unused -> 
resolveRowByPkForReadOnly(searchRow, readTimestamp));
     }
 
+    /**
+     * Tells whether a read at {@code timestamp} may skip waiting for safe time: this holds only when the node is primary and
+     * the local hybrid clock has already moved strictly past {@code timestamp}.
+     *
+     * @param isPrimary True if the node is primary, false otherwise.
+     * @param timestamp Timestamp to check.
+     * @return True if the node is primary and the current local clock value is strictly greater than {@code timestamp},
+     *      false otherwise.
+     */
+    private boolean isPrimaryInTimestamp(Boolean isPrimary, HybridTimestamp timestamp) {
+        if (!isPrimary) {
+            return false;
+        }
+
+        return hybridClock.now().compareTo(timestamp) > 0;
+    }
+
     /**
      * Processes multiple entries request for read only transaction.
      *
@@ -510,7 +523,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     format("Unknown single request [actionType={}]", 
request.requestType()));
         }
 
-        CompletableFuture<Void> safeReadFuture = isPrimary ? 
completedFuture(null) : safeTime.waitFor(request.readTimestamp());
+        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null)
+                : safeTime.waitFor(request.readTimestamp());
 
         return safeReadFuture.thenCompose(unused -> {
             ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new 
ArrayList<>(searchRows.size());
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 71cbce1d0f..271a3a73bd 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -36,6 +36,8 @@ import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.WriteCommand;
@@ -85,6 +87,8 @@ import org.jetbrains.annotations.Nullable;
  * Dummy table storage implementation.
  */
 public class DummyInternalTableImpl extends InternalTableImpl {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DummyInternalTableImpl.class);
+
     public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1", 
2004);
 
     private static final int PART_ID = 0;
@@ -95,6 +99,8 @@ public class DummyInternalTableImpl extends InternalTableImpl 
{
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
+    private static final  HybridClock CLOCK = new HybridClockImpl();
+
     private static final ReplicationGroupId crossTableGroupId = new 
TablePartitionId(UUID.randomUUID(), 0);
 
     private PartitionListener partitionListener;
@@ -103,6 +109,9 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
     private ReplicationGroupId groupId;
 
+    /** The thread updates safe time on the dummy replica. */
+    private Thread safeTimeUpdaterThread;
+
     /**
      * Creates a new local table.
      *
@@ -172,11 +181,11 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 Int2ObjectMaps.singleton(PART_ID, 
mock(RaftGroupService.class)),
                 1,
                 name -> mock(ClusterNode.class),
-                txManager == null ? new TxManagerImpl(replicaSvc, new 
HeapLockManager(), new HybridClockImpl()) : txManager,
+                txManager == null ? new TxManagerImpl(replicaSvc, new 
HeapLockManager(), CLOCK) : txManager,
                 mock(MvTableStorage.class),
                 new TestTxStateTableStorage(),
                 replicaSvc,
-                new HybridClockImpl()
+                CLOCK
         );
         RaftGroupService svc = partitionMap.get(0);
 
@@ -256,7 +265,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         IndexLocker pkLocker = new HashIndexLocker(indexId, true, 
this.txManager.lockManager(), row2Tuple);
 
-        HybridClock clock = new HybridClockImpl();
         PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
         PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
         Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.get().id(), pkStorage.get());
@@ -281,7 +289,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 () -> Map.of(pkLocker.id(), pkLocker),
                 pkStorage,
                 () -> Map.of(),
-                clock,
+                CLOCK,
                 safeTime,
                 txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 placementDriver,
@@ -296,6 +304,34 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 safeTime
         );
+
+        safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime), 
"safe-time-updater");
+
+        safeTimeUpdaterThread.start();
+    }
+
+    /**
+     * A process to update safe time periodically.
+     *
+     * <p>Once a second the tracker is advanced to the current value of {@code CLOCK}. The loop ends as soon as the running
+     * thread is interrupted.
+     */
+    private static class SafeTimeUpdater implements Runnable {
+        /** Safe time tracker that is advanced by this updater. */
+        private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+        /**
+         * Creates the updater.
+         *
+         * @param safeTime Safe time tracker to update.
+         */
+        public SafeTimeUpdater(PendingComparableValuesTracker<HybridTimestamp> safeTime) {
+            this.safeTime = safeTime;
+        }
+
+        @Override
+        public void run() {
+            // Interruption is the stop signal for this loop.
+            while (!Thread.currentThread().isInterrupted()) {
+                safeTime.update(CLOCK.now());
+
+                try {
+                    Thread.sleep(1_000);
+                } catch (InterruptedException e) {
+                    LOG.warn("The safe time updater thread is interrupted");
+
+                    // Restore the flag and stop: swallowing the interrupt would keep the thread alive forever.
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+            }
+        }
     }
 
     /**
@@ -354,4 +390,15 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
     public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId) {
         return CompletableFuture.completedFuture(mock(ClusterNode.class));
     }
+
+    /**
+     * Closes the table and interrupts the safe time updater thread, if one was started.
+     */
+    @Override
+    public void close() {
+        try {
+            super.close();
+        } finally {
+            // Interrupt the updater even when the parent close fails, so the thread is always signalled.
+            Thread updater = safeTimeUpdaterThread;
+
+            if (updater != null) {
+                updater.interrupt();
+
+                safeTimeUpdaterThread = null;
+            }
+        }
+    }
 }


Reply via email to