tkalkirill commented on code in PR #5654:
URL: https://github.com/apache/ignite-3/pull/5654#discussion_r2048433796
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -308,12 +302,16 @@ private void invokeOnRevisionCallback(long revision,
HybridTimestamp time) {
*
* <p>This method is not thread-safe and must be performed under an
exclusive lock in concurrent scenarios.
*/
- public void advanceSafeTime(HybridTimestamp time) {
+ public void advanceSafeTime(Runnable callback, HybridTimestamp time) {
Review Comment:
I don't think the variable name is appropriate, it's more of a function
before the safe time update. You should describe the arguments now, since the
first one is not at all obvious.
I would suggest creating another method without a function, so as not to
call a method with an empty lambda later, which can be confusing.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1001,6 +1001,8 @@ public void compact(long revision) {
compactKeys(revision);
compactAuxiliaryMappings(revision);
+
+ db.compactRange();
Review Comment:
Write a few words about why this is here, I didn’t notice anything like this
in the project, but now they’ve added it here.
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -170,18 +166,16 @@ public void
setWatchEventHandlingCallback(WatchEventHandlingCallback callback) {
*
* <p>This method is not thread-safe and must be performed under an
exclusive lock in concurrent scenarios.
*
+ * @param newRevision Revision associated with an update.
* @param updatedEntries Entries that were changed during a Meta Storage
update.
Review Comment:
```suggestion
* @param updatedEntries Entries that were changed during a Meta Storage
update, empty if only need to update the revision.
```
##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java:
##########
@@ -1086,6 +1093,154 @@ void testUpdateCompactionRevision() {
assertEquals(1, storage.getCompactionRevision());
}
+ @Test
+ void testConcurrentReadAndCompaction() {
+ KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+ for (int i = 0; i < 1000; i++) {
+ byte[] key = key(i);
+ byte[] value = keyValue(i, i);
+
+ storage.put(key, value, context);
+ long revision = storage.revision();
+ storage.remove(key, context);
+
+ runRace(
+ () -> {
+ storage.setCompactionRevision(revision);
+ storage.compact(revision);
+ },
+ () -> {
+ for (int j = 0; j < 1000; j++) {
+ try {
+ Entry entry = storage.get(key, revision);
+
+ assertEquals(revision, entry.revision());
+ assertArrayEquals(value, entry.value());
+ } catch (CompactedException ignore) {
+ // Expected outcome. Loop can be stopped.
+ return;
+ }
+ }
+ }
+ );
+ }
+ }
+
+ @Test
+ void testConcurrentReadAllAndCompaction() {
+ KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+ int numberOfKeys = 100;
+ for (int i = 0; i < 100; i++) {
+ List<byte[]> keys = new ArrayList<>();
+ List<byte[]> values = new ArrayList<>();
+ for (int j = 0; j < numberOfKeys; j++) {
+ int k = i * numberOfKeys + j;
+
+ keys.add(key(k));
+ values.add(keyValue(k, k));
+ }
+
+ storage.putAll(keys, values, context);
+ long revision = storage.revision();
+ storage.removeAll(keys, context);
+
+ runRace(
+ () -> {
+ storage.setCompactionRevision(revision);
+ storage.compact(revision);
+ },
+ () -> {
+ for (int j = 0; j < 1000; j++) {
+ try {
+ List<Entry> entries = storage.getAll(keys,
revision);
+
+ assertEquals(numberOfKeys, entries.size());
+ for (int k = 0; k < numberOfKeys; k++) {
+ Entry entry = entries.get(k);
+
+ assertEquals(revision, entry.revision());
+ assertArrayEquals(values.get(k),
entry.value());
+ }
+ } catch (CompactedException ignore) {
+ // Expected outcome. Loop can be stopped.
+ return;
+ }
+ }
+ }
+ );
+ }
+ }
+
+ @Test
+ void testConcurrentRangeAndCompaction() {
+ KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+ int numberOfKeys = 100;
+ for (int i = 0; i < 100; i++) {
+ List<byte[]> keys = new ArrayList<>();
Review Comment:
U can use `var`
##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java:
##########
@@ -1086,6 +1093,154 @@ void testUpdateCompactionRevision() {
assertEquals(1, storage.getCompactionRevision());
}
+ @Test
+ void testConcurrentReadAndCompaction() {
+ KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+ for (int i = 0; i < 1000; i++) {
+ byte[] key = key(i);
+ byte[] value = keyValue(i, i);
+
+ storage.put(key, value, context);
+ long revision = storage.revision();
+ storage.remove(key, context);
+
+ runRace(
+ () -> {
+ storage.setCompactionRevision(revision);
+ storage.compact(revision);
+ },
+ () -> {
+ for (int j = 0; j < 1000; j++) {
+ try {
+ Entry entry = storage.get(key, revision);
+
+ assertEquals(revision, entry.revision());
+ assertArrayEquals(value, entry.value());
+ } catch (CompactedException ignore) {
+ // Expected outcome. Loop can be stopped.
+ return;
+ }
+ }
+ }
+ );
+ }
+ }
+
+ @Test
+ void testConcurrentReadAllAndCompaction() {
+ KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+ int numberOfKeys = 100;
+ for (int i = 0; i < 100; i++) {
+ List<byte[]> keys = new ArrayList<>();
Review Comment:
U can use `var`
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -170,18 +166,16 @@ public void
setWatchEventHandlingCallback(WatchEventHandlingCallback callback) {
*
* <p>This method is not thread-safe and must be performed under an
exclusive lock in concurrent scenarios.
*
+ * @param newRevision Revision associated with an update.
* @param updatedEntries Entries that were changed during a Meta Storage
update.
* @param time Timestamp of the Meta Storage update.
* @return Future that gets completed when all registered watches have
been notified of the given event.
*/
- public CompletableFuture<Void> notifyWatches(List<Entry> updatedEntries,
HybridTimestamp time) {
+ public CompletableFuture<Void> notifyWatches(long newRevision, List<Entry>
updatedEntries, HybridTimestamp time) {
assert time != null;
CompletableFuture<Void> newFuture = notificationFuture
.thenComposeAsync(v -> {
- // Revision must be the same for all entries.
- long newRevision = updatedEntries.get(0).revision();
-
List<Entry> filteredUpdatedEntries =
updatedEntries.stream()
Review Comment:
At your discretion, maybe here, as in the original, you can do a check for
an empty collection and if it is, then do not use the stream.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]