This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1f5d7a2b2d IGNITE-17577 Timestamp replaced with HybridTimestamp in
MvPartitionStorage. (#1038)
1f5d7a2b2d is described below
commit 1f5d7a2b2d512f19a99cd0c6e5f401d393441c42
Author: ibessonov <[email protected]>
AuthorDate: Tue Aug 30 10:08:56 2022 +0300
IGNITE-17577 Timestamp replaced with HybridTimestamp in MvPartitionStorage.
(#1038)
---
.../java/org/apache/ignite/hlc/HybridClock.java | 57 ++++++---
.../org/apache/ignite/hlc/HybridTimestamp.java | 12 +-
.../internal/storage/MvPartitionStorage.java | 55 ++++++++-
.../storage/AbstractMvPartitionStorageTest.java | 127 +++++++++++----------
.../storage/TestMvPartitionStorageTest.java | 2 +-
.../TestConcurrentHashMapMvPartitionStorage.java | 16 +--
.../mv/AbstractPageMemoryMvPartitionStorage.java | 26 ++---
.../storage/pagememory/mv/HybridTimestamps.java | 98 ++++++++++++++++
.../storage/pagememory/mv/ReadRowVersion.java | 12 +-
.../internal/storage/pagememory/mv/RowVersion.java | 36 ++----
.../storage/pagememory/mv/RowVersionFreeList.java | 8 +-
.../pagememory/mv/ScanVersionChainByTimestamp.java | 12 +-
.../internal/storage/pagememory/mv/Timestamps.java | 78 -------------
.../storage/pagememory/mv/io/RowVersionDataIo.java | 12 +-
.../AbstractPageMemoryMvPartitionStorageTest.java | 7 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 77 +++++++------
.../storage/rocksdb/RocksDbTableStorage.java | 1 -
.../rocksdb/RocksDbMvPartitionStorageTest.java | 4 +-
18 files changed, 368 insertions(+), 272 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
b/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
index f1017202fc..9ab7db033a 100644
--- a/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/hlc/HybridClock.java
@@ -17,6 +17,8 @@
package org.apache.ignite.hlc;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.time.Clock;
import org.apache.ignite.internal.tostring.S;
@@ -24,8 +26,21 @@ import org.apache.ignite.internal.tostring.S;
* A Hybrid Logical Clock.
*/
public class HybridClock {
+ /**
+ * Var handle for {@link #latestTime}.
+ */
+ private static final VarHandle LATEST_TIME;
+
+ static {
+ try {
+ LATEST_TIME =
MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime",
HybridTimestamp.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
/** Latest timestamp. */
- private HybridTimestamp latestTime;
+ private volatile HybridTimestamp latestTime;
/**
* The constructor which initializes the latest time to current time by
system clock.
@@ -39,16 +54,25 @@ public class HybridClock {
*
* @return The hybrid timestamp.
*/
- public synchronized HybridTimestamp now() {
- long currentTimeMillis = Clock.systemUTC().instant().toEpochMilli();
+ public HybridTimestamp now() {
+ while (true) {
+ long currentTimeMillis =
Clock.systemUTC().instant().toEpochMilli();
- if (latestTime.getPhysical() >= currentTimeMillis) {
- latestTime = latestTime.addTicks(1);
- } else {
- latestTime = new HybridTimestamp(currentTimeMillis, 0);
- }
+ // Read the latest time after accessing UTC time to reduce
contention.
+ HybridTimestamp latestTime = this.latestTime;
+
+ HybridTimestamp newLatestTime;
+
+ if (latestTime.getPhysical() >= currentTimeMillis) {
+ newLatestTime = latestTime.addTicks(1);
+ } else {
+ newLatestTime = new HybridTimestamp(currentTimeMillis, 0);
+ }
- return latestTime;
+ if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
+ return newLatestTime;
+ }
+ }
}
/**
@@ -57,14 +81,19 @@ public class HybridClock {
* @param requestTime Timestamp from request.
* @return The hybrid timestamp.
*/
- public synchronized HybridTimestamp update(HybridTimestamp requestTime) {
- HybridTimestamp now = new
HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
+ public HybridTimestamp update(HybridTimestamp requestTime) {
+ while (true) {
+ HybridTimestamp now = new
HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
- latestTime = HybridTimestamp.max(now, requestTime, latestTime);
+ // Read the latest time after accessing UTC time to reduce
contention.
+ HybridTimestamp latestTime = this.latestTime;
- latestTime = latestTime.addTicks(1);
+ HybridTimestamp newLatestTime = HybridTimestamp.max(now,
requestTime, latestTime).addTicks(1);
- return latestTime;
+ if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
+ return newLatestTime;
+ }
+ }
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
b/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
index a781574711..73d8bad4e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
+++ b/modules/core/src/main/java/org/apache/ignite/hlc/HybridTimestamp.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -28,6 +28,9 @@ public class HybridTimestamp implements
Comparable<HybridTimestamp>, Serializabl
/** Serial version UID. */
private static final long serialVersionUID = 2459861612869605904L;
+ /** Timestamp size in bytes. */
+ public static final int HYBRID_TIMESTAMP_SIZE = Long.BYTES + Integer.BYTES;
+
/** Physical clock. */
private final long physical;
@@ -41,6 +44,11 @@ public class HybridTimestamp implements
Comparable<HybridTimestamp>, Serializabl
* @param logical The logical time.
*/
public HybridTimestamp(long physical, int logical) {
+ assert physical > 0 : physical;
+ // Value -1 is used in "org.apache.ignite.hlc.HybridClock.update" to
produce "0" after the increment.
+ // Real usable value cannot be negative.
+ assert logical >= -1 : logical;
+
this.physical = physical;
this.logical = logical;
}
@@ -82,6 +90,8 @@ public class HybridTimestamp implements
Comparable<HybridTimestamp>, Serializabl
* @return The logical component.
*/
public int getLogical() {
+ assert logical >= 0;
+
return logical;
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index a45815616c..b6e7074472 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
@@ -85,6 +86,21 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
long persistedIndex();
+ /**
+ * Converts {@link Timestamp} to {@link HybridTimestamp} preserving local
node time order.
+ *
+ * @deprecated Temporary method to support API compatibility.
+ */
+ @Deprecated
+ private static HybridTimestamp convertTimestamp(Timestamp timestamp) {
+ long ts = timestamp.getTimestamp();
+
+ // "timestamp" part consists of two sections:
+ // - a 48-bits number of milliseconds since the beginning of the epoch
+ // - a 16-bits counter
+ return new HybridTimestamp(ts >>> 16, (int) ts & 0xFFFF);
+ }
+
/**
* Reads either the committed value from the storage or the uncommitted
value belonging to given transaction.
*
@@ -97,6 +113,17 @@ public interface MvPartitionStorage extends AutoCloseable {
@Nullable
BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException,
StorageException;
+ /**
+ * Reads the value from the storage as it was at the given timestamp.
+ *
+ * @deprecated Use {@link #read(RowId, HybridTimestamp)}
+ */
+ @Nullable
+ @Deprecated
+ default BinaryRow read(RowId rowId, Timestamp timestamp) throws
StorageException {
+ return read(rowId, convertTimestamp(timestamp));
+ }
+
/**
* Reads the value from the storage as it was at the given timestamp.
*
@@ -105,7 +132,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @return Binary row that corresponds to the key or {@code null} if value
is not found.
*/
@Nullable
- BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException;
+ BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws
StorageException;
/**
* Creates an uncommitted version, assigning a new row id to it.
@@ -147,6 +174,16 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
@Nullable BinaryRow abortWrite(RowId rowId) throws StorageException;
+ /**
+ * Commits a pending update of the ongoing transaction.
+ *
+ * @deprecated Use {@link #commitWrite(RowId, HybridTimestamp)}
+ */
+ @Deprecated
+ default void commitWrite(RowId rowId, Timestamp timestamp) throws
StorageException {
+ commitWrite(rowId, convertTimestamp(timestamp));
+ }
+
/**
* Commits a pending update of the ongoing transaction. Invoked during
commit. Committed value will be versioned by the given timestamp.
*
@@ -154,7 +191,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @param timestamp Timestamp to associate with committed value.
* @throws StorageException If failed to write data to the storage.
*/
- void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException;
+ void commitWrite(RowId rowId, HybridTimestamp timestamp) throws
StorageException;
/**
* Scans the partition and returns a cursor of values. All filtered values
must either be uncommitted in current transaction
@@ -167,6 +204,16 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws
TxIdMismatchException, StorageException;
+ /**
+ * Scans the partition and returns a cursor of values at the given
timestamp.
+ *
+ * @deprecated Use {@link #scan(Predicate, HybridTimestamp)}
+ */
+ @Deprecated
+ default Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp
timestamp) throws StorageException {
+ return scan(keyFilter, convertTimestamp(timestamp));
+ }
+
/**
* Scans the partition and returns a cursor of values at the given
timestamp.
*
@@ -176,7 +223,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @throws TxIdMismatchException If there's another pending update
associated with different transaction id.
* @throws StorageException If failed to read data from the storage.
*/
- Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp
timestamp) throws StorageException;
+ Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp
timestamp) throws StorageException;
/**
* Returns rows count belongs to current storage.
@@ -193,7 +240,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* Iterates over all versions of all entries, except for tombstones.
*
* @param consumer Closure to process entries.
- * @deprecated This method was bord out of desperation and isn't
well-designed. Implementation is not polished either. Currently, it's
+ * @deprecated This method was born out of desperation and isn't
well-designed. Implementation is not polished either. Currently, its
* only usage is to work-around in-memory PK index rebuild on node
restart, which shouldn't even exist in the first place.
*/
@Deprecated
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 1929314423..682f913079 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -39,6 +39,8 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
@@ -55,6 +57,9 @@ public abstract class AbstractMvPartitionStorageTest extends
BaseMvStoragesTest
protected final UUID txId = newTransactionId();
+ /** Hybrid clock to generate timestamps. */
+ protected final HybridClock clock = new HybridClock();
+
protected final TestKey key = new TestKey(10, "foo");
private final TestValue value = new TestValue(20, "bar");
protected final BinaryRow binaryRow = binaryRow(key, value);
@@ -71,14 +76,14 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
/**
* Reads a row inside of consistency closure.
*/
- protected BinaryRow read(RowId rowId, Timestamp timestamp) {
+ protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
return storage.runConsistently(() -> storage.read(rowId, timestamp));
}
/**
* Scans partition inside of consistency closure.
*/
- protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, Timestamp
timestamp) {
+ protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter,
HybridTimestamp timestamp) {
return storage.runConsistently(() -> storage.scan(filter, timestamp));
}
@@ -106,7 +111,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
/**
* Commits write-intent inside of consistency closure.
*/
- protected void commitWrite(RowId rowId, Timestamp tsExact) {
+ protected void commitWrite(RowId rowId, HybridTimestamp tsExact) {
storage.runConsistently(() -> {
storage.commitWrite(rowId, tsExact);
@@ -138,15 +143,13 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
assertEquals(PARTITION_ID, rowId.partitionId());
assertNull(read(rowId, newTransactionId()));
- assertNull(read(rowId, Timestamp.nextVersion()));
+ assertNull(read(rowId, clock.now()));
}
@Test
public void testScanOverEmpty() throws Exception {
- new RowId(PARTITION_ID);
-
assertEquals(List.of(), convert(scan(row -> true,
newTransactionId())));
- assertEquals(List.of(), convert(scan(row -> true,
Timestamp.nextVersion())));
+ assertEquals(List.of(), convert(scan(row -> true, clock.now())));
}
/**
@@ -169,7 +172,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
assertThrows(TxIdMismatchException.class, () -> read(rowId,
newTransactionId()));
// Read with timestamp returns null.
- assertNull(read(rowId, Timestamp.nextVersion()));
+ assertNull(read(rowId, clock.now()));
}
/**
@@ -186,18 +189,18 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#commitWrite(RowId,
Timestamp)}.
+ * Tests basic invariants of {@link MvPartitionStorage#commitWrite(RowId,
HybridTimestamp)}.
*/
@Test
public void testCommitWrite() {
RowId rowId = insert(binaryRow, txId);
- Timestamp tsBefore = Timestamp.nextVersion();
+ HybridTimestamp tsBefore = clock.now();
- Timestamp tsExact = Timestamp.nextVersion();
+ HybridTimestamp tsExact = clock.now();
commitWrite(rowId, tsExact);
- Timestamp tsAfter = Timestamp.nextVersion();
+ HybridTimestamp tsAfter = clock.now();
// Row is invisible at the time before writing.
assertNull(read(rowId, tsBefore));
@@ -220,17 +223,17 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
- assertRowMatches(read(rowId, Timestamp.nextVersion()), binaryRow);
+ assertRowMatches(read(rowId, clock.now()), binaryRow);
// Only latest time behavior changes after commit.
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
assertRowMatches(read(rowId, newTxId), newRow);
assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
- assertRowMatches(read(rowId, Timestamp.nextVersion()), newRow);
+ assertRowMatches(read(rowId, clock.now()), newRow);
// Remove.
UUID removeTxId = newTransactionId();
@@ -244,24 +247,24 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
- assertRowMatches(read(rowId, Timestamp.nextVersion()), newRow);
+ assertRowMatches(read(rowId, clock.now()), newRow);
// Commit remove.
- Timestamp removeTs = Timestamp.nextVersion();
+ HybridTimestamp removeTs = clock.now();
commitWrite(rowId, removeTs);
assertNull(read(rowId, tsBefore));
assertNull(read(rowId, removeTxId));
assertNull(read(rowId, removeTs));
- assertNull(read(rowId, Timestamp.nextVersion()));
+ assertNull(read(rowId, clock.now()));
assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate,
Timestamp)}.
+ * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate,
HybridTimestamp)}.
*/
@Test
public void testScan() throws Exception {
@@ -284,17 +287,17 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
assertEquals(List.of(value1), convert(storage.scan(row ->
key(row).intKey == 1, txId1)));
assertEquals(List.of(value2), convert(storage.scan(row ->
key(row).intKey == 2, txId2)));
- Timestamp ts1 = Timestamp.nextVersion();
+ HybridTimestamp ts1 = clock.now();
- Timestamp ts2 = Timestamp.nextVersion();
+ HybridTimestamp ts2 = clock.now();
commitWrite(rowId1, ts2);
- Timestamp ts3 = Timestamp.nextVersion();
+ HybridTimestamp ts3 = clock.now();
- Timestamp ts4 = Timestamp.nextVersion();
+ HybridTimestamp ts4 = clock.now();
commitWrite(rowId2, ts4);
- Timestamp ts5 = Timestamp.nextVersion();
+ HybridTimestamp ts5 = clock.now();
// Full scan with various timestamp values.
assertEquals(List.of(), convert(scan(row -> true, ts1)));
@@ -313,10 +316,10 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
TestValue value2 = new TestValue(20, "yyy");
RowId rowId1 = insert(binaryRow(new TestKey(1, "1"), value1), txId);
- commitWrite(rowId1, Timestamp.nextVersion());
+ commitWrite(rowId1, clock.now());
RowId rowId2 = insert(binaryRow(new TestKey(2, "2"), value2), txId);
- commitWrite(rowId2, Timestamp.nextVersion());
+ commitWrite(rowId2, clock.now());
Cursor<BinaryRow> cursor = scan(row -> true, txId);
@@ -373,7 +376,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
BinaryRow foundRow = read(rowId, newTransactionId());
@@ -383,7 +386,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
RowId rowId1 = insert(binaryRow, txId);
- commitWrite(rowId1, Timestamp.nextVersion());
+ commitWrite(rowId1, clock.now());
RowId rowId2 = insert(binaryRow2, txId);
@@ -395,10 +398,10 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
RowId rowId1 = insert(binaryRow, txId);
- commitWrite(rowId1, Timestamp.nextVersion());
+ commitWrite(rowId1, clock.now());
RowId rowId2 = insert(binaryRow2, txId);
- commitWrite(rowId2, Timestamp.nextVersion());
+ commitWrite(rowId2, clock.now());
BinaryRow foundRow = read(rowId2, txId);
@@ -408,7 +411,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByExactlyCommitTimestampFindsRow() {
RowId rowId = insert(binaryRow, txId);
- Timestamp commitTimestamp = Timestamp.nextVersion();
+ HybridTimestamp commitTimestamp = clock.now();
commitWrite(rowId, commitTimestamp);
BinaryRow foundRow = read(rowId, commitTimestamp);
@@ -419,10 +422,10 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByTimestampAfterCommitTimestampFindsRow() {
RowId rowId = insert(binaryRow, txId);
- Timestamp commitTimestamp = Timestamp.nextVersion();
+ HybridTimestamp commitTimestamp = clock.now();
commitWrite(rowId, commitTimestamp);
- Timestamp afterCommit = Timestamp.nextVersion();
+ HybridTimestamp afterCommit = clock.now();
BinaryRow foundRow = read(rowId, afterCommit);
assertRowMatches(foundRow, binaryRow);
@@ -430,10 +433,10 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByTimestampBeforeFirstVersionCommitTimestampFindsNothing() {
- Timestamp beforeCommit = Timestamp.nextVersion();
+ HybridTimestamp beforeCommit = clock.now();
RowId rowId = insert(binaryRow, txId);
- Timestamp commitTimestamp = Timestamp.nextVersion();
+ HybridTimestamp commitTimestamp = clock.now();
commitWrite(rowId, commitTimestamp);
BinaryRow foundRow = read(rowId, beforeCommit);
@@ -444,11 +447,11 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByTimestampOfLastVersionFindsLastVersion() {
RowId rowId = insert(binaryRow, txId);
- Timestamp firstVersionTs = Timestamp.nextVersion();
+ HybridTimestamp firstVersionTs = clock.now();
commitWrite(rowId, firstVersionTs);
addWrite(rowId, binaryRow2, newTransactionId());
- Timestamp secondVersionTs = Timestamp.nextVersion();
+ HybridTimestamp secondVersionTs = clock.now();
commitWrite(rowId, secondVersionTs);
BinaryRow foundRow = read(rowId, secondVersionTs);
@@ -459,11 +462,11 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByTimestampOfPreviousVersionFindsPreviousVersion() {
RowId rowId = insert(binaryRow, txId);
- Timestamp firstVersionTs = Timestamp.nextVersion();
+ HybridTimestamp firstVersionTs = clock.now();
commitWrite(rowId, firstVersionTs);
addWrite(rowId, binaryRow2, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
BinaryRow foundRow = read(rowId, firstVersionTs);
@@ -473,13 +476,13 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByTimestampBetweenVersionsFindsPreviousVersion() {
RowId rowId = insert(binaryRow, txId);
- Timestamp firstVersionTs = Timestamp.nextVersion();
+ HybridTimestamp firstVersionTs = clock.now();
commitWrite(rowId, firstVersionTs);
- Timestamp tsInBetween = Timestamp.nextVersion();
+ HybridTimestamp tsInBetween = clock.now();
addWrite(rowId, binaryRow2, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
BinaryRow foundRow = read(rowId, tsInBetween);
@@ -489,11 +492,11 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void readByTimestampIgnoresUncommittedVersion() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
addWrite(rowId, binaryRow2, newTransactionId());
- Timestamp latestTs = Timestamp.nextVersion();
+ HybridTimestamp latestTs = clock.now();
BinaryRow foundRow = read(rowId, latestTs);
assertRowMatches(foundRow, binaryRow);
@@ -529,7 +532,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void addWriteReturnsNullIfNoUncommittedVersionExists() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
BinaryRow returnedRow = addWrite(rowId, binaryRow2, txId);
@@ -539,7 +542,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void afterRemovalReadWithTxIdFindsNothing() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
addWrite(rowId, null, txId);
@@ -551,12 +554,12 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void afterRemovalReadByLatestTimestampFindsNothing() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
addWrite(rowId, null, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
- BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+ BinaryRow foundRow = read(rowId, clock.now());
assertThat(foundRow, is(nullValue()));
}
@@ -564,11 +567,11 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void afterRemovalPreviousVersionRemainsAccessibleByTimestamp() {
RowId rowId = insert(binaryRow, newTransactionId());
- Timestamp firstTimestamp = Timestamp.nextVersion();
+ HybridTimestamp firstTimestamp = clock.now();
commitWrite(rowId, firstTimestamp);
addWrite(rowId, null, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
BinaryRow foundRow = read(rowId, firstTimestamp);
@@ -587,7 +590,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void removalReturnsNullIfNoUncommittedVersionExists() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
BinaryRow rowFromRemoval = addWrite(rowId, null, newTransactionId());
@@ -598,9 +601,9 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
void commitWriteMakesVersionAvailableToReadByTimestamp() {
RowId rowId = insert(binaryRow, txId);
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
- BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+ BinaryRow foundRow = read(rowId, clock.now());
assertRowMatches(foundRow, binaryRow);
}
@@ -608,13 +611,13 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void commitAndAbortWriteNoOpIfNoUncommittedVersionExists() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
abortWrite(rowId);
assertRowMatches(read(rowId, newTransactionId()), binaryRow);
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
assertRowMatches(read(rowId, newTransactionId()), binaryRow);
}
@@ -622,7 +625,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
@Test
void abortWriteRemovesUncommittedVersion() {
RowId rowId = insert(binaryRow, newTransactionId());
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
addWrite(rowId, binaryRow2, txId);
@@ -639,7 +642,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
abortWrite(rowId);
- BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+ BinaryRow foundRow = read(rowId, clock.now());
assertThat(foundRow, is(nullValue()));
}
@@ -675,7 +678,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
void
readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
RowId rowId = commitAbortAndAddUncommitted();
- BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+ BinaryRow foundRow = storage.read(rowId, clock.now());
assertRowMatches(foundRow, binaryRow);
}
@@ -684,7 +687,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
return storage.runConsistently(() -> {
RowId rowId = storage.insert(binaryRow, txId);
- storage.commitWrite(rowId, Timestamp.nextVersion());
+ storage.commitWrite(rowId, clock.now());
storage.addWrite(rowId, binaryRow2, newTransactionId());
storage.abortWrite(rowId);
@@ -701,7 +704,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
void
scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite()
throws Exception {
commitAbortAndAddUncommitted();
- try (Cursor<BinaryRow> cursor = storage.scan(k -> true,
Timestamp.nextVersion())) {
+ try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
BinaryRow foundRow = cursor.next();
assertRowMatches(foundRow, binaryRow);
@@ -714,7 +717,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
RowId rowId = insert(binaryRow, txId);
- BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+ BinaryRow foundRow = read(rowId, clock.now());
assertThat(foundRow, is(nullValue()));
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
index de48fe2b1a..175780712f 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
@@ -27,6 +27,6 @@ public class TestMvPartitionStorageTest extends
AbstractMvPartitionStorageTest {
* Creates new instance.
*/
public TestMvPartitionStorageTest() {
- storage = new TestConcurrentHashMapMvPartitionStorage(0);
+ storage = new TestConcurrentHashMapMvPartitionStorage(PARTITION_ID);
}
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 15a22fbf67..7ebd2c91c0 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -26,12 +26,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -51,11 +51,11 @@ public class TestConcurrentHashMapMvPartitionStorage
implements MvPartitionStora
private static class VersionChain {
final @Nullable BinaryRow row;
- final @Nullable Timestamp begin;
+ final @Nullable HybridTimestamp begin;
final @Nullable UUID txId;
final @Nullable VersionChain next;
- VersionChain(@Nullable BinaryRow row, @Nullable Timestamp begin,
@Nullable UUID txId, @Nullable VersionChain next) {
+ VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp begin,
@Nullable UUID txId, @Nullable VersionChain next) {
this.row = row;
this.begin = begin;
this.txId = txId;
@@ -66,7 +66,7 @@ public class TestConcurrentHashMapMvPartitionStorage
implements MvPartitionStora
return new VersionChain(row, null, txId, next);
}
- static VersionChain createCommitted(@Nullable Timestamp timestamp,
VersionChain uncommittedVersionChain) {
+ static VersionChain createCommitted(@Nullable HybridTimestamp
timestamp, VersionChain uncommittedVersionChain) {
return new VersionChain(uncommittedVersionChain.row, timestamp,
null, uncommittedVersionChain.next);
}
@@ -161,7 +161,7 @@ public class TestConcurrentHashMapMvPartitionStorage
implements MvPartitionStora
/** {@inheritDoc} */
@Override
- public void commitWrite(RowId rowId, Timestamp timestamp) {
+ public void commitWrite(RowId rowId, HybridTimestamp timestamp) {
map.compute(rowId, (ignored, versionChain) -> {
assert versionChain != null;
@@ -183,7 +183,7 @@ public class TestConcurrentHashMapMvPartitionStorage
implements MvPartitionStora
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, @Nullable Timestamp
timestamp) {
+ public @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp
timestamp) {
VersionChain versionChain = map.get(rowId);
return read(versionChain, timestamp, null, null);
@@ -192,7 +192,7 @@ public class TestConcurrentHashMapMvPartitionStorage
implements MvPartitionStora
@Nullable
private static BinaryRow read(
VersionChain versionChain,
- @Nullable Timestamp timestamp,
+ @Nullable HybridTimestamp timestamp,
@Nullable UUID txId,
@Nullable Predicate<BinaryRow> filter
) {
@@ -252,7 +252,7 @@ public class TestConcurrentHashMapMvPartitionStorage
implements MvPartitionStora
/** {@inheritDoc} */
@Override
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, Timestamp
timestamp) {
+ public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp
timestamp) {
Iterator<BinaryRow> iterator = map.values().stream()
.map(versionChain -> read(versionChain, timestamp, null,
filter))
.filter(Objects::nonNull)
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 823ec149e9..0261dde770 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
@@ -32,7 +33,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteCursor;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -48,7 +48,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
- private static final Predicate<Timestamp> ALWAYS_LOAD_VALUE = timestamp ->
true;
+ private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE =
timestamp -> true;
private final int partitionId;
private final int groupId;
@@ -97,7 +97,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws
StorageException {
+ public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp)
throws StorageException {
VersionChain versionChain = findVersionChain(rowId);
if (versionChain == null) {
@@ -129,7 +129,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
return row;
}
- private RowVersion readRowVersion(long nextLink, Predicate<Timestamp>
loadValue) {
+ private RowVersion readRowVersion(long nextLink,
Predicate<HybridTimestamp> loadValue) {
ReadRowVersion read = new ReadRowVersion(partitionId);
try {
@@ -158,7 +158,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
private @Nullable ByteBufferRow findRowVersionInChain(
VersionChain versionChain,
@Nullable UUID transactionId,
- @Nullable Timestamp timestamp,
+ @Nullable HybridTimestamp timestamp,
Predicate<BinaryRow> keyFilter
) {
assert transactionId != null ^ timestamp != null;
@@ -172,7 +172,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
}
}
- private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain
versionChain, Timestamp timestamp) {
+ private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain
versionChain, HybridTimestamp timestamp) {
if (!versionChain.hasCommittedVersions()) {
return null;
}
@@ -298,7 +298,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
/** {@inheritDoc} */
@Override
- public void commitWrite(RowId rowId, Timestamp timestamp) throws
StorageException {
+ public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId()
== null) {
@@ -352,12 +352,12 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
/** {@inheritDoc} */
@Override
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp
timestamp) throws StorageException {
+ public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter,
HybridTimestamp timestamp) throws StorageException {
return internalScan(keyFilter, null, timestamp);
}
- private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter,
@Nullable UUID transactionId, @Nullable Timestamp timestamp) {
- assert transactionId != null ^ timestamp != null;
+ private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter,
@Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
+ assert txId != null ^ timestamp != null;
IgniteCursor<VersionChain> treeCursor;
@@ -367,7 +367,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
throw new StorageException("Find failed", e);
}
- return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
+ return new ScanCursor(treeCursor, keyFilter, txId, timestamp);
}
/** {@inheritDoc} */
@@ -408,7 +408,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
private final @Nullable UUID transactionId;
- private final @Nullable Timestamp timestamp;
+ private final @Nullable HybridTimestamp timestamp;
private BinaryRow nextRow = null;
@@ -418,7 +418,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
IgniteCursor<VersionChain> treeCursor,
Predicate<BinaryRow> keyFilter,
@Nullable UUID transactionId,
- @Nullable Timestamp timestamp
+ @Nullable HybridTimestamp timestamp
) {
this.treeCursor = treeCursor;
this.keyFilter = keyFilter;
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
new file mode 100644
index 0000000000..c756364ce1
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Code to work with {@link HybridTimestamp}s.
+ */
+public class HybridTimestamps {
+ /**
+ * Physical time component to store for {@code null} hybrid timestamp
values.
+ */
+ private static final long NULL_PHYSICAL_TIME = 0L;
+
+ /**
+ * Logical time component to store for {@code null} hybrid timestamp
values.
+ */
+ private static final int NULL_LOGICAL_TIME = 0;
+
+ /**
+ * Reads a {@link HybridTimestamp} value from memory.
+ *
+ * @param pageAddr Address where page data starts.
+ * @param offset Offset to the timestamp value relative to pageAddr.
+ */
+ static @Nullable HybridTimestamp readTimestamp(long pageAddr, int offset) {
+ long physical = getLong(pageAddr, offset);
+ int logical = getInt(pageAddr, offset + Long.BYTES);
+
+ if (physical == NULL_PHYSICAL_TIME && logical == NULL_LOGICAL_TIME) {
+ return null;
+ }
+
+ return new HybridTimestamp(physical, logical);
+ }
+
+ /**
+ * Writes a {@link HybridTimestamp} to memory starting at the given
address + offset.
+ *
+ * @param addr Memory address.
+ * @param offset Offset added to the address.
+ * @param timestamp The timestamp to write.
+ * @return Number of bytes written.
+ */
+ public static int writeTimestampToMemory(long addr, int offset, @Nullable
HybridTimestamp timestamp) {
+ if (timestamp == null) {
+ putLong(addr, offset, NULL_PHYSICAL_TIME);
+
+ putInt(addr, offset + Long.BYTES, NULL_LOGICAL_TIME);
+ } else {
+ putLong(addr, offset, timestamp.getPhysical());
+
+ putInt(addr, offset + Long.BYTES, timestamp.getLogical());
+ }
+
+ return HYBRID_TIMESTAMP_SIZE;
+ }
+
+ /**
+ * Writes a {@link HybridTimestamp} to a buffer.
+ *
+ * @param buffer Buffer to which to write.
+ * @param timestamp The timestamp to write.
+ */
+ public static void writeTimestampToBuffer(ByteBuffer buffer, @Nullable
HybridTimestamp timestamp) {
+ if (timestamp == null) {
+ buffer.putLong(NULL_PHYSICAL_TIME)
+ .putInt(NULL_LOGICAL_TIME);
+ } else {
+ buffer.putLong(timestamp.getPhysical())
+ .putInt(timestamp.getLogical());
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index bb6f7bd3cb..3441318ef6 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -21,17 +21,17 @@ import static
org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLink
import java.nio.ByteBuffer;
import java.util.function.Predicate;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
import org.apache.ignite.internal.pagememory.io.DataPagePayload;
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.tx.Timestamp;
import org.jetbrains.annotations.Nullable;
/**
* Traversal for reading a row version by its link. Loads the version value
conditionally.
*/
-class ReadRowVersion implements PageMemoryTraversal<Predicate<Timestamp>> {
+class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>> {
private final int partitionId;
private RowVersion result;
@@ -40,7 +40,7 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<Timestamp>> {
private long firstFragmentLink;
- private @Nullable Timestamp timestamp;
+ private @Nullable HybridTimestamp timestamp;
private long nextLink;
@@ -52,7 +52,7 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<Timestamp>> {
/** {@inheritDoc} */
@Override
- public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, Predicate<Timestamp> loadValue) {
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, Predicate<HybridTimestamp> loadValue) {
if (readingFirstSlot) {
readingFirstSlot = false;
@@ -62,10 +62,10 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<Timestamp>> {
}
}
- private long readFullOrInitiateReadFragmented(long link, long pageAddr,
DataPagePayload payload, Predicate<Timestamp> loadValue) {
+ private long readFullOrInitiateReadFragmented(long link, long pageAddr,
DataPagePayload payload, Predicate<HybridTimestamp> loadValue) {
firstFragmentLink = link;
- timestamp = Timestamps.readTimestamp(pageAddr, payload.offset() +
RowVersion.TIMESTAMP_OFFSET);
+ timestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset()
+ RowVersion.TIMESTAMP_OFFSET);
nextLink = readPartitionlessLink(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
if (!loadValue.test(timestamp)) {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index fc4baf5bf0..2b13ab7ccd 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -17,33 +17,31 @@
package org.apache.ignite.internal.storage.pagememory.mv;
+import static org.apache.ignite.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
import java.nio.ByteBuffer;
import java.util.Objects;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.Storable;
import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.tx.Timestamp;
import org.jetbrains.annotations.Nullable;
/**
* Represents row version inside row version chain.
*/
-public class RowVersion implements Storable {
- /** A 'timestamp' representing absense of a timestamp. */
- public static final Timestamp NULL_TIMESTAMP = new
Timestamp(Long.MIN_VALUE, Long.MIN_VALUE);
-
+public final class RowVersion implements Storable {
/** Represents an absent partitionless link. */
public static final long NULL_LINK = 0;
- private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
private static final int NEXT_LINK_STORE_SIZE_BYTES =
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
public static final int TIMESTAMP_OFFSET = 0;
- public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES;
+ public static final int NEXT_LINK_OFFSET = TIMESTAMP_OFFSET +
HYBRID_TIMESTAMP_SIZE;
public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET +
NEXT_LINK_STORE_SIZE_BYTES;
public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET +
VALUE_SIZE_STORE_SIZE_BYTES;
@@ -51,7 +49,7 @@ public class RowVersion implements Storable {
private long link;
- private final @Nullable Timestamp timestamp;
+ private final @Nullable HybridTimestamp timestamp;
private final long nextLink;
@@ -70,30 +68,20 @@ public class RowVersion implements Storable {
/**
* Constructor.
*/
- public RowVersion(int partitionId, long link, @Nullable Timestamp
timestamp, long nextLink, @Nullable ByteBuffer value) {
+ public RowVersion(int partitionId, long link, @Nullable HybridTimestamp
timestamp, long nextLink, @Nullable ByteBuffer value) {
this.partitionId = partitionId;
link(link);
- assert !NULL_TIMESTAMP.equals(timestamp) : "Null timestamp provided";
-
this.timestamp = timestamp;
this.nextLink = nextLink;
this.valueSize = value == null ? -1 : value.limit();
this.value = value;
}
- public @Nullable Timestamp timestamp() {
+ public @Nullable HybridTimestamp timestamp() {
return timestamp;
}
- public Timestamp timestampForStorage() {
- return timestampForStorage(timestamp);
- }
-
- static Timestamp timestampForStorage(@Nullable Timestamp timestamp) {
- return timestamp == null ? NULL_TIMESTAMP : timestamp;
- }
-
/**
* Returns partitionless link of the next version or {@code 0} if this
version is the last in the chain (i.e. it's the oldest version).
*/
@@ -126,10 +114,6 @@ public class RowVersion implements Storable {
}
boolean isUncommitted() {
- return isUncommitted(timestamp);
- }
-
- static boolean isUncommitted(Timestamp timestamp) {
return timestamp == null;
}
@@ -160,13 +144,13 @@ public class RowVersion implements Storable {
public int size() {
assert value != null;
- return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES +
VALUE_SIZE_STORE_SIZE_BYTES + value.limit();
+ return headerSize() + value.limit();
}
/** {@inheritDoc} */
@Override
public int headerSize() {
- return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES +
VALUE_SIZE_STORE_SIZE_BYTES;
+ return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES +
VALUE_SIZE_STORE_SIZE_BYTES;
}
/** {@inheritDoc} */
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
index 835a94e94d..a3a20abb06 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.PageMemory;
@@ -29,7 +30,6 @@ import org.apache.ignite.internal.pagememory.reuse.ReuseList;
import org.apache.ignite.internal.pagememory.util.PageHandler;
import org.apache.ignite.internal.pagememory.util.PageLockListener;
import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
-import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
@@ -108,7 +108,7 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
* @param newTimestamp timestamp to set
* @throws IgniteInternalCheckedException if something fails
*/
- public void updateTimestamp(long link, Timestamp newTimestamp) throws
IgniteInternalCheckedException {
+ public void updateTimestamp(long link, HybridTimestamp newTimestamp)
throws IgniteInternalCheckedException {
updateDataRow(link, updateTimestampHandler, newTimestamp, statHolder);
}
@@ -122,7 +122,7 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
super.removeDataRowByLink(link, statHolder);
}
- private class UpdateTimestampHandler implements PageHandler<Timestamp,
Object> {
+ private class UpdateTimestampHandler implements
PageHandler<HybridTimestamp, Object> {
/** {@inheritDoc} */
@Override
public Object run(
@@ -131,7 +131,7 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
long page,
long pageAddr,
PageIo io,
- Timestamp arg,
+ HybridTimestamp arg,
int itemId,
IoStatisticsHolder statHolder
) throws IgniteInternalCheckedException {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
index 83ce3fee7b..569c2086aa 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks.readPartitionlessLink;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
import org.apache.ignite.internal.pagememory.io.DataPagePayload;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.tx.Timestamp;
import org.jetbrains.annotations.Nullable;
/**
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
* <p>NB: this traversal first traverses starting data slots of the Version
Chain one after another; when it finds the
* version it needs, it switches to traversing the slots comprising the
version (because it might be fragmented).
*/
-class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
+class ScanVersionChainByTimestamp implements
PageMemoryTraversal<HybridTimestamp> {
private final int partitionId;
/** Contains the result when the traversal ends. */
@@ -52,9 +52,9 @@ class ScanVersionChainByTimestamp implements
PageMemoryTraversal<Timestamp> {
/** {@inheritDoc} */
@Override
- public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, Timestamp timestamp) {
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, HybridTimestamp timestamp) {
if (lookingForVersion) {
- Timestamp rowVersionTs = Timestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ HybridTimestamp rowVersionTs =
HybridTimestamps.readTimestamp(pageAddr, payload.offset() +
RowVersion.TIMESTAMP_OFFSET);
if (rowTimestampMatches(rowVersionTs, timestamp)) {
return readFullyOrStartReadingFragmented(link, pageAddr,
payload);
@@ -67,8 +67,8 @@ class ScanVersionChainByTimestamp implements
PageMemoryTraversal<Timestamp> {
}
}
- private boolean rowTimestampMatches(Timestamp rowVersionTs, Timestamp
timestamp) {
- return rowVersionTs != null && rowVersionTs.beforeOrEquals(timestamp);
+ private boolean rowTimestampMatches(HybridTimestamp rowVersionTs,
HybridTimestamp timestamp) {
+ return rowVersionTs != null && rowVersionTs.compareTo(timestamp) <= 0;
}
private long readFullyOrStartReadingFragmented(long link, long pageAddr,
DataPagePayload payload) {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
deleted file mode 100644
index bbe6ba61cc..0000000000
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.mv;
-
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.tx.Timestamp;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Code to work with {@link Timestamp}s.
- */
-public class Timestamps {
- /**
- * Reads a {@link Timestamp} value from memory.
- *
- * @param pageAddr Address where page data starts.
- * @param offset Offset to the timestamp value relative to pageAddr.
- */
- static @Nullable Timestamp readTimestamp(long pageAddr, int offset) {
- long nodeId = getLong(pageAddr, offset);
- long localTimestamp = getLong(pageAddr, offset + Long.BYTES);
-
- Timestamp timestamp = new Timestamp(localTimestamp, nodeId);
- if (timestamp.equals(RowVersion.NULL_TIMESTAMP)) {
- timestamp = null;
- }
-
- return timestamp;
- }
-
- /**
- * Writes a {@link Timestamp} to memory starting at the given address +
offset.
- *
- * @param addr Memory address.
- * @param offset Offset added to the address.
- * @param timestamp The timestamp to write.
- * @return Number of bytes written.
- */
- public static int writeTimestampToMemory(long addr, int offset, @Nullable
Timestamp timestamp) {
- Timestamp timestampForStorage =
RowVersion.timestampForStorage(timestamp);
-
- putLong(addr, offset, timestampForStorage.getNodeId());
- putLong(addr, offset + Long.BYTES, timestampForStorage.getTimestamp());
-
- return 2 * Long.BYTES;
- }
-
- /**
- * Writes a {@link Timestamp} to a buffer.
- *
- * @param buffer Buffer to which to write.
- * @param timestamp The timestamp to write.
- */
- public static void writeTimestampToBuffer(ByteBuffer buffer, @Nullable
Timestamp timestamp) {
- Timestamp timestampForStorage =
RowVersion.timestampForStorage(timestamp);
-
- buffer.putLong(timestampForStorage.getNodeId());
- buffer.putLong(timestampForStorage.getTimestamp());
- }
-}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
index 3f32cf0464..c7d8846112 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
@@ -23,12 +23,12 @@ import static
org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
import static
org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks.writePartitionlessLink;
import java.nio.ByteBuffer;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.HybridTimestamps;
import org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks;
import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
-import org.apache.ignite.internal.storage.pagememory.mv.Timestamps;
-import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.lang.IgniteStringBuilder;
import org.jetbrains.annotations.Nullable;
@@ -61,7 +61,7 @@ public class RowVersionDataIo extends
AbstractDataPageIo<RowVersion> {
putShort(addr, 0, (short) payloadSize);
addr += 2;
- addr += Timestamps.writeTimestampToMemory(addr, 0, row.timestamp());
+ addr += HybridTimestamps.writeTimestampToMemory(addr, 0,
row.timestamp());
addr += writePartitionlessLink(addr, row.nextLink());
@@ -81,7 +81,7 @@ public class RowVersionDataIo extends
AbstractDataPageIo<RowVersion> {
assert row.headerSize() <= payloadSize : "Header must entirely fit
in the first fragment, but header size is "
+ row.headerSize() + " and payload size is " + payloadSize;
- Timestamps.writeTimestampToBuffer(buf, row.timestamp());
+ HybridTimestamps.writeTimestampToBuffer(buf, row.timestamp());
PartitionlessLinks.writeToBuffer(buf, row.nextLink());
@@ -119,10 +119,10 @@ public class RowVersionDataIo extends
AbstractDataPageIo<RowVersion> {
* @param pageSize size of the page
* @param timestamp timestamp to store
*/
- public void updateTimestamp(long pageAddr, int itemId, int pageSize,
@Nullable Timestamp timestamp) {
+ public void updateTimestamp(long pageAddr, int itemId, int pageSize,
@Nullable HybridTimestamp timestamp) {
int payloadOffset = getPayloadOffset(pageAddr, itemId, pageSize, 0);
- Timestamps.writeTimestampToMemory(pageAddr, payloadOffset +
RowVersion.TIMESTAMP_OFFSET, timestamp);
+ HybridTimestamps.writeTimestampToMemory(pageAddr, payloadOffset +
RowVersion.TIMESTAMP_OFFSET, timestamp);
}
/** {@inheritDoc} */
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
index 60234d4823..1fb6f8ad7c 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
@@ -28,7 +28,6 @@ import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -83,9 +82,9 @@ abstract class AbstractPageMemoryMvPartitionStorageTest
extends AbstractMvPartit
RowId rowId = insert(longRow, txId);
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
- BinaryRow foundRow = read(rowId, Timestamp.nextVersion());
+ BinaryRow foundRow = read(rowId, clock.now());
assertRowMatches(foundRow, longRow);
}
@@ -109,7 +108,7 @@ abstract class AbstractPageMemoryMvPartitionStorageTest
extends AbstractMvPartit
RowId rowId = insert(longRow, txId);
- commitWrite(rowId, Timestamp.nextVersion());
+ commitWrite(rowId, clock.now());
try (Cursor<BinaryRow> cursor = storage.scan(row -> true, txId)) {
BinaryRow foundRow = cursor.next();
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index d7bf03d9e8..327dfa26c0 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -23,6 +23,7 @@ import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.Arrays.copyOf;
import static java.util.Arrays.copyOfRange;
+import static org.apache.ignite.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
import static org.rocksdb.ReadTier.PERSISTED_TIER;
import java.nio.ByteBuffer;
@@ -33,13 +34,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -61,16 +62,16 @@ import org.rocksdb.WriteOptions;
* | partId (2 bytes, BE) | rowId (16 bytes, BE) |</code></pre>
* or
* <pre><code>
- * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (16 bytes, DESC)
|</code></pre>
+ * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC)
|</code></pre>
* depending on transaction status. Pending transactions data doesn't have a
timestamp assigned.
*
* <p/>BE means Big Endian, meaning that lexicographical bytes order matches a
natural order of partitions.
*
- * <p/>DESC means that timestamps are sorted from newest to oldest (N2O).
Please refer to {@link #putTimestamp(ByteBuffer, Timestamp)} to
- * see how it's achieved. Missing timestamp could be interpreted as a moment
infinitely far away in the future.
+ * <p/>DESC means that timestamps are sorted from newest to oldest (N2O).
Please refer to {@link #putTimestamp(ByteBuffer, HybridTimestamp)}
+ * to see how it's achieved. Missing timestamp could be interpreted as a
moment infinitely far away in the future.
*/
public class RocksDbMvPartitionStorage implements MvPartitionStorage {
- /** Position of row id inside of the key. */
+ /** Position of row id inside the key. */
private static final int ROW_ID_OFFSET = Short.BYTES;
/** UUID size in bytes. */
@@ -79,14 +80,11 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** Size of the key without timestamp. */
private static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
- /** Timestamp size in bytes. */
- private static final int TIMESTAMP_SIZE = 2 * Long.BYTES;
-
- /** Transaction id size. Matches timestamp size. */
- private static final int TX_ID_SIZE = TIMESTAMP_SIZE;
+ /** Transaction id size. */
+ private static final int TX_ID_SIZE = 2 * Long.BYTES;
/** Maximum size of the key. */
- private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + TIMESTAMP_SIZE;
+ private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE +
HYBRID_TIMESTAMP_SIZE;
/** Thread-local direct buffer instance to read keys from RocksDB. */
private static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER =
withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(BIG_ENDIAN));
@@ -296,7 +294,9 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
// Check concurrent transaction data.
byte[] keyBufArray = keyBuf.array();
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf,
readOpts, copyOf(keyBufArray, ROW_PREFIX_SIZE));
+ byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
+
+ byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf,
readOpts, keyBytes);
// Previous value must belong to the same transaction.
if (previousValue != null) {
@@ -309,12 +309,13 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
// Write empty value as a tombstone.
if (previousValue != null) {
// Reuse old array with transaction id already written to
it.
- writeBatch.put(cf, copyOf(keyBufArray, ROW_PREFIX_SIZE),
copyOf(previousValue, TX_ID_SIZE));
+ writeBatch.put(cf, keyBytes, copyOf(previousValue,
TX_ID_SIZE));
} else {
- // Use tail of the key buffer to save on array allocations.
- putTransactionId(keyBufArray, ROW_PREFIX_SIZE, txId);
+ byte[] txIdBytes = new byte[TX_ID_SIZE];
+
+ putTransactionId(txIdBytes, 0, txId);
- writeBatch.put(cf, copyOf(keyBufArray, ROW_PREFIX_SIZE),
copyOfRange(keyBufArray, ROW_PREFIX_SIZE, MAX_KEY_SIZE));
+ writeBatch.put(cf, keyBytes, txIdBytes);
}
} else {
writeUnversioned(keyBufArray, row, txId);
@@ -358,7 +359,9 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
try {
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf,
readOpts, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+
+ byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf,
readOpts, keyBytes);
if (previousValue == null) {
//the chain doesn't contain an uncommitted write intent
@@ -366,7 +369,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
// Perform unconditional remove for the key without associated
timestamp.
- writeBatch.delete(cf, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ writeBatch.delete(cf, keyBytes);
return wrapValueIntoBinaryRow(previousValue, true);
} catch (RocksDBException e) {
@@ -376,14 +379,16 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public void commitWrite(RowId rowId, Timestamp timestamp) throws
StorageException {
+ public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
try {
// Read a value associated with pending write.
- byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts,
copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ byte[] uncommittedKeyBytes = copyOf(keyBuf.array(),
ROW_PREFIX_SIZE);
+
+ byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts,
uncommittedKeyBytes);
if (valueBytes == null) {
//the chain doesn't contain an uncommitted write intent
@@ -391,7 +396,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
// Delete pending write.
- writeBatch.delete(cf, copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ writeBatch.delete(cf, uncommittedKeyBytes);
// Add timestamp to the key, and put the value back into the
storage.
putTimestamp(keyBuf, timestamp);
@@ -410,11 +415,11 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws
StorageException {
+ public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp)
throws StorageException {
return read(rowId, timestamp, null);
}
- private @Nullable BinaryRow read(RowId rowId, @Nullable Timestamp
timestamp, @Nullable UUID txId)
+ private @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp
timestamp, @Nullable UUID txId)
throws TxIdMismatchException, StorageException {
assert timestamp == null ^ txId == null;
@@ -499,11 +504,11 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp
timestamp) throws StorageException {
+ public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter,
HybridTimestamp timestamp) throws StorageException {
return scan(keyFilter, timestamp, null);
}
- private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable
Timestamp timestamp, @Nullable UUID txId)
+ private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable
HybridTimestamp timestamp, @Nullable UUID txId)
throws TxIdMismatchException, StorageException {
assert timestamp == null ^ txId == null;
@@ -516,10 +521,10 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
it.seek(partitionStartPrefix());
// Size of the buffer to seek values. Should fit partition id, row id
and maybe timestamp, if it's not null.
- int seekKeyBufSize = ROW_PREFIX_SIZE + (timestamp == null ? 0 :
TIMESTAMP_SIZE);
+ int seekKeyBufSize = ROW_PREFIX_SIZE + (timestamp == null ? 0 :
HYBRID_TIMESTAMP_SIZE);
// Here's seek buffer itself. Originally it contains a valid partition
id, row id payload that's filled with zeroes, and maybe
- // a timestamp value. Zero row id guarantees that it's
lexicographically less then or equal to any other row id stored in the
+ // a timestamp value. Zero row id guarantees that it's
lexicographically less than or equal to any other row id stored in the
// partition.
// Byte buffer from a thread-local field can't be used here, because
of two reasons:
// - no one guarantees that there will only be a single cursor;
@@ -613,7 +618,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
// Or iterator may still be valid even if there's no
version for required timestamp. In this case row id
- // itself will be different and we must check it.
+ // itself will be different, and we must check it.
keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
valueHasTxId = keyLength == ROW_PREFIX_SIZE;
@@ -676,7 +681,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** {@inheritDoc} */
@Override
public void close() throws Exception {
- IgniteUtils.closeAll(options, it);
+ IgniteUtils.closeAll(it, options);
}
private void incrementRowId(ByteBuffer buf) {
@@ -698,6 +703,8 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
short partitionId = (short) (1 + buf.getShort(0));
+ assert partitionId != 0;
+
buf.putShort(0, partitionId);
}
};
@@ -807,14 +814,12 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/**
* Writes a timestamp into a byte buffer, in descending lexicographical
bytes order.
*/
- private static void putTimestamp(ByteBuffer buf, Timestamp ts) {
+ private static void putTimestamp(ByteBuffer buf, HybridTimestamp ts) {
assert buf.order() == BIG_ENDIAN;
- // Two things to note here:
- // - "xor" with a single sign bit makes long values comparable
according to RocksDB convention, where bytes are unsigned.
- // - "bitwise negation" turns ascending order into a descending one.
- buf.putLong(~ts.getTimestamp() ^ (1L << 63));
- buf.putLong(~ts.getNodeId() ^ (1L << 63));
+ // "bitwise negation" turns ascending order into a descending one.
+ buf.putLong(~ts.getPhysical());
+ buf.putInt(~ts.getLogical());
}
private static void putTransactionId(byte[] array, int off, UUID txId) {
@@ -839,7 +844,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
if (invalid) {
// Check the status first. This operation is guaranteed to throw
if an internal error has occurred during
- // the iteration. Otherwise we've exhausted the data range.
+ // the iteration. Otherwise, we've exhausted the data range.
try {
it.status();
} catch (RocksDBException e) {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 2496f3c661..ea0dab45e7 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -130,7 +130,6 @@ public class RocksDbTableStorage implements MvTableStorage {
*/
private volatile Runnable latestFlushClosure;
- //TODO Use it instead of the "stopped" flag.
/** Busy lock to stop synchronously. */
final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 7773862707..eff5fe6465 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -86,11 +86,11 @@ public class RocksDbMvPartitionStorageTest extends
AbstractMvPartitionStorageTes
engine.start();
- table = (RocksDbTableStorage) engine.createMvTable(tableCfg);
+ table = engine.createMvTable(tableCfg);
table.start();
- storage = table.getOrCreateMvPartition(0);
+ storage = table.getOrCreateMvPartition(PARTITION_ID);
}
@AfterEach