This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f5d7a2b2d IGNITE-17577 Timestamp replaced with HybridTimestamp in MvPartitionStorage. (#1038)
1f5d7a2b2d is described below

commit 1f5d7a2b2d512f19a99cd0c6e5f401d393441c42
Author: ibessonov <[email protected]>
AuthorDate: Tue Aug 30 10:08:56 2022 +0300

    IGNITE-17577 Timestamp replaced with HybridTimestamp in MvPartitionStorage. (#1038)
---
 .../java/org/apache/ignite/hlc/HybridClock.java    |  57 ++++++---
 .../org/apache/ignite/hlc/HybridTimestamp.java     |  12 +-
 .../internal/storage/MvPartitionStorage.java       |  55 ++++++++-
 .../storage/AbstractMvPartitionStorageTest.java    | 127 +++++++++++----------
 .../storage/TestMvPartitionStorageTest.java        |   2 +-
 .../TestConcurrentHashMapMvPartitionStorage.java   |  16 +--
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  26 ++---
 .../storage/pagememory/mv/HybridTimestamps.java    |  98 ++++++++++++++++
 .../storage/pagememory/mv/ReadRowVersion.java      |  12 +-
 .../internal/storage/pagememory/mv/RowVersion.java |  36 ++----
 .../storage/pagememory/mv/RowVersionFreeList.java  |   8 +-
 .../pagememory/mv/ScanVersionChainByTimestamp.java |  12 +-
 .../internal/storage/pagememory/mv/Timestamps.java |  78 -------------
 .../storage/pagememory/mv/io/RowVersionDataIo.java |  12 +-
 .../AbstractPageMemoryMvPartitionStorageTest.java  |   7 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |  77 +++++++------
 .../storage/rocksdb/RocksDbTableStorage.java       |   1 -
 .../rocksdb/RocksDbMvPartitionStorageTest.java     |   4 +-
 18 files changed, 368 insertions(+), 272 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java 
b/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
index f1017202fc..9ab7db033a 100644
--- a/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.hlc;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.time.Clock;
 import org.apache.ignite.internal.tostring.S;
 
@@ -24,8 +26,21 @@ import org.apache.ignite.internal.tostring.S;
  * A Hybrid Logical Clock.
  */
 public class HybridClock {
+    /**
+     * Var handle for {@link #latestTime}.
+     */
+    private static final VarHandle LATEST_TIME;
+
+    static {
+        try {
+            LATEST_TIME = 
MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", 
HybridTimestamp.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
     /** Latest timestamp. */
-    private HybridTimestamp latestTime;
+    private volatile HybridTimestamp latestTime;
 
     /**
      * The constructor which initializes the latest time to current time by 
system clock.
@@ -39,16 +54,25 @@ public class HybridClock {
      *
      * @return The hybrid timestamp.
      */
-    public synchronized HybridTimestamp now() {
-        long currentTimeMillis = Clock.systemUTC().instant().toEpochMilli();
+    public HybridTimestamp now() {
+        while (true) {
+            long currentTimeMillis = 
Clock.systemUTC().instant().toEpochMilli();
 
-        if (latestTime.getPhysical() >= currentTimeMillis) {
-            latestTime = latestTime.addTicks(1);
-        } else {
-            latestTime = new HybridTimestamp(currentTimeMillis, 0);
-        }
+            // Read the latest time after accessing UTC time to reduce 
contention.
+            HybridTimestamp latestTime = this.latestTime;
+
+            HybridTimestamp newLatestTime;
+
+            if (latestTime.getPhysical() >= currentTimeMillis) {
+                newLatestTime = latestTime.addTicks(1);
+            } else {
+                newLatestTime = new HybridTimestamp(currentTimeMillis, 0);
+            }
 
-        return latestTime;
+            if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
+                return newLatestTime;
+            }
+        }
     }
 
     /**
@@ -57,14 +81,19 @@ public class HybridClock {
      * @param requestTime Timestamp from request.
      * @return The hybrid timestamp.
      */
-    public synchronized HybridTimestamp update(HybridTimestamp requestTime) {
-        HybridTimestamp now = new 
HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
+    public HybridTimestamp update(HybridTimestamp requestTime) {
+        while (true) {
+            HybridTimestamp now = new 
HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
 
-        latestTime = HybridTimestamp.max(now, requestTime, latestTime);
+            // Read the latest time after accessing UTC time to reduce 
contention.
+            HybridTimestamp latestTime = this.latestTime;
 
-        latestTime = latestTime.addTicks(1);
+            HybridTimestamp newLatestTime = HybridTimestamp.max(now, 
requestTime, latestTime).addTicks(1);
 
-        return latestTime;
+            if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
+                return newLatestTime;
+            }
+        }
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java 
b/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
index a781574711..73d8bad4e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
+++ b/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
@@ -1,6 +1,6 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
@@ -28,6 +28,9 @@ public class HybridTimestamp implements 
Comparable<HybridTimestamp>, Serializabl
     /** Serial version UID. */
     private static final long serialVersionUID = 2459861612869605904L;
 
+    /** Timestamp size in bytes. */
+    public static final int HYBRID_TIMESTAMP_SIZE = Long.BYTES + Integer.BYTES;
+
     /** Physical clock. */
     private final long physical;
 
@@ -41,6 +44,11 @@ public class HybridTimestamp implements 
Comparable<HybridTimestamp>, Serializabl
      * @param logical The logical time.
      */
     public HybridTimestamp(long physical, int logical) {
+        assert physical > 0 : physical;
+        // Value -1 is used in "org.apache.ignite.hlc.HybridClock.update" to 
produce "0" after the increment.
+        // Real usable value cannot be negative.
+        assert logical >= -1 : logical;
+
         this.physical = physical;
         this.logical = logical;
     }
@@ -82,6 +90,8 @@ public class HybridTimestamp implements 
Comparable<HybridTimestamp>, Serializabl
      * @return The logical component.
      */
     public int getLogical() {
+        assert logical >= 0;
+
         return logical;
     }
 
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index a45815616c..b6e7074472 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
@@ -85,6 +86,21 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     long persistedIndex();
 
+    /**
+     * Converts {@link Timestamp} to {@link HybridTimestamp} preserving local 
node time order.
+     *
+     * @deprecated Temporary method to support API compatibility.
+     */
+    @Deprecated
+    private static HybridTimestamp convertTimestamp(Timestamp timestamp) {
+        long ts = timestamp.getTimestamp();
+
+        // "timestamp" part consists of two sections:
+        // - a 48-bits number of milliseconds since the beginning of the epoch
+        // - a 16-bits counter
+        return new HybridTimestamp(ts >>> 16, (int) ts & 0xFFFF);
+    }
+
     /**
      * Reads either the committed value from the storage or the uncommitted 
value belonging to given transaction.
      *
@@ -97,6 +113,17 @@ public interface MvPartitionStorage extends AutoCloseable {
     @Nullable
     BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, 
StorageException;
 
+    /**
+     * Reads the value from the storage as it was at the given timestamp.
+     *
+     * @deprecated Use {@link #read(RowId, HybridTimestamp)}
+     */
+    @Nullable
+    @Deprecated
+    default BinaryRow read(RowId rowId, Timestamp timestamp) throws 
StorageException {
+        return read(rowId, convertTimestamp(timestamp));
+    }
+
     /**
      * Reads the value from the storage as it was at the given timestamp.
      *
@@ -105,7 +132,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @return Binary row that corresponds to the key or {@code null} if value 
is not found.
      */
     @Nullable
-    BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException;
+    BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws 
StorageException;
 
     /**
      * Creates an uncommitted version, assigning a new row id to it.
@@ -147,6 +174,16 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException;
 
+    /**
+     * Commits a pending update of the ongoing transaction.
+     *
+     * @deprecated Use {@link #commitWrite(RowId, HybridTimestamp)}
+     */
+    @Deprecated
+    default void commitWrite(RowId rowId, Timestamp timestamp) throws 
StorageException {
+        commitWrite(rowId, convertTimestamp(timestamp));
+    }
+
     /**
      * Commits a pending update of the ongoing transaction. Invoked during 
commit. Committed value will be versioned by the given timestamp.
      *
@@ -154,7 +191,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @param timestamp Timestamp to associate with committed value.
      * @throws StorageException If failed to write data to the storage.
      */
-    void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException;
+    void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException;
 
     /**
      * Scans the partition and returns a cursor of values. All filtered values 
must either be uncommitted in current transaction
@@ -167,6 +204,16 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws 
TxIdMismatchException, StorageException;
 
+    /**
+     * Scans the partition and returns a cursor of values at the given 
timestamp.
+     *
+     * @deprecated Use {@link #scan(Predicate, HybridTimestamp)}
+     */
+    @Deprecated
+    default Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp 
timestamp) throws StorageException {
+        return scan(keyFilter, convertTimestamp(timestamp));
+    }
+
     /**
      * Scans the partition and returns a cursor of values at the given 
timestamp.
      *
@@ -176,7 +223,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @throws TxIdMismatchException If there's another pending update 
associated with different transaction id.
      * @throws StorageException If failed to read data from the storage.
      */
-    Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp 
timestamp) throws StorageException;
+    Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp 
timestamp) throws StorageException;
 
     /**
      * Returns rows count belongs to current storage.
@@ -193,7 +240,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * Iterates over all versions of all entries, except for tombstones.
      *
      * @param consumer Closure to process entries.
-     * @deprecated This method was bord out of desperation and isn't 
well-designed. Implementation is not polished either. Currently, it's
+     * @deprecated This method was born out of desperation and isn't 
well-designed. Implementation is not polished either. Currently, it's
      *      only usage is to work-around in-memory PK index rebuild on node 
restart, which shouldn't even exist in the first place.
      */
     @Deprecated
diff --git 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 1929314423..682f913079 100644
--- 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -39,6 +39,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
@@ -55,6 +57,9 @@ public abstract class AbstractMvPartitionStorageTest extends 
BaseMvStoragesTest
 
     protected final UUID txId = newTransactionId();
 
+    /** Hybrid clock to generate timestamps. */
+    protected final HybridClock clock = new HybridClock();
+
     protected final TestKey key = new TestKey(10, "foo");
     private final TestValue value = new TestValue(20, "bar");
     protected final BinaryRow binaryRow = binaryRow(key, value);
@@ -71,14 +76,14 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     /**
      * Reads a row inside of consistency closure.
      */
-    protected BinaryRow read(RowId rowId, Timestamp timestamp) {
+    protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
         return storage.runConsistently(() -> storage.read(rowId, timestamp));
     }
 
     /**
      * Scans partition inside of consistency closure.
      */
-    protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, Timestamp 
timestamp) {
+    protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, 
HybridTimestamp timestamp) {
         return storage.runConsistently(() -> storage.scan(filter, timestamp));
     }
 
@@ -106,7 +111,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     /**
      * Commits write-intent inside of consistency closure.
      */
-    protected void commitWrite(RowId rowId, Timestamp tsExact) {
+    protected void commitWrite(RowId rowId, HybridTimestamp tsExact) {
         storage.runConsistently(() -> {
             storage.commitWrite(rowId, tsExact);
 
@@ -138,15 +143,13 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
         assertEquals(PARTITION_ID, rowId.partitionId());
 
         assertNull(read(rowId, newTransactionId()));
-        assertNull(read(rowId, Timestamp.nextVersion()));
+        assertNull(read(rowId, clock.now()));
     }
 
     @Test
     public void testScanOverEmpty() throws Exception {
-        new RowId(PARTITION_ID);
-
         assertEquals(List.of(), convert(scan(row -> true, 
newTransactionId())));
-        assertEquals(List.of(), convert(scan(row -> true, 
Timestamp.nextVersion())));
+        assertEquals(List.of(), convert(scan(row -> true, clock.now())));
     }
 
     /**
@@ -169,7 +172,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
         assertThrows(TxIdMismatchException.class, () -> read(rowId, 
newTransactionId()));
 
         // Read with timestamp returns null.
-        assertNull(read(rowId, Timestamp.nextVersion()));
+        assertNull(read(rowId, clock.now()));
     }
 
     /**
@@ -186,18 +189,18 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#commitWrite(RowId, 
Timestamp)}.
+     * Tests basic invariants of {@link MvPartitionStorage#commitWrite(RowId, 
HybridTimestamp)}.
      */
     @Test
     public void testCommitWrite() {
         RowId rowId = insert(binaryRow, txId);
 
-        Timestamp tsBefore = Timestamp.nextVersion();
+        HybridTimestamp tsBefore = clock.now();
 
-        Timestamp tsExact = Timestamp.nextVersion();
+        HybridTimestamp tsExact = clock.now();
         commitWrite(rowId, tsExact);
 
-        Timestamp tsAfter = Timestamp.nextVersion();
+        HybridTimestamp tsAfter = clock.now();
 
         // Row is invisible at the time before writing.
         assertNull(read(rowId, tsBefore));
@@ -220,17 +223,17 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
 
         assertRowMatches(read(rowId, tsExact), binaryRow);
         assertRowMatches(read(rowId, tsAfter), binaryRow);
-        assertRowMatches(read(rowId, Timestamp.nextVersion()), binaryRow);
+        assertRowMatches(read(rowId, clock.now()), binaryRow);
 
         // Only latest time behavior changes after commit.
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         assertRowMatches(read(rowId, newTxId), newRow);
 
         assertRowMatches(read(rowId, tsExact), binaryRow);
         assertRowMatches(read(rowId, tsAfter), binaryRow);
 
-        assertRowMatches(read(rowId, Timestamp.nextVersion()), newRow);
+        assertRowMatches(read(rowId, clock.now()), newRow);
 
         // Remove.
         UUID removeTxId = newTransactionId();
@@ -244,24 +247,24 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
         assertRowMatches(read(rowId, tsExact), binaryRow);
         assertRowMatches(read(rowId, tsAfter), binaryRow);
 
-        assertRowMatches(read(rowId, Timestamp.nextVersion()), newRow);
+        assertRowMatches(read(rowId, clock.now()), newRow);
 
         // Commit remove.
-        Timestamp removeTs = Timestamp.nextVersion();
+        HybridTimestamp removeTs = clock.now();
         commitWrite(rowId, removeTs);
 
         assertNull(read(rowId, tsBefore));
 
         assertNull(read(rowId, removeTxId));
         assertNull(read(rowId, removeTs));
-        assertNull(read(rowId, Timestamp.nextVersion()));
+        assertNull(read(rowId, clock.now()));
 
         assertRowMatches(read(rowId, tsExact), binaryRow);
         assertRowMatches(read(rowId, tsAfter), binaryRow);
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate, 
Timestamp)}.
+     * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate, 
HybridTimestamp)}.
      */
     @Test
     public void testScan() throws Exception {
@@ -284,17 +287,17 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
         assertEquals(List.of(value1), convert(storage.scan(row -> 
key(row).intKey == 1, txId1)));
         assertEquals(List.of(value2), convert(storage.scan(row -> 
key(row).intKey == 2, txId2)));
 
-        Timestamp ts1 = Timestamp.nextVersion();
+        HybridTimestamp ts1 = clock.now();
 
-        Timestamp ts2 = Timestamp.nextVersion();
+        HybridTimestamp ts2 = clock.now();
         commitWrite(rowId1, ts2);
 
-        Timestamp ts3 = Timestamp.nextVersion();
+        HybridTimestamp ts3 = clock.now();
 
-        Timestamp ts4 = Timestamp.nextVersion();
+        HybridTimestamp ts4 = clock.now();
         commitWrite(rowId2, ts4);
 
-        Timestamp ts5 = Timestamp.nextVersion();
+        HybridTimestamp ts5 = clock.now();
 
         // Full scan with various timestamp values.
         assertEquals(List.of(), convert(scan(row -> true, ts1)));
@@ -313,10 +316,10 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
         TestValue value2 = new TestValue(20, "yyy");
 
         RowId rowId1 = insert(binaryRow(new TestKey(1, "1"), value1), txId);
-        commitWrite(rowId1, Timestamp.nextVersion());
+        commitWrite(rowId1, clock.now());
 
         RowId rowId2 = insert(binaryRow(new TestKey(2, "2"), value2), txId);
-        commitWrite(rowId2, Timestamp.nextVersion());
+        commitWrite(rowId2, clock.now());
 
         Cursor<BinaryRow> cursor = scan(row -> true, txId);
 
@@ -373,7 +376,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
         RowId rowId = insert(binaryRow, txId);
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         BinaryRow foundRow = read(rowId, newTransactionId());
 
@@ -383,7 +386,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
         RowId rowId1 = insert(binaryRow, txId);
-        commitWrite(rowId1, Timestamp.nextVersion());
+        commitWrite(rowId1, clock.now());
 
         RowId rowId2 = insert(binaryRow2, txId);
 
@@ -395,10 +398,10 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
         RowId rowId1 = insert(binaryRow, txId);
-        commitWrite(rowId1, Timestamp.nextVersion());
+        commitWrite(rowId1, clock.now());
 
         RowId rowId2 = insert(binaryRow2, txId);
-        commitWrite(rowId2, Timestamp.nextVersion());
+        commitWrite(rowId2, clock.now());
 
         BinaryRow foundRow = read(rowId2, txId);
 
@@ -408,7 +411,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readByExactlyCommitTimestampFindsRow() {
         RowId rowId = insert(binaryRow, txId);
-        Timestamp commitTimestamp = Timestamp.nextVersion();
+        HybridTimestamp commitTimestamp = clock.now();
         commitWrite(rowId, commitTimestamp);
 
         BinaryRow foundRow = read(rowId, commitTimestamp);
@@ -419,10 +422,10 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readByTimestampAfterCommitTimestampFindsRow() {
         RowId rowId = insert(binaryRow, txId);
-        Timestamp commitTimestamp = Timestamp.nextVersion();
+        HybridTimestamp commitTimestamp = clock.now();
         commitWrite(rowId, commitTimestamp);
 
-        Timestamp afterCommit = Timestamp.nextVersion();
+        HybridTimestamp afterCommit = clock.now();
         BinaryRow foundRow = read(rowId, afterCommit);
 
         assertRowMatches(foundRow, binaryRow);
@@ -430,10 +433,10 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
 
     @Test
     void readByTimestampBeforeFirstVersionCommitTimestampFindsNothing() {
-        Timestamp beforeCommit = Timestamp.nextVersion();
+        HybridTimestamp beforeCommit = clock.now();
 
         RowId rowId = insert(binaryRow, txId);
-        Timestamp commitTimestamp = Timestamp.nextVersion();
+        HybridTimestamp commitTimestamp = clock.now();
         commitWrite(rowId, commitTimestamp);
 
         BinaryRow foundRow = read(rowId, beforeCommit);
@@ -444,11 +447,11 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readByTimestampOfLastVersionFindsLastVersion() {
         RowId rowId = insert(binaryRow, txId);
-        Timestamp firstVersionTs = Timestamp.nextVersion();
+        HybridTimestamp firstVersionTs = clock.now();
         commitWrite(rowId, firstVersionTs);
 
         addWrite(rowId, binaryRow2, newTransactionId());
-        Timestamp secondVersionTs = Timestamp.nextVersion();
+        HybridTimestamp secondVersionTs = clock.now();
         commitWrite(rowId, secondVersionTs);
 
         BinaryRow foundRow = read(rowId, secondVersionTs);
@@ -459,11 +462,11 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readByTimestampOfPreviousVersionFindsPreviousVersion() {
         RowId rowId = insert(binaryRow, txId);
-        Timestamp firstVersionTs = Timestamp.nextVersion();
+        HybridTimestamp firstVersionTs = clock.now();
         commitWrite(rowId, firstVersionTs);
 
         addWrite(rowId, binaryRow2, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         BinaryRow foundRow = read(rowId, firstVersionTs);
 
@@ -473,13 +476,13 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readByTimestampBetweenVersionsFindsPreviousVersion() {
         RowId rowId = insert(binaryRow, txId);
-        Timestamp firstVersionTs = Timestamp.nextVersion();
+        HybridTimestamp firstVersionTs = clock.now();
         commitWrite(rowId, firstVersionTs);
 
-        Timestamp tsInBetween = Timestamp.nextVersion();
+        HybridTimestamp tsInBetween = clock.now();
 
         addWrite(rowId, binaryRow2, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         BinaryRow foundRow = read(rowId, tsInBetween);
 
@@ -489,11 +492,11 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void readByTimestampIgnoresUncommittedVersion() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         addWrite(rowId, binaryRow2, newTransactionId());
 
-        Timestamp latestTs = Timestamp.nextVersion();
+        HybridTimestamp latestTs = clock.now();
         BinaryRow foundRow = read(rowId, latestTs);
 
         assertRowMatches(foundRow, binaryRow);
@@ -529,7 +532,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void addWriteReturnsNullIfNoUncommittedVersionExists() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         BinaryRow returnedRow = addWrite(rowId, binaryRow2, txId);
 
@@ -539,7 +542,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void afterRemovalReadWithTxIdFindsNothing() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         addWrite(rowId, null, txId);
 
@@ -551,12 +554,12 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void afterRemovalReadByLatestTimestampFindsNothing() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         addWrite(rowId, null, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
-        BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+        BinaryRow foundRow = read(rowId, clock.now());
 
         assertThat(foundRow, is(nullValue()));
     }
@@ -564,11 +567,11 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void afterRemovalPreviousVersionRemainsAccessibleByTimestamp() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        Timestamp firstTimestamp = Timestamp.nextVersion();
+        HybridTimestamp firstTimestamp = clock.now();
         commitWrite(rowId, firstTimestamp);
 
         addWrite(rowId, null, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         BinaryRow foundRow = read(rowId, firstTimestamp);
 
@@ -587,7 +590,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void removalReturnsNullIfNoUncommittedVersionExists() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         BinaryRow rowFromRemoval = addWrite(rowId, null, newTransactionId());
 
@@ -598,9 +601,9 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     void commitWriteMakesVersionAvailableToReadByTimestamp() {
         RowId rowId = insert(binaryRow, txId);
 
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
-        BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+        BinaryRow foundRow = read(rowId, clock.now());
 
         assertRowMatches(foundRow, binaryRow);
     }
@@ -608,13 +611,13 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void commitAndAbortWriteNoOpIfNoUncommittedVersionExists() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         abortWrite(rowId);
 
         assertRowMatches(read(rowId, newTransactionId()), binaryRow);
 
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         assertRowMatches(read(rowId, newTransactionId()), binaryRow);
     }
@@ -622,7 +625,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     @Test
     void abortWriteRemovesUncommittedVersion() {
         RowId rowId = insert(binaryRow, newTransactionId());
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         addWrite(rowId, binaryRow2, txId);
 
@@ -639,7 +642,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
 
         abortWrite(rowId);
 
-        BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+        BinaryRow foundRow = read(rowId, clock.now());
 
         assertThat(foundRow, is(nullValue()));
     }
@@ -675,7 +678,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     void 
readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
         RowId rowId = commitAbortAndAddUncommitted();
 
-        BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+        BinaryRow foundRow = storage.read(rowId, clock.now());
 
         assertRowMatches(foundRow, binaryRow);
     }
@@ -684,7 +687,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
         return storage.runConsistently(() -> {
             RowId rowId = storage.insert(binaryRow, txId);
 
-            storage.commitWrite(rowId, Timestamp.nextVersion());
+            storage.commitWrite(rowId, clock.now());
 
             storage.addWrite(rowId, binaryRow2, newTransactionId());
             storage.abortWrite(rowId);
@@ -701,7 +704,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     void 
scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() 
throws Exception {
         commitAbortAndAddUncommitted();
 
-        try (Cursor<BinaryRow> cursor = storage.scan(k -> true, 
Timestamp.nextVersion())) {
+        try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
             BinaryRow foundRow = cursor.next();
 
             assertRowMatches(foundRow, binaryRow);
@@ -714,7 +717,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvStoragesTest
     void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
         RowId rowId = insert(binaryRow, txId);
 
-        BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+        BinaryRow foundRow = read(rowId, clock.now());
 
         assertThat(foundRow, is(nullValue()));
     }
diff --git 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
index de48fe2b1a..175780712f 100644
--- 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
@@ -27,6 +27,6 @@ public class TestMvPartitionStorageTest extends 
AbstractMvPartitionStorageTest {
      * Creates new instance.
      */
     public TestMvPartitionStorageTest() {
-        storage = new TestConcurrentHashMapMvPartitionStorage(0);
+        storage = new TestConcurrentHashMapMvPartitionStorage(PARTITION_ID);
     }
 }
diff --git 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 15a22fbf67..7ebd2c91c0 100644
--- 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -26,12 +26,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -51,11 +51,11 @@ public class TestConcurrentHashMapMvPartitionStorage 
implements MvPartitionStora
 
     private static class VersionChain {
         final @Nullable BinaryRow row;
-        final @Nullable Timestamp begin;
+        final @Nullable HybridTimestamp begin;
         final @Nullable UUID txId;
         final @Nullable VersionChain next;
 
-        VersionChain(@Nullable BinaryRow row, @Nullable Timestamp begin, 
@Nullable UUID txId, @Nullable VersionChain next) {
+        VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp begin, 
@Nullable UUID txId, @Nullable VersionChain next) {
             this.row = row;
             this.begin = begin;
             this.txId = txId;
@@ -66,7 +66,7 @@ public class TestConcurrentHashMapMvPartitionStorage 
implements MvPartitionStora
             return new VersionChain(row, null, txId, next);
         }
 
-        static VersionChain createCommitted(@Nullable Timestamp timestamp, 
VersionChain uncommittedVersionChain) {
+        static VersionChain createCommitted(@Nullable HybridTimestamp 
timestamp, VersionChain uncommittedVersionChain) {
             return new VersionChain(uncommittedVersionChain.row, timestamp, 
null, uncommittedVersionChain.next);
         }
 
@@ -161,7 +161,7 @@ public class TestConcurrentHashMapMvPartitionStorage 
implements MvPartitionStora
 
     /** {@inheritDoc} */
     @Override
-    public void commitWrite(RowId rowId, Timestamp timestamp) {
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) {
         map.compute(rowId, (ignored, versionChain) -> {
             assert versionChain != null;
 
@@ -183,7 +183,7 @@ public class TestConcurrentHashMapMvPartitionStorage 
implements MvPartitionStora
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, @Nullable Timestamp 
timestamp) {
+    public @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp 
timestamp) {
         VersionChain versionChain = map.get(rowId);
 
         return read(versionChain, timestamp, null, null);
@@ -192,7 +192,7 @@ public class TestConcurrentHashMapMvPartitionStorage 
implements MvPartitionStora
     @Nullable
     private static BinaryRow read(
             VersionChain versionChain,
-            @Nullable Timestamp timestamp,
+            @Nullable HybridTimestamp timestamp,
             @Nullable UUID txId,
             @Nullable Predicate<BinaryRow> filter
     ) {
@@ -252,7 +252,7 @@ public class TestConcurrentHashMapMvPartitionStorage 
implements MvPartitionStora
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, Timestamp 
timestamp) {
+    public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp 
timestamp) {
         Iterator<BinaryRow> iterator = map.values().stream()
                 .map(versionChain -> read(versionChain, timestamp, null, 
filter))
                 .filter(Objects::nonNull)
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 823ec149e9..0261dde770 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
@@ -32,7 +33,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteCursor;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -48,7 +48,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
     private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
 
-    private static final Predicate<Timestamp> ALWAYS_LOAD_VALUE = timestamp -> 
true;
+    private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = 
timestamp -> true;
 
     private final int partitionId;
     private final int groupId;
@@ -97,7 +97,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws 
StorageException {
+    public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) 
throws StorageException {
         VersionChain versionChain = findVersionChain(rowId);
 
         if (versionChain == null) {
@@ -129,7 +129,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         return row;
     }
 
-    private RowVersion readRowVersion(long nextLink, Predicate<Timestamp> 
loadValue) {
+    private RowVersion readRowVersion(long nextLink, 
Predicate<HybridTimestamp> loadValue) {
         ReadRowVersion read = new ReadRowVersion(partitionId);
 
         try {
@@ -158,7 +158,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     private @Nullable ByteBufferRow findRowVersionInChain(
             VersionChain versionChain,
             @Nullable UUID transactionId,
-            @Nullable Timestamp timestamp,
+            @Nullable HybridTimestamp timestamp,
             Predicate<BinaryRow> keyFilter
     ) {
         assert transactionId != null ^ timestamp != null;
@@ -172,7 +172,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         }
     }
 
-    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain 
versionChain, Timestamp timestamp) {
+    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain 
versionChain, HybridTimestamp timestamp) {
         if (!versionChain.hasCommittedVersions()) {
             return null;
         }
@@ -298,7 +298,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
     /** {@inheritDoc} */
     @Override
-    public void commitWrite(RowId rowId, Timestamp timestamp) throws 
StorageException {
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
         VersionChain currentVersionChain = findVersionChain(rowId);
 
         if (currentVersionChain == null || currentVersionChain.transactionId() 
== null) {
@@ -352,12 +352,12 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp 
timestamp) throws StorageException {
+    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, 
HybridTimestamp timestamp) throws StorageException {
         return internalScan(keyFilter, null, timestamp);
     }
 
-    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, 
@Nullable UUID transactionId, @Nullable Timestamp timestamp) {
-        assert transactionId != null ^ timestamp != null;
+    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, 
@Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
+        assert txId != null ^ timestamp != null;
 
         IgniteCursor<VersionChain> treeCursor;
 
@@ -367,7 +367,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
             throw new StorageException("Find failed", e);
         }
 
-        return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
+        return new ScanCursor(treeCursor, keyFilter, txId, timestamp);
     }
 
     /** {@inheritDoc} */
@@ -408,7 +408,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
         private final @Nullable UUID transactionId;
 
-        private final @Nullable Timestamp timestamp;
+        private final @Nullable HybridTimestamp timestamp;
 
         private BinaryRow nextRow = null;
 
@@ -418,7 +418,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
                 IgniteCursor<VersionChain> treeCursor,
                 Predicate<BinaryRow> keyFilter,
                 @Nullable UUID transactionId,
-                @Nullable Timestamp timestamp
+                @Nullable HybridTimestamp timestamp
         ) {
             this.treeCursor = treeCursor;
             this.keyFilter = keyFilter;
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
new file mode 100644
index 0000000000..c756364ce1
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reads/writes {@link HybridTimestamp}s; {@code null} is encoded as (0, 0).
+ */
+public class HybridTimestamps {
+    /**
+     * Physical time component written in place of a {@code null} timestamp.
+     */
+    private static final long NULL_PHYSICAL_TIME = 0L;
+
+    /**
+     * Logical time component written in place of a {@code null} timestamp.
+     */
+    private static final int NULL_LOGICAL_TIME = 0;
+
+    /**
+     * Reads a {@link HybridTimestamp} from memory; (0, 0) is read as null.
+     *
+     * @param pageAddr Address where page data starts.
+     * @param offset Offset to the timestamp value relative to pageAddr.
+     */
+    static @Nullable HybridTimestamp readTimestamp(long pageAddr, int offset) {
+        long physical = getLong(pageAddr, offset);
+        int logical = getInt(pageAddr, offset + Long.BYTES);
+
+        if (physical == NULL_PHYSICAL_TIME && logical == NULL_LOGICAL_TIME) {
+            return null;
+        }
+
+        return new HybridTimestamp(physical, logical);
+    }
+
+    /**
+     * Writes a {@link HybridTimestamp} to memory at {@code addr + offset}.
+     *
+     * @param addr Memory address.
+     * @param offset Offset added to the address.
+     * @param timestamp Timestamp to write; {@code null} is written as (0, 0).
+     * @return Number of bytes written ({@code HYBRID_TIMESTAMP_SIZE}).
+     */
+    public static int writeTimestampToMemory(long addr, int offset, @Nullable 
HybridTimestamp timestamp) {
+        if (timestamp == null) {
+            putLong(addr, offset, NULL_PHYSICAL_TIME);
+
+            putInt(addr, offset + Long.BYTES, NULL_LOGICAL_TIME);
+        } else {
+            putLong(addr, offset, timestamp.getPhysical());
+
+            putInt(addr, offset + Long.BYTES, timestamp.getLogical());
+        }
+
+        return HYBRID_TIMESTAMP_SIZE;
+    }
+
+    /**
+     * Writes a {@link HybridTimestamp} to a buffer as a long and an int.
+     *
+     * @param buffer Buffer to write to, starting at its current position.
+     * @param timestamp Timestamp to write; {@code null} is written as (0, 0).
+     */
+    public static void writeTimestampToBuffer(ByteBuffer buffer, @Nullable 
HybridTimestamp timestamp) {
+        if (timestamp == null) {
+            buffer.putLong(NULL_PHYSICAL_TIME)
+                    .putInt(NULL_LOGICAL_TIME);
+        } else {
+            buffer.putLong(timestamp.getPhysical())
+                    .putInt(timestamp.getLogical());
+        }
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index bb6f7bd3cb..3441318ef6 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -21,17 +21,17 @@ import static 
org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLink
 
 import java.nio.ByteBuffer;
 import java.util.function.Predicate;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
 import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Traversal for reading a row version by its link. Loads the version value 
conditionally.
  */
-class ReadRowVersion implements PageMemoryTraversal<Predicate<Timestamp>> {
+class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>> {
     private final int partitionId;
 
     private RowVersion result;
@@ -40,7 +40,7 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<Timestamp>> {
 
     private long firstFragmentLink;
 
-    private @Nullable Timestamp timestamp;
+    private @Nullable HybridTimestamp timestamp;
 
     private long nextLink;
 
@@ -52,7 +52,7 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<Timestamp>> {
 
     /** {@inheritDoc} */
     @Override
-    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Predicate<Timestamp> loadValue) {
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Predicate<HybridTimestamp> loadValue) {
         if (readingFirstSlot) {
             readingFirstSlot = false;
 
@@ -62,10 +62,10 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<Timestamp>> {
         }
     }
 
-    private long readFullOrInitiateReadFragmented(long link, long pageAddr, 
DataPagePayload payload, Predicate<Timestamp> loadValue) {
+    private long readFullOrInitiateReadFragmented(long link, long pageAddr, 
DataPagePayload payload, Predicate<HybridTimestamp> loadValue) {
         firstFragmentLink = link;
 
-        timestamp = Timestamps.readTimestamp(pageAddr, payload.offset() + 
RowVersion.TIMESTAMP_OFFSET);
+        timestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset() 
+ RowVersion.TIMESTAMP_OFFSET);
         nextLink = readPartitionlessLink(partitionId, pageAddr, 
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
 
         if (!loadValue.test(timestamp)) {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index fc4baf5bf0..2b13ab7ccd 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -17,33 +17,31 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
+import static org.apache.ignite.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.Storable;
 import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents row version inside row version chain.
  */
-public class RowVersion implements Storable {
-    /** A 'timestamp' representing absense of a timestamp. */
-    public static final Timestamp NULL_TIMESTAMP = new 
Timestamp(Long.MIN_VALUE, Long.MIN_VALUE);
-
+public final class RowVersion implements Storable {
     /** Represents an absent partitionless link. */
     public static final long NULL_LINK = 0;
 
-    private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
     private static final int NEXT_LINK_STORE_SIZE_BYTES = 
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
     private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
 
     public static final int TIMESTAMP_OFFSET = 0;
-    public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES;
+    public static final int NEXT_LINK_OFFSET = TIMESTAMP_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
     public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + 
NEXT_LINK_STORE_SIZE_BYTES;
     public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + 
VALUE_SIZE_STORE_SIZE_BYTES;
 
@@ -51,7 +49,7 @@ public class RowVersion implements Storable {
 
     private long link;
 
-    private final @Nullable Timestamp timestamp;
+    private final @Nullable HybridTimestamp timestamp;
 
     private final long nextLink;
 
@@ -70,30 +68,20 @@ public class RowVersion implements Storable {
     /**
      * Constructor.
      */
-    public RowVersion(int partitionId, long link, @Nullable Timestamp 
timestamp, long nextLink, @Nullable ByteBuffer value) {
+    public RowVersion(int partitionId, long link, @Nullable HybridTimestamp 
timestamp, long nextLink, @Nullable ByteBuffer value) {
         this.partitionId = partitionId;
         link(link);
 
-        assert !NULL_TIMESTAMP.equals(timestamp) : "Null timestamp provided";
-
         this.timestamp = timestamp;
         this.nextLink = nextLink;
         this.valueSize = value == null ? -1 : value.limit();
         this.value = value;
     }
 
-    public @Nullable Timestamp timestamp() {
+    public @Nullable HybridTimestamp timestamp() {
         return timestamp;
     }
 
-    public Timestamp timestampForStorage() {
-        return timestampForStorage(timestamp);
-    }
-
-    static Timestamp timestampForStorage(@Nullable Timestamp timestamp) {
-        return timestamp == null ? NULL_TIMESTAMP : timestamp;
-    }
-
     /**
      * Returns partitionless link of the next version or {@code 0} if this 
version is the last in the chain (i.e. it's the oldest version).
      */
@@ -126,10 +114,6 @@ public class RowVersion implements Storable {
     }
 
     boolean isUncommitted() {
-        return isUncommitted(timestamp);
-    }
-
-    static boolean isUncommitted(Timestamp timestamp) {
         return timestamp == null;
     }
 
@@ -160,13 +144,13 @@ public class RowVersion implements Storable {
     public int size() {
         assert value != null;
 
-        return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + 
VALUE_SIZE_STORE_SIZE_BYTES + value.limit();
+        return headerSize() + value.limit();
     }
 
     /** {@inheritDoc} */
     @Override
     public int headerSize() {
-        return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + 
VALUE_SIZE_STORE_SIZE_BYTES;
+        return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES + 
VALUE_SIZE_STORE_SIZE_BYTES;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
index 835a94e94d..a3a20abb06 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.storage.pagememory.mv;
 
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.PageMemory;
@@ -29,7 +30,6 @@ import org.apache.ignite.internal.pagememory.reuse.ReuseList;
 import org.apache.ignite.internal.pagememory.util.PageHandler;
 import org.apache.ignite.internal.pagememory.util.PageLockListener;
 import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.jetbrains.annotations.Nullable;
 
@@ -108,7 +108,7 @@ public class RowVersionFreeList extends 
AbstractFreeList<RowVersion> {
      * @param newTimestamp timestamp to set
      * @throws IgniteInternalCheckedException if something fails
      */
-    public void updateTimestamp(long link, Timestamp newTimestamp) throws 
IgniteInternalCheckedException {
+    public void updateTimestamp(long link, HybridTimestamp newTimestamp) 
throws IgniteInternalCheckedException {
         updateDataRow(link, updateTimestampHandler, newTimestamp, statHolder);
     }
 
@@ -122,7 +122,7 @@ public class RowVersionFreeList extends 
AbstractFreeList<RowVersion> {
         super.removeDataRowByLink(link, statHolder);
     }
 
-    private class UpdateTimestampHandler implements PageHandler<Timestamp, 
Object> {
+    private class UpdateTimestampHandler implements 
PageHandler<HybridTimestamp, Object> {
         /** {@inheritDoc} */
         @Override
         public Object run(
@@ -131,7 +131,7 @@ public class RowVersionFreeList extends 
AbstractFreeList<RowVersion> {
                 long page,
                 long pageAddr,
                 PageIo io,
-                Timestamp arg,
+                HybridTimestamp arg,
                 int itemId,
                 IoStatisticsHolder statHolder
         ) throws IgniteInternalCheckedException {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
index 83ce3fee7b..569c2086aa 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static 
org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks.readPartitionlessLink;
 
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
  * <p>NB: this traversal first traverses starting data slots of the Version 
Chain one after another; when it finds the
  * version it needs, it switches to traversing the slots comprising the 
version (because it might be fragmented).
  */
-class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
+class ScanVersionChainByTimestamp implements 
PageMemoryTraversal<HybridTimestamp> {
     private final int partitionId;
 
     /** Contains the result when the traversal ends. */
@@ -52,9 +52,9 @@ class ScanVersionChainByTimestamp implements 
PageMemoryTraversal<Timestamp> {
 
     /** {@inheritDoc} */
     @Override
-    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Timestamp timestamp) {
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, HybridTimestamp timestamp) {
         if (lookingForVersion) {
-            Timestamp rowVersionTs = Timestamps.readTimestamp(pageAddr, 
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+            HybridTimestamp rowVersionTs = 
HybridTimestamps.readTimestamp(pageAddr, payload.offset() + 
RowVersion.TIMESTAMP_OFFSET);
 
             if (rowTimestampMatches(rowVersionTs, timestamp)) {
                 return readFullyOrStartReadingFragmented(link, pageAddr, 
payload);
@@ -67,8 +67,8 @@ class ScanVersionChainByTimestamp implements 
PageMemoryTraversal<Timestamp> {
         }
     }
 
-    private boolean rowTimestampMatches(Timestamp rowVersionTs, Timestamp 
timestamp) {
-        return rowVersionTs != null && rowVersionTs.beforeOrEquals(timestamp);
+    private boolean rowTimestampMatches(HybridTimestamp rowVersionTs, 
HybridTimestamp timestamp) {
+        return rowVersionTs != null && rowVersionTs.compareTo(timestamp) <= 0;
     }
 
     private long readFullyOrStartReadingFragmented(long link, long pageAddr, 
DataPagePayload payload) {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
deleted file mode 100644
index bbe6ba61cc..0000000000
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.mv;
-
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.tx.Timestamp;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Code to work with {@link Timestamp}s.
- */
-public class Timestamps {
-    /**
-     * Reads a {@link Timestamp} value from memory.
-     *
-     * @param pageAddr Address where page data starts.
-     * @param offset Offset to the timestamp value relative to pageAddr.
-     */
-    static @Nullable Timestamp readTimestamp(long pageAddr, int offset) {
-        long nodeId = getLong(pageAddr, offset);
-        long localTimestamp = getLong(pageAddr, offset + Long.BYTES);
-
-        Timestamp timestamp = new Timestamp(localTimestamp, nodeId);
-        if (timestamp.equals(RowVersion.NULL_TIMESTAMP)) {
-            timestamp = null;
-        }
-
-        return timestamp;
-    }
-
-    /**
-     * Writes a {@link Timestamp} to memory starting at the given address + 
offset.
-     *
-     * @param addr Memory address.
-     * @param offset Offset added to the address.
-     * @param timestamp The timestamp to write.
-     * @return Number of bytes written.
-     */
-    public static int writeTimestampToMemory(long addr, int offset, @Nullable 
Timestamp timestamp) {
-        Timestamp timestampForStorage = 
RowVersion.timestampForStorage(timestamp);
-
-        putLong(addr, offset, timestampForStorage.getNodeId());
-        putLong(addr, offset + Long.BYTES, timestampForStorage.getTimestamp());
-
-        return 2 * Long.BYTES;
-    }
-
-    /**
-     * Writes a {@link Timestamp} to a buffer.
-     *
-     * @param buffer Buffer to which to write.
-     * @param timestamp The timestamp to write.
-     */
-    public static void writeTimestampToBuffer(ByteBuffer buffer, @Nullable 
Timestamp timestamp) {
-        Timestamp timestampForStorage = 
RowVersion.timestampForStorage(timestamp);
-
-        buffer.putLong(timestampForStorage.getNodeId());
-        buffer.putLong(timestampForStorage.getTimestamp());
-    }
-}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
index 3f32cf0464..c7d8846112 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
@@ -23,12 +23,12 @@ import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
 import static org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks.writePartitionlessLink;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.HybridTimestamps;
 import org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks;
 import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
-import org.apache.ignite.internal.storage.pagememory.mv.Timestamps;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.lang.IgniteStringBuilder;
 import org.jetbrains.annotations.Nullable;
 
@@ -61,7 +61,7 @@ public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> {
         putShort(addr, 0, (short) payloadSize);
         addr += 2;
 
-        addr += Timestamps.writeTimestampToMemory(addr, 0, row.timestamp());
+        addr += HybridTimestamps.writeTimestampToMemory(addr, 0, row.timestamp());
 
         addr += writePartitionlessLink(addr, row.nextLink());
 
@@ -81,7 +81,7 @@ public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> {
             assert row.headerSize() <= payloadSize : "Header must entirely fit in the first fragment, but header size is "
                     + row.headerSize() + " and payload size is " + payloadSize;
 
-            Timestamps.writeTimestampToBuffer(buf, row.timestamp());
+            HybridTimestamps.writeTimestampToBuffer(buf, row.timestamp());
 
             PartitionlessLinks.writeToBuffer(buf, row.nextLink());
 
@@ -119,10 +119,10 @@ public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> {
      * @param pageSize  size of the page
      * @param timestamp timestamp to store
      */
-    public void updateTimestamp(long pageAddr, int itemId, int pageSize, @Nullable Timestamp timestamp) {
+    public void updateTimestamp(long pageAddr, int itemId, int pageSize, @Nullable HybridTimestamp timestamp) {
         int payloadOffset = getPayloadOffset(pageAddr, itemId, pageSize, 0);
 
-        Timestamps.writeTimestampToMemory(pageAddr, payloadOffset + RowVersion.TIMESTAMP_OFFSET, timestamp);
+        HybridTimestamps.writeTimestampToMemory(pageAddr, payloadOffset + RowVersion.TIMESTAMP_OFFSET, timestamp);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
index 60234d4823..1fb6f8ad7c 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -83,9 +82,9 @@ abstract class AbstractPageMemoryMvPartitionStorageTest extends AbstractMvPartit
 
         RowId rowId = insert(longRow, txId);
 
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
-        BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+        BinaryRow foundRow = read(rowId, clock.now());
 
         assertRowMatches(foundRow, longRow);
     }
@@ -109,7 +108,7 @@ abstract class AbstractPageMemoryMvPartitionStorageTest extends AbstractMvPartit
 
         RowId rowId = insert(longRow, txId);
 
-        commitWrite(rowId, Timestamp.nextVersion());
+        commitWrite(rowId, clock.now());
 
         try (Cursor<BinaryRow> cursor = storage.scan(row -> true, txId)) {
             BinaryRow foundRow = cursor.next();
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index d7bf03d9e8..327dfa26c0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -23,6 +23,7 @@ import static java.nio.ByteOrder.BIG_ENDIAN;
 import static java.nio.ByteOrder.LITTLE_ENDIAN;
 import static java.util.Arrays.copyOf;
 import static java.util.Arrays.copyOfRange;
+import static org.apache.ignite.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
 import static org.rocksdb.ReadTier.PERSISTED_TIER;
 
 import java.nio.ByteBuffer;
@@ -33,13 +34,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -61,16 +62,16 @@ import org.rocksdb.WriteOptions;
  * | partId (2 bytes, BE) | rowId (16 bytes, BE) |</code></pre>
  * or
  * <pre><code>
- * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (16 bytes, DESC) |</code></pre>
+ * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |</code></pre>
  * depending on transaction status. Pending transactions data doesn't have a timestamp assigned.
  *
  * <p/>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
  *
- * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). Please refer to {@link #putTimestamp(ByteBuffer, Timestamp)} to
- * see how it's achieved. Missing timestamp could be interpreted as a moment infinitely far away in the future.
+ * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). Please refer to {@link #putTimestamp(ByteBuffer, HybridTimestamp)}
+ * to see how it's achieved. Missing timestamp could be interpreted as a moment infinitely far away in the future.
  */
 public class RocksDbMvPartitionStorage implements MvPartitionStorage {
-    /** Position of row id inside of the key. */
+    /** Position of row id inside the key. */
     private static final int ROW_ID_OFFSET = Short.BYTES;
 
     /** UUID size in bytes. */
@@ -79,14 +80,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Size of the key without timestamp. */
     private static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
 
-    /** Timestamp size in bytes. */
-    private static final int TIMESTAMP_SIZE = 2 * Long.BYTES;
-
-    /** Transaction id size. Matches timestamp size. */
-    private static final int TX_ID_SIZE = TIMESTAMP_SIZE;
+    /** Transaction id size. */
+    private static final int TX_ID_SIZE = 2 * Long.BYTES;
 
     /** Maximum size of the key. */
-    private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + TIMESTAMP_SIZE;
+    private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
 
     /** Thread-local direct buffer instance to read keys from RocksDB. */
     private static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(BIG_ENDIAN));
@@ -296,7 +294,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             // Check concurrent transaction data.
             byte[] keyBufArray = keyBuf.array();
 
-            byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, copyOf(keyBufArray, ROW_PREFIX_SIZE));
+            byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
+
+            byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
 
             // Previous value must belong to the same transaction.
             if (previousValue != null) {
@@ -309,12 +309,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 // Write empty value as a tombstone.
                 if (previousValue != null) {
                     // Reuse old array with transaction id already written to it.
-                    writeBatch.put(cf, copyOf(keyBufArray, ROW_PREFIX_SIZE), copyOf(previousValue, TX_ID_SIZE));
+                    writeBatch.put(cf, keyBytes, copyOf(previousValue, TX_ID_SIZE));
                 } else {
-                    // Use tail of the key buffer to save on array allocations.
-                    putTransactionId(keyBufArray, ROW_PREFIX_SIZE, txId);
+                    byte[] txIdBytes = new byte[TX_ID_SIZE];
+
+                    putTransactionId(txIdBytes, 0, txId);
 
-                    writeBatch.put(cf, copyOf(keyBufArray, ROW_PREFIX_SIZE), copyOfRange(keyBufArray, ROW_PREFIX_SIZE, MAX_KEY_SIZE));
+                    writeBatch.put(cf, keyBytes, txIdBytes);
                 }
             } else {
                 writeUnversioned(keyBufArray, row, txId);
@@ -358,7 +359,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         try {
-            byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+            byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+
+            byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
 
             if (previousValue == null) {
                 //the chain doesn't contain an uncommitted write intent
@@ -366,7 +369,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             }
 
             // Perform unconditional remove for the key without associated timestamp.
-            writeBatch.delete(cf, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+            writeBatch.delete(cf, keyBytes);
 
             return wrapValueIntoBinaryRow(previousValue, true);
         } catch (RocksDBException e) {
@@ -376,14 +379,16 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException {
+    public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         WriteBatchWithIndex writeBatch = requireWriteBatch();
 
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         try {
             // Read a value associated with pending write.
-            byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+            byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+
+            byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
 
             if (valueBytes == null) {
                 //the chain doesn't contain an uncommitted write intent
@@ -391,7 +396,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             }
 
             // Delete pending write.
-            writeBatch.delete(cf, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+            writeBatch.delete(cf, uncommittedKeyBytes);
 
             // Add timestamp to the key, and put the value back into the storage.
             putTimestamp(keyBuf, timestamp);
@@ -410,11 +415,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException {
+    public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         return read(rowId, timestamp, null);
     }
 
-    private @Nullable BinaryRow read(RowId rowId, @Nullable Timestamp timestamp, @Nullable UUID txId)
+    private @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
             throws TxIdMismatchException, StorageException {
         assert timestamp == null ^ txId == null;
 
@@ -499,11 +504,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
+    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
         return scan(keyFilter, timestamp, null);
     }
 
-    private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable Timestamp timestamp, @Nullable UUID txId)
+    private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
             throws TxIdMismatchException, StorageException {
         assert timestamp == null ^ txId == null;
 
@@ -516,10 +521,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         it.seek(partitionStartPrefix());
 
         // Size of the buffer to seek values. Should fit partition id, row id and maybe timestamp, if it's not null.
-        int seekKeyBufSize = ROW_PREFIX_SIZE + (timestamp == null ? 0 : TIMESTAMP_SIZE);
+        int seekKeyBufSize = ROW_PREFIX_SIZE + (timestamp == null ? 0 : HYBRID_TIMESTAMP_SIZE);
 
         // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
-        // a timestamp value. Zero row id guarantees that it's lexicographically less then or equal to any other row id stored in the
+        // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
         // partition.
         // Byte buffer from a thread-local field can't be used here, because of two reasons:
         //  - no one guarantees that there will only be a single cursor;
@@ -613,7 +618,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                         }
 
                         // Or iterator may still be valid even if there's no version for required timestamp. In this case row id
-                        // itself will be different and we must check it.
+                        // itself will be different, and we must check it.
                         keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
 
                         valueHasTxId = keyLength == ROW_PREFIX_SIZE;
@@ -676,7 +681,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             /** {@inheritDoc} */
             @Override
             public void close() throws Exception {
-                IgniteUtils.closeAll(options, it);
+                IgniteUtils.closeAll(it, options);
             }
 
             private void incrementRowId(ByteBuffer buf) {
@@ -698,6 +703,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 short partitionId = (short) (1 + buf.getShort(0));
 
+                assert partitionId != 0;
+
                 buf.putShort(0, partitionId);
             }
         };
@@ -807,14 +814,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /**
      * Writes a timestamp into a byte buffer, in descending lexicographical bytes order.
      */
-    private static void putTimestamp(ByteBuffer buf, Timestamp ts) {
+    private static void putTimestamp(ByteBuffer buf, HybridTimestamp ts) {
         assert buf.order() == BIG_ENDIAN;
 
-        // Two things to note here:
-        //  - "xor" with a single sign bit makes long values comparable according to RocksDB convention, where bytes are unsigned.
-        //  - "bitwise negation" turns ascending order into a descending one.
-        buf.putLong(~ts.getTimestamp() ^ (1L << 63));
-        buf.putLong(~ts.getNodeId() ^ (1L << 63));
+        // "bitwise negation" turns ascending order into a descending one.
+        buf.putLong(~ts.getPhysical());
+        buf.putInt(~ts.getLogical());
     }
 
     private static void putTransactionId(byte[] array, int off, UUID txId) {
@@ -839,7 +844,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         if (invalid) {
             // Check the status first. This operation is guaranteed to throw if an internal error has occurred during
-            // the iteration. Otherwise we've exhausted the data range.
+            // the iteration. Otherwise, we've exhausted the data range.
             try {
                 it.status();
             } catch (RocksDBException e) {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 2496f3c661..ea0dab45e7 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -130,7 +130,6 @@ public class RocksDbTableStorage implements MvTableStorage {
      */
     private volatile Runnable latestFlushClosure;
 
-    //TODO Use it instead of the "stopped" flag.
     /** Busy lock to stop synchronously. */
     final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 7773862707..eff5fe6465 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -86,11 +86,11 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
 
         engine.start();
 
-        table = (RocksDbTableStorage) engine.createMvTable(tableCfg);
+        table = engine.createMvTable(tableCfg);
 
         table.start();
 
-        storage = table.getOrCreateMvPartition(0);
+        storage = table.getOrCreateMvPartition(PARTITION_ID);
     }
 
     @AfterEach

Reply via email to