ibessonov commented on code in PR #3087:
URL: https://github.com/apache/ignite-3/pull/3087#discussion_r1467487239


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -103,27 +111,20 @@ public void handleUpdate(
         indexUpdateHandler.waitIndexes();
 
         storage.runConsistently(locker -> {
-            RowId rowId = new RowId(partitionId, rowUuid);
             int commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
-
-            locker.lock(rowId);
-
-            performStorageCleanupIfNeeded(txId, rowId, lastCommitTs);
-
-            if (commitTs != null) {
-                storage.addWriteCommitted(rowId, row, commitTs);
-            } else {
-                BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
-
-                if (oldRow != null) {
-                    assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId);
-                    // Previous uncommitted row should be removed from indexes.
-                    tryRemovePreviousWritesIndex(rowId, oldRow);
-                }
-            }
-
-            indexUpdateHandler.addToIndexes(row, rowId);
+            RowId rowId = new RowId(partitionId, rowUuid);
+            tryProcessRow(

Review Comment:
   ```suggestion
               RowId rowId = new RowId(partitionId, rowUuid);
   
               tryProcessRow(
   ```



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java:
##########
@@ -63,10 +64,13 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest {
 
     //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195
     @InjectConfiguration("mock: { fsync: false }")
-    private static RaftConfiguration raftConfig;
+    private RaftConfiguration raftConfig;

Review Comment:
   Any explanation of why it's not static anymore?



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/StorageUpdateConfigurationSchema.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.configuration;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/**
+ * Configuration schema for StorageUpdateHandler.
+ */
+@ConfigurationRoot(rootName = "storageUpdate", type = ConfigurationType.DISTRIBUTED)
+public class StorageUpdateConfigurationSchema {
+
+    /**
+     * Maximum batch size of rows to update. Unit depends on filling predicate used.

Review Comment:
   > Unit depends on filling predicate used.
   
   How should I interpret this? The configuration is public, so its unit should be stated explicitly.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -64,11 +70,13 @@ public class StorageUpdateHandler {
     public StorageUpdateHandler(
             int partitionId,
             PartitionDataStorage storage,
-            IndexUpdateHandler indexUpdateHandler
+            IndexUpdateHandler indexUpdateHandler,
+            StorageUpdateConfiguration storageUpdateConfiguration

Review Comment:
   The new parameter is not added to the javadoc.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -42,6 +44,8 @@
  * Handler for storage updates that can be performed on processing of primary replica requests and partition replication requests.
  */
 public class StorageUpdateHandler {
+    /** Maximum bytes to write in one run. */
+    private static final int MAX_BATCH_LENGTH = 64;

Review Comment:
   Please don't skip empty lines and follow the code style.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -53,6 +57,8 @@ public class StorageUpdateHandler {
 
     /** A container for rows that were inserted, updated or removed. */
     private final PendingRows pendingRows = new PendingRows();
+    // Discussing if batch limit should be configurable

Review Comment:
   You forgot to remove this comment. Please also add an empty line.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -197,6 +198,11 @@ public class TableManagerTest extends IgniteAbstractTest {
     @InjectConfiguration
     private GcConfiguration gcConfig;
 
+    /** Storage update configuration. */
+    @InjectConfiguration
+    private StorageUpdateConfiguration storageUpdateConfiguration;
+
+

Review Comment:
   ```suggestion
   
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -210,7 +213,7 @@
 import org.mockito.quality.Strictness;
 
 /** Tests for partition replica listener. */
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})

Review Comment:
   In this PR I see both repeated annotations and explicit arrays. It would be nice to see only one of the two, for example repeated annotations.
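
For context, a small sketch of why the two styles are interchangeable and the choice is purely a matter of consistency. It uses a made-up repeatable annotation `@Ext` with a container `@Exts`; these only mimic the general shape of JUnit's `@ExtendWith` and are not taken from the PR or from JUnit itself.

```java
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/** Hypothetical repeatable annotation that accepts one or more classes. */
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(Exts.class)
@interface Ext {
    Class<?>[] value();
}

/** Container annotation required by @Repeatable. */
@Retention(RetentionPolicy.RUNTIME)
@interface Exts {
    Ext[] value();
}

/** Style 1: the annotation is repeated, one class per occurrence. */
@Ext(String.class)
@Ext(Integer.class)
class RepeatedStyle {
}

/** Style 2: a single annotation with an explicit array. */
@Ext({String.class, Integer.class})
class ArrayStyle {
}

class AnnotationStyleSketch {
    /** Counts all classes declared through @Ext on the given type, regardless of style. */
    static int declaredExtensions(Class<?> type) {
        int count = 0;

        for (Ext ext : type.getAnnotationsByType(Ext.class)) {
            count += ext.value().length;
        }

        return count;
    }
}
```

Both `RepeatedStyle` and `ArrayStyle` report the same number of declared classes here, so picking one style across the PR costs nothing functionally.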



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -155,52 +190,82 @@ public void handleUpdateAll(
             @Nullable Runnable onApplication,
             @Nullable HybridTimestamp commitTs
     ) {
+        if (nullOrEmpty(rowsToUpdate)) {
+            return;
+        }
         indexUpdateHandler.waitIndexes();
+        int commitTblId = commitPartitionId.tableId();
+        int commitPartId = commitPartitionId.partitionId();
+        Iterator<Entry<UUID, TimedBinaryRow>> it = rowsToUpdate.entrySet().iterator();
+        Entry<UUID, TimedBinaryRow> lastUnprocessedEntry = it.next();
+        boolean useTryLock = false;
+
+        while (lastUnprocessedEntry != null) {
+            lastUnprocessedEntry = processEntriesUntilBatchLimit(
+                    lastUnprocessedEntry,
+                    txId,
+                    trackWriteIntent,
+                    commitTs,
+                    commitTblId,
+                    commitPartId,
+                    it,
+                    storageUpdateConfiguration.batchLength().value(),
+                    useTryLock
+            );
+            useTryLock = true;
+        }
 
-        storage.runConsistently(locker -> {
-            int commitTblId = commitPartitionId.tableId();
-            int commitPartId = commitPartitionId.partitionId();
-
-            if (!nullOrEmpty(rowsToUpdate)) {
-                List<RowId> rowIds = new ArrayList<>();
-
-                // Sort IDs to prevent deadlock. Natural UUID order matches RowId order within the same partition.
-                SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate);
-
-                for (Map.Entry<UUID, TimedBinaryRow> entry : sortedRowsToUpdateMap.entrySet()) {
-                    RowId rowId = new RowId(partitionId, entry.getKey());
-                    BinaryRow row = entry.getValue() == null ? null : entry.getValue().binaryRow();
-
-                    locker.lock(rowId);
-
-                    performStorageCleanupIfNeeded(txId, rowId, entry.getValue() == null ? null : entry.getValue().commitTimestamp());
-
-                    if (commitTs != null) {
-                        storage.addWriteCommitted(rowId, row, commitTs);
-                    } else {
-                        BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
-
-                        if (oldRow != null) {
-                            assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId);
-                            // Previous uncommitted row should be removed from indexes.
-                            tryRemovePreviousWritesIndex(rowId, oldRow);
-                        }
-                    }
+        if (onApplication != null) {
+            onApplication.run();
+        }
+    }
 
-                    rowIds.add(rowId);
-                    indexUpdateHandler.addToIndexes(row, rowId);
+    private Entry<UUID, TimedBinaryRow> processEntriesUntilBatchLimit(
+            Entry<UUID, TimedBinaryRow> lastUnprocessedEntry,
+            UUID txId,
+            boolean trackWriteIntent,
+            @Nullable HybridTimestamp commitTs,
+            int commitTblId,
+            int commitPartId,
+            Iterator<Entry<UUID, TimedBinaryRow>> it,
+            int maxBatchLength,
+            boolean useTryLock
+    ) {
+        return storage.runConsistently(locker -> {
+            List<RowId> processedRowIds = new ArrayList<>();
+            int batchLength = 0;
+            Entry<UUID, TimedBinaryRow> entryToProcess = lastUnprocessedEntry;
+            while (entryToProcess != null) {
+                RowId rowId = new RowId(partitionId, entryToProcess.getKey());
+                BinaryRow row = entryToProcess.getValue() == null ? null : entryToProcess.getValue().binaryRow();
+                if (row != null) {
+                    batchLength += row.tupleSliceLength();

Review Comment:
   Now that we measure batches in bytes, 64 is not enough.
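
A rough illustration of the point, assuming hypothetical row sizes (the actual `tupleSliceLength()` values are not shown in this PR). It mirrors the break condition from the diff, `!processedRowIds.isEmpty() && batchLength > maxBatchLength`, and counts how many `runConsistently` batches a given byte limit would produce. `BatchCountSketch` and `countBatches` are illustrative names only.

```java
class BatchCountSketch {
    /**
     * Counts batches under the rule used in the diff: a batch always accepts its first row,
     * then stops as soon as adding the next row would exceed the byte limit.
     */
    static int countBatches(int[] rowSizes, int maxBatchLength) {
        int batches = 0;
        int i = 0;

        while (i < rowSizes.length) {
            int batchLength = 0;
            int rowsInBatch = 0;

            while (i < rowSizes.length) {
                batchLength += rowSizes[i];

                // The overflowing row is left for the next batch, as in the diff.
                if (rowsInBatch > 0 && batchLength > maxBatchLength) {
                    break;
                }

                rowsInBatch++;
                i++;
            }

            batches++;
        }

        return batches;
    }
}
```

With ten rows of 100 bytes each, a limit of 64 degenerates into ten single-row batches, while a limit of 1024 fits all ten rows into one batch.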



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -137,6 +138,40 @@ public void handleUpdate(
         });
     }
 
+    private boolean tryProcessRow(
+            Locker locker,
+            int commitTblId,
+            int commitPartId,
+            RowId rowId,
+            UUID txId,
+            @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTs,
+            @Nullable HybridTimestamp commitTs,
+            boolean useTryLock
+    ) {
+        if (useTryLock) {
+            if (!locker.tryLock(rowId)) {
+                return false;
+            }
+        } else {
+            locker.lock(rowId);
+        }
+        performStorageCleanupIfNeeded(txId, rowId, lastCommitTs);

Review Comment:
   Ok, I see that you skip empty lines all the time. Please reformat your code.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -284,8 +349,8 @@ public void handleWriteIntentRead(UUID txId, RowId rowId) {
     }
 
     /**
-     * Switches write intents created by the transaction to regular values if the transaction is committed
-     * or removes them if the transaction is aborted.
+     * Switches write intents created by the transaction to regular values if the transaction is committed or removes them if the
+     * transaction is aborted.

Review Comment:
   Artifacts of auto-formatting the file, I guess?
   I don't like it when people add them. You didn't change the content of the javadoc, so why commit it?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -155,52 +190,82 @@ public void handleUpdateAll(
             @Nullable Runnable onApplication,
             @Nullable HybridTimestamp commitTs
     ) {
+        if (nullOrEmpty(rowsToUpdate)) {
+            return;
+        }
         indexUpdateHandler.waitIndexes();
+        int commitTblId = commitPartitionId.tableId();
+        int commitPartId = commitPartitionId.partitionId();
+        Iterator<Entry<UUID, TimedBinaryRow>> it = rowsToUpdate.entrySet().iterator();
+        Entry<UUID, TimedBinaryRow> lastUnprocessedEntry = it.next();
+        boolean useTryLock = false;
+
+        while (lastUnprocessedEntry != null) {
+            lastUnprocessedEntry = processEntriesUntilBatchLimit(
+                    lastUnprocessedEntry,
+                    txId,
+                    trackWriteIntent,
+                    commitTs,
+                    commitTblId,
+                    commitPartId,
+                    it,
+                    storageUpdateConfiguration.batchLength().value(),
+                    useTryLock
+            );
+            useTryLock = true;
+        }
 
-        storage.runConsistently(locker -> {
-            int commitTblId = commitPartitionId.tableId();
-            int commitPartId = commitPartitionId.partitionId();
-
-            if (!nullOrEmpty(rowsToUpdate)) {
-                List<RowId> rowIds = new ArrayList<>();
-
-                // Sort IDs to prevent deadlock. Natural UUID order matches RowId order within the same partition.
-                SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate);
-
-                for (Map.Entry<UUID, TimedBinaryRow> entry : sortedRowsToUpdateMap.entrySet()) {
-                    RowId rowId = new RowId(partitionId, entry.getKey());
-                    BinaryRow row = entry.getValue() == null ? null : entry.getValue().binaryRow();
-
-                    locker.lock(rowId);
-
-                    performStorageCleanupIfNeeded(txId, rowId, entry.getValue() == null ? null : entry.getValue().commitTimestamp());
-
-                    if (commitTs != null) {
-                        storage.addWriteCommitted(rowId, row, commitTs);
-                    } else {
-                        BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
-
-                        if (oldRow != null) {
-                            assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId);
-                            // Previous uncommitted row should be removed from indexes.
-                            tryRemovePreviousWritesIndex(rowId, oldRow);
-                        }
-                    }
+        if (onApplication != null) {
+            onApplication.run();
+        }
+    }
 
-                    rowIds.add(rowId);
-                    indexUpdateHandler.addToIndexes(row, rowId);
+    private Entry<UUID, TimedBinaryRow> processEntriesUntilBatchLimit(
+            Entry<UUID, TimedBinaryRow> lastUnprocessedEntry,
+            UUID txId,
+            boolean trackWriteIntent,
+            @Nullable HybridTimestamp commitTs,
+            int commitTblId,
+            int commitPartId,
+            Iterator<Entry<UUID, TimedBinaryRow>> it,
+            int maxBatchLength,
+            boolean useTryLock
+    ) {
+        return storage.runConsistently(locker -> {
+            List<RowId> processedRowIds = new ArrayList<>();
+            int batchLength = 0;
+            Entry<UUID, TimedBinaryRow> entryToProcess = lastUnprocessedEntry;
+            while (entryToProcess != null) {
+                RowId rowId = new RowId(partitionId, entryToProcess.getKey());
+                BinaryRow row = entryToProcess.getValue() == null ? null : entryToProcess.getValue().binaryRow();
+                if (row != null) {
+                    batchLength += row.tupleSliceLength();
                 }
-
-                if (trackWriteIntent) {
-                    pendingRows.addPendingRowIds(txId, rowIds);
+                if (!processedRowIds.isEmpty() && batchLength > maxBatchLength) {
+                    break;
                 }
-
-                if (onApplication != null) {
-                    onApplication.run();
+                boolean rowProcessed = tryProcessRow(
+                        locker,
+                        commitTblId,
+                        commitPartId,
+                        rowId,
+                        txId,
+                        row,
+                        entryToProcess.getValue() == null ? null : entryToProcess.getValue().commitTimestamp(),
+                        commitTs,
+                        useTryLock
+                );
+                if (rowProcessed) {

Review Comment:
   I don't see an `else` branch where you would return from this method. Are you sure that this code works if `tryLock` fails?
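
To make the concern concrete, here is a minimal sketch of a batch loop that stops and hands the unprocessed entry back to the caller when `tryLock` fails. `SketchLocker`, `BatchSketch` and `processBatch` are hypothetical stand-ins written for this illustration; they are not the PR's code and not Ignite's actual `Locker` API.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a row locker; not Ignite's actual Locker API. */
class SketchLocker {
    private final Set<String> heldElsewhere = new HashSet<>();

    /** Marks a key as locked by some other, imaginary, owner. */
    void holdElsewhere(String key) {
        heldElsewhere.add(key);
    }

    /** Returns false instead of blocking when the key is held elsewhere. */
    boolean tryLock(String key) {
        return !heldElsewhere.contains(key);
    }
}

class BatchSketch {
    /**
     * Processes keys until one of them cannot be locked.
     * Returns the first key that was not processed, or null when the iterator is drained.
     */
    static String processBatch(String first, Iterator<String> it, SketchLocker locker, List<String> processed) {
        String current = first;

        while (current != null) {
            if (!locker.tryLock(current)) {
                // The explicit failure path: end this batch and let the caller retry the same key.
                return current;
            }

            processed.add(current);

            current = it.hasNext() ? it.next() : null;
        }

        return null;
    }
}
```

In this sketch the caller keeps calling `processBatch` with the returned key until it gets `null`, so a failed `tryLock` ends the current batch without skipping the row and without advancing the iterator.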



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java:
##########
@@ -51,11 +54,13 @@
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and three indexes: primary key, hash index over
  * value columns and sorted index over value columns.
  */
+@ExtendWith(ConfigurationExtension.class)

Review Comment:
   Maybe we should add it to the `BaseMvStoragesTest` class, what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
