This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

The following commit(s) were added to refs/heads/main by this push:
     new 94b064ab89 IGNITE-21933 Fix TxStateStorage#leaseStartTime possible inconsistency with partition storage (#3551)
94b064ab89 is described below

commit 94b064ab89faf6b4d5e933ae37b8093f6673023e
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Thu Apr 11 13:41:28 2024 +0300

    IGNITE-21933 Fix TxStateStorage#leaseStartTime possible inconsistency with partition storage (#3551)
---
 check-rules/spotbugs-excludes.xml | 1 +
 .../pagememory/persistence/PartitionMeta.java | 47 ++++++++++++++-
 .../pagememory/persistence/io/PartitionMetaIo.java | 27 +++++++++
 .../persistence/PartitionMetaManagerTest.java | 7 ++-
 .../pagememory/persistence/PartitionMetaTest.java | 2 +-
 .../persistence/checkpoint/CheckpointerTest.java | 2 +-
 .../internal/storage/MvPartitionStorage.java | 14 +++++
 .../storage/ThreadAssertingMvPartitionStorage.java | 12 ++++
 .../storage/AbstractMvPartitionStorageTest.java | 23 ++++++++
 .../storage/impl/TestMvPartitionStorage.java | 22 +++++++
 .../mv/PersistentPageMemoryMvPartitionStorage.java | 20 +++++++
 .../mv/VolatilePageMemoryMvPartitionStorage.java | 28 +++++++++
 .../storage/rocksdb/RocksDbMetaStorage.java | 5 ++
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 52 +++++++++++++++-
 .../rocksdb/instance/SharedRocksDbInstance.java | 2 +
 .../distributed/raft/PartitionDataStorage.java | 14 +++++
 .../table/distributed/raft/PartitionListener.java | 20 +++++--
 .../SnapshotAwarePartitionDataStorage.java | 10 ++++
 .../distributed/TestPartitionDataStorage.java | 10 ++++
 .../state/ThreadAssertingTxStateStorage.java | 14 -----
 .../internal/tx/storage/state/TxStateStorage.java | 16 -----
 .../state/rocksdb/TxStateRocksDbStorage.java | 69 ----------------------
 .../storage/state/AbstractTxStateStorageTest.java | 33 -----------
 .../tx/storage/state/test/TestTxStateStorage.java | 22 -------
 24 files changed, 306 insertions(+), 166 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml 
