tkalkirill commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r920785270
##########
modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java:
##########
@@ -28,6 +28,14 @@
* @see RaftGroupListener
*/
public interface CommandClosure<R extends Command> {
+ /**
+ * Corresponding log index of the command. Present for write commands only.
+ * Returns {@code 0} for read commands.
+ */
+ default long index() {
Review Comment:
What about the name **appliedIndex**?
##########
modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java:
##########
@@ -63,7 +63,7 @@ public FakeInternalTable(String tableName, UUID tableId) {
/** {@inheritDoc} */
@Override
- public @NotNull TableStorage storage() {
+ public @NotNull MvTableStorage storage() {
Review Comment:
```suggestion
public MvTableStorage storage() {
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -30,6 +31,21 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Last known replicator index. {@code 0} if index is unknown.
+ */
+ long appliedIndex();
+
+ /**
+ * Sets the last known replicator index.
+ */
+ void appliedIndex(long appliedIndex) throws StorageException;
+
+ /**
+ * {@link #appliedIndex()} value consistent with the data, already
persisted on the storage.
+ */
+ long persistedIndex();
Review Comment:
What about the name **persistedAppliedIndex**?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -489,9 +491,17 @@ public boolean hasNext() {
public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done =
(CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();
- WriteCommand command =
JDKMarshaller.DEFAULT.unmarshall(data.array());
+
+ WriteCommand command = done == null ?
JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+
+ long index = iter.getIndex();
return new CommandClosure<>() {
+ @Override
+ public long index() {
+ return index;
+ }
Review Comment:
```suggestion
return new CommandClosure<>() {
/** {@inheritDoc} */
@Override
public long index() {
return index;
}
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestMvTableStorage.java:
##########
@@ -65,6 +64,11 @@ public CompletableFuture<?> destroyPartition(int
partitionId) throws StorageExce
return CompletableFuture.completedFuture(null);
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ assert appliedIndex > this.appliedIndex;
+
+ this.appliedIndex = appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
);
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
);
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ this.appliedIndex = appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
);
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -467,6 +491,21 @@ private Cursor<BinaryRow>
internalScan(Predicate<BinaryRow> keyFilter, @Nullable
return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" +
partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long appliedIndex() {
+ byte[] appliedIndexBytes;
+
+ try {
+
+ appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+
+ if (appliedIndexBytes == null) {
+ return 0;
+ }
+
+ return ByteUtils.bytesToLong(appliedIndexBytes);
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ try {
+ db.put(meta, appliedIndexKey, ByteUtils.longToBytes(appliedIndex));
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" +
partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" +
partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long appliedIndex() {
+ byte[] appliedIndexBytes;
+
+ try {
+
+ appliedIndexBytes = db.get(meta, appliedIndexKey);
+
Review Comment:
```suggestion
appliedIndexBytes = db.get(meta, appliedIndexKey);
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db,
ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" +
partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long appliedIndex() {
+ byte[] appliedIndexBytes;
+
+ try {
+
+ appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+
+ if (appliedIndexBytes == null) {
+ return 0;
+ }
+
+ return ByteUtils.bytesToLong(appliedIndexBytes);
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -528,6 +574,67 @@ private void incrementRowId(ByteBuffer buf) {
};
}
+ // TODO IGNITE-16769 Implement correct PartitionStorage rows count
calculation.
+ @Override
+ public long rowsCount() {
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ var options = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, options)
+ ) {
+ it.seek(partitionStartPrefix());
+
+ long size = 0;
+
+ while (it.isValid()) {
+ ++size;
+ it.next();
+ }
+
+ return size;
+ }
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements
MvPartitionStorage {
ScanVersionChainByTimestamp::new
);
+ /**
+ * Applied index value.
+ *
+ * @deprecated Not persistent, should be fixed later.
Review Comment:
When and where is "later"? Please add a ticket.
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ assert appliedIndex > this.appliedIndex;
Review Comment:
```suggestion
assert appliedIndex > this.appliedIndex : "current=" +
this.appliedIndex + ", new=" + appliedIndex;
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -118,4 +134,25 @@ public interface MvPartitionStorage extends AutoCloseable {
* @throws StorageException If failed to read data from the storage.
*/
Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp
timestamp) throws StorageException;
+
+ /**
+ * Returns rows count belongs to current storage.
+ *
+ * @return Rows count.
+ * @throws StorageException If failed to obtain size.
+ * @deprecated It's not yet defined what a "count" is. This value is not
easily defined for multiversioned storages.
+ *
+ */
Review Comment:
```suggestion
/**
* Returns rows count belongs to current storage.
*
* @return Rows count.
* @throws StorageException If failed to obtain size.
* @deprecated It's not yet defined what a "count" is. This value is not
easily defined for multiversioned storages.
*/
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -88,8 +93,15 @@ public static MessagingService mockMessagingService(
return messagingService;
}
- private static <T extends Command> Iterator<CommandClosure<T>> iterator(T
obj) {
+ private static <T extends Command> Iterator<CommandClosure<T>> iterator(T
obj, AtomicLong raftIndex) {
+ long index = raftIndex.incrementAndGet();
+
CommandClosure<T> closure = new CommandClosure<>() {
+ @Override
Review Comment:
```suggestion
CommandClosure<T> closure = new CommandClosure<>() {
/** {@inheritDoc} */
@Override
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java:
##########
@@ -109,6 +112,11 @@ public void result(@Nullable Serializable r) {
}
} else {
CommandClosure<WriteCommand> clo = new
CommandClosure<>() {
+ @Override
+ public long index() {
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
public long index() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]