b/check-rules/spotbugs-excludes.xml index a25c91874a..acf0541bb5 100644 --- a/check-rules/spotbugs-excludes.xml +++ b/check-rules/spotbugs-excludes.xml @@ -214,6 +214,7 @@ <Field name="PARTITION_META_PREFIX"/> <Field name="PARTITION_CONF_PREFIX"/> <Field name="INDEX_ROW_ID_PREFIX"/> + <Field name="LEASE_PREFIX"/> </Or> </Match> <!-- end of false-positive exclusions --> diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java index 85e9c8424e..e752116834 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java @@ -50,6 +50,8 @@ public class PartitionMeta { private volatile long lastAppliedTerm; + private volatile long leaseStartTime; + private volatile long lastReplicationProtocolGroupConfigFirstPageId; private volatile long rowVersionFreeListRootPageId; @@ -93,7 +95,8 @@ public class PartitionMeta { long versionChainTreeRootPageId, long indexTreeMetaPageId, long gcQueueMetaPageId, - int pageCount + int pageCount, + long leaseStartTime ) { this.lastAppliedIndex = lastAppliedIndex; this.lastAppliedTerm = lastAppliedTerm; @@ -104,6 +107,7 @@ public class PartitionMeta { this.indexTreeMetaPageId = indexTreeMetaPageId; this.gcQueueMetaPageId = gcQueueMetaPageId; this.pageCount = pageCount; + this.leaseStartTime = leaseStartTime; metaSnapshot = new PartitionMetaSnapshot(checkpointId, this); } @@ -126,7 +130,8 @@ public class PartitionMeta { metaIo.getVersionChainTreeRootPageId(pageAddr), metaIo.getIndexTreeMetaPageId(pageAddr), metaIo.getGcQueueMetaPageId(pageAddr), - metaIo.getPageCount(pageAddr) + metaIo.getPageCount(pageAddr), + metaIo.getLeaseStartTime(pageAddr) ); } @@ -318,6 +323,31 @@ public class PartitionMeta { return 
S.toString(PartitionMeta.class, this); } + /** + * Updates the current lease start time in the storage. + * + * @param checkpointId Checkpoint ID. + * @param leaseStartTime Lease start time. + */ + public void updateLease(@Nullable UUID checkpointId, long leaseStartTime) { + updateSnapshot(checkpointId); + + if (leaseStartTime <= this.leaseStartTime) { + return; + } + + this.leaseStartTime = leaseStartTime; + } + + /** + * Return the start time of the known lease for this replication group. + * + * @return Lease start time. + */ + public long leaseStartTime() { + return leaseStartTime; + } + /** * An immutable snapshot of the partition's meta information. */ @@ -342,6 +372,8 @@ public class PartitionMeta { private final int pageCount; + private final long leaseStartTime; + /** * Private constructor. * @@ -359,6 +391,7 @@ public class PartitionMeta { indexTreeMetaPageId = partitionMeta.indexTreeMetaPageId; gcQueueMetaPageId = partitionMeta.gcQueueMetaPageId; pageCount = partitionMeta.pageCount; + leaseStartTime = partitionMeta.leaseStartTime; } /** @@ -424,6 +457,15 @@ public class PartitionMeta { return pageCount; } + /** + * Returns the lease start time. + * + * @return Lease start time. + */ + public long leaseStartTime() { + return leaseStartTime; + } + /** * Writes the contents of the snapshot to a page of type {@link PartitionMetaIo}. 
* @@ -440,6 +482,7 @@ public class PartitionMeta { metaIo.setIndexTreeMetaPageId(pageAddr, indexTreeMetaPageId); metaIo.setGcQueueMetaPageId(pageAddr, gcQueueMetaPageId); metaIo.setPageCount(pageAddr, pageCount); + metaIo.setLeaseStartTime(pageAddr, leaseStartTime); } @Override diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java index b402cce0d0..6c64a5dcde 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong; import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt; import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteStringBuilder; import org.apache.ignite.internal.pagememory.io.IoVersions; import org.apache.ignite.internal.pagememory.io.PageIo; @@ -49,6 +50,8 @@ public class PartitionMetaIo extends PageIo { private static final int PAGE_COUNT_OFF = GC_QUEUE_META_PAGE_ID_OFF + Long.BYTES; + private static final int LEASE_START_TIME_OFF = PAGE_COUNT_OFF + Integer.BYTES; + /** Page IO type. */ public static final short T_TABLE_PARTITION_META_IO = 7; @@ -78,6 +81,7 @@ public class PartitionMetaIo extends PageIo { setIndexTreeMetaPageId(pageAddr, 0); setGcQueueMetaPageId(pageAddr, 0); setPageCount(pageAddr, 0); + setLeaseStartTime(pageAddr, HybridTimestamp.MIN_VALUE.longValue()); } /** @@ -269,6 +273,28 @@ public class PartitionMetaIo extends PageIo { return getInt(pageAddr, PAGE_COUNT_OFF); } + /** + * Sets the lease start time. + * + * @param pageAddr Page address. 
+ * @param leaseStartTime Lease start time. + */ + public void setLeaseStartTime(long pageAddr, long leaseStartTime) { + assertPageType(pageAddr); + + putLong(pageAddr, LEASE_START_TIME_OFF, leaseStartTime); + } + + /** + * Returns the lease start time. + * + * @param pageAddr Page address. + * @return Lease start time. + */ + public long getLeaseStartTime(long pageAddr) { + return getLong(pageAddr, LEASE_START_TIME_OFF); + } + /** {@inheritDoc} */ @Override protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) { @@ -282,6 +308,7 @@ public class PartitionMetaIo extends PageIo { .app("indexTreeMetaPageId=").appendHex(getIndexTreeMetaPageId(addr)).nl() .app("gcQueueMetaPageId=").appendHex(getGcQueueMetaPageId(addr)).nl() .app("pageCount=").app(getPageCount(addr)).nl() + .app("leaseStartTime=").app(getLeaseStartTime(addr)).nl() .app(']'); } } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java index d511d47737..415cfc09b0 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java @@ -34,6 +34,7 @@ import java.nio.file.Path; import java.util.UUID; import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; @@ -106,6 +107,7 @@ public class PartitionMetaManagerTest extends BaseIgniteAbstractTest { assertEquals(0, 
meta.versionChainTreeRootPageId()); assertEquals(0, meta.rowVersionFreeListRootPageId()); assertEquals(1, meta.pageCount()); + assertEquals(HybridTimestamp.MIN_VALUE.longValue(), meta.leaseStartTime()); // Change the meta and write it to the file. meta.lastApplied(null, 50, 10); @@ -113,6 +115,7 @@ public class PartitionMetaManagerTest extends BaseIgniteAbstractTest { meta.versionChainTreeRootPageId(null, 300); meta.rowVersionFreeListRootPageId(null, 900); meta.incrementPageCount(null); + meta.updateLease(null, 500); manager.writeMetaToBuffer(partId, meta.metaSnapshot(UUID.randomUUID()), buffer); @@ -133,13 +136,14 @@ public class PartitionMetaManagerTest extends BaseIgniteAbstractTest { assertEquals(300, meta.versionChainTreeRootPageId()); assertEquals(900, meta.rowVersionFreeListRootPageId()); assertEquals(2, meta.pageCount()); + assertEquals(500, meta.leaseStartTime()); } // Check with delta file. try (FilePageStore filePageStore = createFilePageStore(testFilePath)) { manager.writeMetaToBuffer( partId, - new PartitionMeta(UUID.randomUUID(), 100, 10, 34, 900, 500, 300, 200, 400, 4).metaSnapshot(null), + new PartitionMeta(UUID.randomUUID(), 100, 10, 34, 900, 500, 300, 200, 400, 4, 1000).metaSnapshot(null), buffer.rewind() ); @@ -163,6 +167,7 @@ public class PartitionMetaManagerTest extends BaseIgniteAbstractTest { assertEquals(200, meta.indexTreeMetaPageId()); assertEquals(400, meta.gcQueueMetaPageId()); assertEquals(4, meta.pageCount()); + assertEquals(1000, meta.leaseStartTime()); } // Let's check the broken CRC. 
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java index b42cc3dd14..6d252dfd09 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java @@ -130,7 +130,7 @@ public class PartitionMetaTest { void testSnapshot() { UUID checkpointId = null; - PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0, 0, 0, 0); + PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0, 0, 0); checkSnapshot(meta.metaSnapshot(checkpointId = UUID.randomUUID()), 0, 0, 0, 0, 0, 0); diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java index f6b09cfed5..d8c32e1947 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java @@ -359,7 +359,7 @@ public class CheckpointerTest extends BaseIgniteAbstractTest { partitionMetaManager.addMeta( new GroupPartitionId(0, 0), - new PartitionMeta(null, 0, 0, 0, 0, 0, 0, 0, 0, 3) + new PartitionMeta(null, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0) ); FilePageStore filePageStore = mock(FilePageStore.class); diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java index dee4d6b8e9..3c624a8a7b 100644 --- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java @@ -271,6 +271,20 @@ public interface MvPartitionStorage extends ManuallyCloseable { @Deprecated long rowsCount() throws StorageException; + /** + * Updates the current lease start time in the storage. + * + * @param leaseStartTime Lease start time. + */ + void updateLease(long leaseStartTime); + + /** + * Return the start time of the known lease for this replication group. + * + * @return Lease start time. + */ + long leaseStartTime(); + /** * Closes the storage. * diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java index 701ff5cf04..8ef1b14aff 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java @@ -161,6 +161,18 @@ public class ThreadAssertingMvPartitionStorage implements MvPartitionStorage, Wr return partitionStorage.rowsCount(); } + @Override + public void updateLease(long leaseStartTime) { + assertThreadAllowsToWrite(); + + partitionStorage.updateLease(leaseStartTime); + } + + @Override + public long leaseStartTime() { + return partitionStorage.leaseStartTime(); + } + @Override public void close() { partitionStorage.close(); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java index ba3891f8ed..dc554fac03 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java 
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java @@ -1353,6 +1353,29 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor assertThat(returnedConfig, is(equalTo(secondConfig))); } + @Test + public void testLease() { + storage.runConsistently(locker -> { + long lst0 = 1000; + + long lst1 = 2000; + + storage.updateLease(lst0); + + assertEquals(lst0, storage.leaseStartTime()); + + storage.updateLease(lst1); + + assertEquals(lst1, storage.leaseStartTime()); + + storage.updateLease(0); + + assertEquals(lst1, storage.leaseStartTime()); + + return null; + }); + } + /** * Returns row id that is lexicographically smaller (by the value of one) than the argument. * diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java index 6c536e3118..3036127766 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java @@ -65,6 +65,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage { private volatile long lastAppliedTerm; + private volatile long leaseStartTime = HybridTimestamp.MIN_VALUE.longValue(); + private volatile byte @Nullable [] groupConfig; final int partitionId; @@ -619,6 +621,24 @@ public class TestMvPartitionStorage implements MvPartitionStorage { return map.size(); } + @Override + public void updateLease(long leaseStartTime) { + checkStorageClosed(); + + if (leaseStartTime <= this.leaseStartTime) { + return; + } + + this.leaseStartTime = leaseStartTime; + } + + @Override + public long leaseStartTime() { + checkStorageClosed(); + + return leaseStartTime; + } + @Override public void close() { closed = true; 
@@ -650,6 +670,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage { lastAppliedIndex = 0; lastAppliedTerm = 0; groupConfig = null; + + leaseStartTime = HybridTimestamp.MIN_VALUE.longValue(); } private void checkStorageClosed() { diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java index af3f7a2753..f7c802d671 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java @@ -299,6 +299,26 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv }); } + @Override + public void updateLease(long leaseStartTime) { + busy(() -> { + throwExceptionIfStorageNotInRunnableState(); + + updateMeta((lastCheckpointId, meta) -> meta.updateLease(lastCheckpointId, leaseStartTime)); + + return null; + }); + } + + @Override + public long leaseStartTime() { + return busy(() -> { + throwExceptionIfStorageNotInRunnableState(); + + return meta.leaseStartTime(); + }); + } + private void committedGroupConfigurationBusy(byte[] groupConfigBytes) { updateMeta((lastCheckpointId, meta) -> { replicationProtocolGroupConfigReadWriteLock.writeLock().lock(); diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java index 7ac5f39e26..a44c904dd4 100644 --- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java @@ -61,6 +61,9 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa /** Last applied term value. */ private volatile long lastAppliedTerm; + /** Lease start time. */ + private volatile long leaseStartTime; + /** Last group configuration. */ private volatile byte @Nullable [] groupConfig; @@ -188,6 +191,30 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa }); } + @Override + public void updateLease(long leaseStartTime) { + busy(() -> { + throwExceptionIfStorageNotInRunnableState(); + + if (leaseStartTime <= this.leaseStartTime) { + return null; + } + + this.leaseStartTime = leaseStartTime; + + return null; + }); + } + + @Override + public long leaseStartTime() { + return busy(() -> { + throwExceptionIfStorageNotInRunnableState(); + + return leaseStartTime; + }); + } + @Override public void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) { throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo); @@ -212,6 +239,7 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa lastAppliedIndex = 0; lastAppliedTerm = 0; groupConfig = null; + leaseStartTime = HybridTimestamp.MIN_VALUE.longValue(); return destroyFuture; } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java index 72e71e6c56..2b2b3d035e 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java +++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java @@ -54,6 +54,11 @@ public class RocksDbMetaStorage { */ public static final byte[] INDEX_ROW_ID_PREFIX = {2}; + /** + * Prefix to store lease start time. Key format is {@code [prefix, tableId, partitionId]} in BE. + */ + public static final byte[] LEASE_PREFIX = {3}; + private final ColumnFamily metaColumnFamily; public RocksDbMetaStorage(ColumnFamily metaColumnFamily) { diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index cdf7b881d0..0502432ff3 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -22,6 +22,7 @@ import static java.nio.ByteBuffer.allocate; import static java.nio.ByteBuffer.allocateDirect; import static java.util.Arrays.copyOf; import static java.util.Arrays.copyOfRange; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET; @@ -32,6 +33,7 @@ import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VAL import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc; +import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.LEASE_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; @@ -43,6 +45,7 @@ import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance; import static org.apache.ignite.internal.storage.util.StorageUtils.transitionToTerminalState; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; +import static org.apache.ignite.internal.util.ByteUtils.putLongToBytes; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -53,7 +56,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter; import org.apache.ignite.internal.rocksdb.RocksUtils; import org.apache.ignite.internal.schema.BinaryRow; @@ -147,12 +149,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { /** Key to store group config in meta. */ private final byte[] lastGroupConfigKey; + /** Key to store the lease start time. */ + private final byte[] leaseKey; + /** On-heap-cached last applied index value. */ private volatile long lastAppliedIndex; /** On-heap-cached last applied term value. */ private volatile long lastAppliedTerm; + /** On-heap-cached lease start time value. */ + private volatile long leaseStartTime; + /** On-heap-cached last committed group configuration. 
*/ private volatile byte @Nullable [] lastGroupConfig; @@ -182,6 +190,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { lastAppliedIndexAndTermKey = createKey(PARTITION_META_PREFIX, tableId, partitionId); lastGroupConfigKey = createKey(PARTITION_CONF_PREFIX, tableId, partitionId); + leaseKey = createKey(LEASE_PREFIX, tableId, partitionId); try { byte[] indexAndTerm = db.get(meta, readOpts, lastAppliedIndexAndTermKey); @@ -191,6 +200,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { lastAppliedTerm = buf == null ? 0 : buf.getLong(); lastGroupConfig = db.get(meta, readOpts, lastGroupConfigKey); + + byte[] leaseStartTimeBytes = db.get(meta, readOpts, leaseKey); + ByteBuffer leaseStartTimeBuf = leaseStartTimeBytes == null + ? null + : ByteBuffer.wrap(leaseStartTimeBytes).order(ByteOrder.LITTLE_ENDIAN); + + leaseStartTime = leaseStartTimeBuf == null ? HybridTimestamp.MIN_VALUE.longValue() : leaseStartTimeBuf.getLong(); } catch (RocksDBException e) { throw new StorageException(e); } @@ -980,12 +996,43 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { }); } + @Override + public void updateLease(long leaseStartTime) { + busy(() -> { + if (leaseStartTime <= this.leaseStartTime) { + return null; + } + + AbstractWriteBatch writeBatch = PartitionDataHelper.requireWriteBatch(); + + try { + byte[] leaseBytes = new byte[Long.BYTES]; + + putLongToBytes(leaseStartTime, leaseBytes, 0); + + writeBatch.put(meta, leaseKey, leaseBytes); + + this.leaseStartTime = leaseStartTime; + } catch (RocksDBException e) { + throw new StorageException(e); + } + + return null; + }); + } + + @Override + public long leaseStartTime() { + return busy(() -> leaseStartTime); + } + /** * Deletes partition data from the storage, using write batch to perform the operation. 
*/ void destroyData(WriteBatch writeBatch) throws RocksDBException { writeBatch.delete(meta, lastAppliedIndexAndTermKey); writeBatch.delete(meta, lastGroupConfigKey); + writeBatch.delete(meta, leaseKey); writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), helper.partitionEndPrefix()); @@ -1430,7 +1477,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { * Creates a summary info of the storage in the format "table=user, partitionId=1". */ String createStorageInfo() { - return IgniteStringFormatter.format("tableId={}, partitionId={}", tableStorage.getTableId(), partitionId); + return format("tableId={}, partitionId={}", tableStorage.getTableId(), partitionId); } /** @@ -1498,6 +1545,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { writeBatch.delete(meta, lastGroupConfigKey); writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), helper.partitionEndPrefix()); + writeBatch.delete(meta, leaseKey); gc.deleteQueue(writeBatch); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java index 26dad1528e..e66e1b84c8 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java @@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix; import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.toStringName; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_ROW_ID_PREFIX; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.LEASE_PREFIX; import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; @@ -349,6 +350,7 @@ public final class SharedRocksDbInstance { deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_META_PREFIX, tableIdBytes)); deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes)); deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes)); + deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(LEASE_PREFIX, tableIdBytes)); var cfsToRemove = new ArrayList<ColumnFamily>(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java index 6da31e9048..c7e48594bd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java @@ -215,4 +215,18 @@ public interface PartitionDataStorage extends ManuallyCloseable { * @see MvPartitionStorage#vacuum(GcEntry) */ @Nullable BinaryRow vacuum(GcEntry entry); + + /** + * Updates the current lease start time in the storage. + * + * @param leaseStartTime Lease start time. + */ + void updateLease(long leaseStartTime); + + /** + * Return the start time of the known lease for this replication group. + * + * @return Lease start time. 
+ */ + long leaseStartTime(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 38fde7b47c..32aaa5c708 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -273,8 +273,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler if (cmd.leaseStartTime() != null) { long leaseStartTime = requireNonNull(cmd.leaseStartTime(), "Inconsistent lease information in command [cmd=" + cmd + "]."); - if (leaseStartTime != txStateStorage.leaseStartTime()) { - return new UpdateCommandResult(false, txStateStorage.leaseStartTime()); + long storageLeaseStartTime = storage.leaseStartTime(); + + if (leaseStartTime != storageLeaseStartTime) { + return new UpdateCommandResult(false, storageLeaseStartTime); } } @@ -324,8 +326,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler if (cmd.leaseStartTime() != null) { long leaseStartTime = requireNonNull(cmd.leaseStartTime(), "Inconsistent lease information in command [cmd=" + cmd + "]."); - if (leaseStartTime != txStateStorage.leaseStartTime()) { - return new UpdateCommandResult(false, txStateStorage.leaseStartTime()); + long storageLeaseStartTime = storage.leaseStartTime(); + + if (leaseStartTime != storageLeaseStartTime) { + return new UpdateCommandResult(false, storageLeaseStartTime); } } @@ -626,7 +630,13 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler return; } - txStateStorage.updateLease(cmd.leaseStartTime(), commandIndex, commandTerm); + storage.runConsistently(locker -> { + storage.updateLease(cmd.leaseStartTime()); + + storage.lastApplied(commandIndex, commandTerm); + + return null; + }); } private 
static void onTxStateStorageCasFail(UUID txId, TxMeta txMetaBeforeCas, TxMeta txMetaToSet) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java index f46d0a01c0..d4949dc79e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java @@ -225,4 +225,14 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage { public @Nullable BinaryRow vacuum(GcEntry entry) { return partitionStorage.vacuum(entry); } + + @Override + public void updateLease(long leaseStartTime) { + partitionStorage.updateLease(leaseStartTime); + } + + @Override + public long leaseStartTime() { + return partitionStorage.leaseStartTime(); + } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java index 79fd8cea72..16483d6396 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java @@ -157,4 +157,14 @@ public class TestPartitionDataStorage implements PartitionDataStorage { public @Nullable BinaryRow vacuum(GcEntry entry) { return partitionStorage.vacuum(entry); } + + @Override + public void updateLease(long leaseStartTime) { + partitionStorage.updateLease(leaseStartTime); + } + + @Override + public long leaseStartTime() { + return partitionStorage.leaseStartTime(); + } } diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java index e3d1cd90fe..9e8beb5f08 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java @@ -143,18 +143,4 @@ public class ThreadAssertingTxStateStorage implements TxStateStorage { return storage.clear(); } - - @Override - public void updateLease(long leaseStartTime, long commandIndex, long commandTerm) { - assertThreadAllowsToWrite(); - - storage.updateLease(leaseStartTime, commandIndex, commandTerm); - } - - @Override - public long leaseStartTime() { - assertThreadAllowsToRead(); - - return storage.leaseStartTime(); - } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java index 546fcc59d0..42ad6278f6 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java @@ -206,20 +206,4 @@ public interface TxStateStorage extends ManuallyCloseable { * another reason. */ CompletableFuture<Void> clear(); - - /** - * Updates the current lease start time in the storage. - * - * @param leaseStartTime Lease start time. - * @param commandIndex New value for {@link #lastAppliedIndex()}. - * @param commandTerm New value for {@link #lastAppliedTerm()}. - */ - void updateLease(long leaseStartTime, long commandIndex, long commandTerm); - - /** - * Return the start time of the known lease for this replication group. - * - * @return Lease start time. 
- */ - long leaseStartTime(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java index 643685c8cd..563f28668a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java @@ -30,7 +30,6 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_R import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -79,8 +78,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { /** Database key for the last applied index+term. */ private final byte[] lastAppliedIndexAndTermKey; - private final byte[] leaseKey; - /** Shared TX state storage. */ private final TxStateRocksDbSharedStorage sharedStorage; @@ -93,8 +90,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { /** On-heap-cached last applied term value. */ private volatile long lastAppliedTerm; - private volatile long leaseStartTime; - /** Current state of the storage. 
*/ private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE); @@ -116,7 +111,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { .putInt(tableId) .putShort((short) partitionId) .array(); - this.leaseKey = ("lease_" + tableId + '_' + partitionId).getBytes(StandardCharsets.UTF_8); } /** @@ -133,12 +127,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES); } - byte[] leaseBytes = readLease(sharedStorage.readOptions); - - if (leaseBytes != null) { - leaseStartTime = bytesToLong(leaseBytes); - } - return null; }); } @@ -392,18 +380,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { } } - private byte @Nullable [] readLease(ReadOptions readOptions) { - try { - return sharedStorage.db().get(readOptions, leaseKey); - } catch (RocksDBException e) { - throw new IgniteInternalException( - TX_STATE_STORAGE_ERR, - format("Failed to read a lease from a storage: [{}]", createStorageInfo()), - e - ); - } - } - @Override public void destroy() { if (!tryToCloseStorageAndResources()) { @@ -414,7 +390,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { clearStorageData(writeBatch); writeBatch.delete(lastAppliedIndexAndTermKey); - writeBatch.delete(leaseKey); sharedStorage.db().write(sharedStorage.writeOptions, writeBatch); } catch (Exception e) { @@ -494,15 +469,12 @@ public class TxStateRocksDbStorage implements TxStateStorage { clearStorageData(writeBatch); writeBatch.delete(lastAppliedIndexAndTermKey); - writeBatch.delete(leaseKey); sharedStorage.db().write(sharedStorage.writeOptions, writeBatch); lastAppliedIndex = 0; lastAppliedTerm = 0; - leaseStartTime = Long.MIN_VALUE; - state.set(StorageState.RUNNABLE); } catch (Exception e) { throw new IgniteInternalException( @@ -571,47 +543,6 @@ public class TxStateRocksDbStorage implements TxStateStorage { } } - @Override - public void updateLease(long leaseStartTime, long 
commandIndex, long commandTerm) { - busy(() -> { - if (leaseStartTime <= this.leaseStartTime) { - return null; - } - - try (WriteBatch writeBatch = new WriteBatch()) { - byte[] leaseBytes = new byte[Long.BYTES]; - - putLongToBytes(leaseStartTime, leaseBytes, 0); - - writeBatch.put(leaseKey, leaseBytes); - - this.leaseStartTime = leaseStartTime; - - // If the store is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm. - // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will - // have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long). - if (state.get() != StorageState.REBALANCE) { - updateLastApplied(writeBatch, commandIndex, commandTerm); - } - - sharedStorage.db().write(sharedStorage.writeOptions, writeBatch); - } catch (RocksDBException e) { - throw new IgniteInternalException( - TX_STATE_STORAGE_ERR, - format("Failed update lease in a storage: [{}]", createStorageInfo()), - e - ); - } - - return null; - }); - } - - @Override - public long leaseStartTime() { - return leaseStartTime; - } - private void clearStorageData(WriteBatch writeBatch) throws RocksDBException { writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix()); } diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java index dc457fa985..6015ee95f1 100644 --- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java +++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java @@ -410,39 +410,6 @@ public abstract class AbstractTxStateStorageTest extends BaseIgniteAbstractTest } } - @Test - void testLeases() 
{ - TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0); - - long lst0 = 1000; - - long lst1 = 2000; - - storage0.updateLease(lst0, 1, 1); - - assertEquals(lst0, storage0.leaseStartTime()); - assertEquals(1, storage0.lastAppliedIndex()); - assertEquals(1, storage0.lastAppliedTerm()); - - storage0.updateLease(lst1, 2, 2); - - assertEquals(lst1, storage0.leaseStartTime()); - assertEquals(2, storage0.lastAppliedIndex()); - assertEquals(2, storage0.lastAppliedTerm()); - - // Storage update isn't expected because 100 < 2000 - storage0.updateLease(100, 3, 2); - assertEquals(lst1, storage0.leaseStartTime()); - assertEquals(2, storage0.lastAppliedIndex()); - assertEquals(2, storage0.lastAppliedTerm()); - - // Storage update isn't expected because 100 < 2000 - storage0.updateLease(100, 1, 1); - assertEquals(lst1, storage0.leaseStartTime()); - assertEquals(2, storage0.lastAppliedIndex()); - assertEquals(2, storage0.lastAppliedTerm()); - } - private static void checkStorageIsEmpty(TxStateStorage storage) { try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) { assertThat(scan.stream().collect(toList()), is(empty())); diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java index 8d34592d16..689bd0be80 100644 --- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java +++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.tx.storage.state.test; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_REBALANCE_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR; @@ -52,13 +51,6 @@ public class TestTxStateStorage implements TxStateStorage { private volatile boolean closed; - private volatile long leaseStartTime; - - public TestTxStateStorage() { - // Write a timestamp of default start time of TestReplicaMetaImpl to the storage, in order to use it with TestPlacementDriver. - updateLease(MIN_VALUE.longValue(), 0, 0); - } - @Override @Nullable public TxMeta get(UUID txId) { @@ -250,20 +242,6 @@ public class TestTxStateStorage implements TxStateStorage { return nullCompletedFuture(); } - @Override - public void updateLease(long leaseStartTime, long commandIndex, long commandTerm) { - if (leaseStartTime > this.leaseStartTime) { - this.leaseStartTime = leaseStartTime; - this.lastAppliedIndex = commandIndex; - this.lastAppliedTerm = commandTerm; - } - } - - @Override - public long leaseStartTime() { - return leaseStartTime; - } - private void checkStorageInProgreesOfRebalance() { if (rebalanceFutureReference.get() != null) { throwRebalanceInProgressException();