Phillippko commented on code in PR #3087:
URL: https://github.com/apache/ignite-3/pull/3087#discussion_r1467567036


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -155,52 +190,82 @@ public void handleUpdateAll(
             @Nullable Runnable onApplication,
             @Nullable HybridTimestamp commitTs
     ) {
+        if (nullOrEmpty(rowsToUpdate)) {
+            return;
+        }
         indexUpdateHandler.waitIndexes();
+        int commitTblId = commitPartitionId.tableId();
+        int commitPartId = commitPartitionId.partitionId();
+        Iterator<Entry<UUID, TimedBinaryRow>> it = 
rowsToUpdate.entrySet().iterator();
+        Entry<UUID, TimedBinaryRow> lastUnprocessedEntry = it.next();
+        boolean useTryLock = false;
+
+        while (lastUnprocessedEntry != null) {
+            lastUnprocessedEntry = processEntriesUntilBatchLimit(
+                    lastUnprocessedEntry,
+                    txId,
+                    trackWriteIntent,
+                    commitTs,
+                    commitTblId,
+                    commitPartId,
+                    it,
+                    storageUpdateConfiguration.batchLength().value(),
+                    useTryLock
+            );
+            useTryLock = true;
+        }
 
-        storage.runConsistently(locker -> {
-            int commitTblId = commitPartitionId.tableId();
-            int commitPartId = commitPartitionId.partitionId();
-
-            if (!nullOrEmpty(rowsToUpdate)) {
-                List<RowId> rowIds = new ArrayList<>();
-
-                // Sort IDs to prevent deadlock. Natural UUID order matches 
RowId order within the same partition.
-                SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new 
TreeMap<>(rowsToUpdate);
-
-                for (Map.Entry<UUID, TimedBinaryRow> entry : 
sortedRowsToUpdateMap.entrySet()) {
-                    RowId rowId = new RowId(partitionId, entry.getKey());
-                    BinaryRow row = entry.getValue() == null ? null : 
entry.getValue().binaryRow();
-
-                    locker.lock(rowId);
-
-                    performStorageCleanupIfNeeded(txId, rowId, 
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
-
-                    if (commitTs != null) {
-                        storage.addWriteCommitted(rowId, row, commitTs);
-                    } else {
-                        BinaryRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
-
-                        if (oldRow != null) {
-                            assert commitTs == null : String.format("Expecting 
explicit txn: [txId=%s]", txId);
-                            // Previous uncommitted row should be removed from 
indexes.
-                            tryRemovePreviousWritesIndex(rowId, oldRow);
-                        }
-                    }
+        if (onApplication != null) {
+            onApplication.run();
+        }
+    }
 
-                    rowIds.add(rowId);
-                    indexUpdateHandler.addToIndexes(row, rowId);
+    private Entry<UUID, TimedBinaryRow> processEntriesUntilBatchLimit(
+            Entry<UUID, TimedBinaryRow> lastUnprocessedEntry,
+            UUID txId,
+            boolean trackWriteIntent,
+            @Nullable HybridTimestamp commitTs,
+            int commitTblId,
+            int commitPartId,
+            Iterator<Entry<UUID, TimedBinaryRow>> it,
+            int maxBatchLength,
+            boolean useTryLock
+    ) {
+        return storage.runConsistently(locker -> {
+            List<RowId> processedRowIds = new ArrayList<>();
+            int batchLength = 0;
+            Entry<UUID, TimedBinaryRow> entryToProcess = lastUnprocessedEntry;
+            while (entryToProcess != null) {
+                RowId rowId = new RowId(partitionId, entryToProcess.getKey());
+                BinaryRow row = entryToProcess.getValue() == null ? null : 
entryToProcess.getValue().binaryRow();
+                if (row != null) {
+                    batchLength += row.tupleSliceLength();

Review Comment:
   What syze you would suggest?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -155,52 +190,82 @@ public void handleUpdateAll(
             @Nullable Runnable onApplication,
             @Nullable HybridTimestamp commitTs
     ) {
+        if (nullOrEmpty(rowsToUpdate)) {
+            return;
+        }
         indexUpdateHandler.waitIndexes();
+        int commitTblId = commitPartitionId.tableId();
+        int commitPartId = commitPartitionId.partitionId();
+        Iterator<Entry<UUID, TimedBinaryRow>> it = 
rowsToUpdate.entrySet().iterator();
+        Entry<UUID, TimedBinaryRow> lastUnprocessedEntry = it.next();
+        boolean useTryLock = false;
+
+        while (lastUnprocessedEntry != null) {
+            lastUnprocessedEntry = processEntriesUntilBatchLimit(
+                    lastUnprocessedEntry,
+                    txId,
+                    trackWriteIntent,
+                    commitTs,
+                    commitTblId,
+                    commitPartId,
+                    it,
+                    storageUpdateConfiguration.batchLength().value(),
+                    useTryLock
+            );
+            useTryLock = true;
+        }
 
-        storage.runConsistently(locker -> {
-            int commitTblId = commitPartitionId.tableId();
-            int commitPartId = commitPartitionId.partitionId();
-
-            if (!nullOrEmpty(rowsToUpdate)) {
-                List<RowId> rowIds = new ArrayList<>();
-
-                // Sort IDs to prevent deadlock. Natural UUID order matches 
RowId order within the same partition.
-                SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new 
TreeMap<>(rowsToUpdate);
-
-                for (Map.Entry<UUID, TimedBinaryRow> entry : 
sortedRowsToUpdateMap.entrySet()) {
-                    RowId rowId = new RowId(partitionId, entry.getKey());
-                    BinaryRow row = entry.getValue() == null ? null : 
entry.getValue().binaryRow();
-
-                    locker.lock(rowId);
-
-                    performStorageCleanupIfNeeded(txId, rowId, 
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
-
-                    if (commitTs != null) {
-                        storage.addWriteCommitted(rowId, row, commitTs);
-                    } else {
-                        BinaryRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
-
-                        if (oldRow != null) {
-                            assert commitTs == null : String.format("Expecting 
explicit txn: [txId=%s]", txId);
-                            // Previous uncommitted row should be removed from 
indexes.
-                            tryRemovePreviousWritesIndex(rowId, oldRow);
-                        }
-                    }
+        if (onApplication != null) {
+            onApplication.run();
+        }
+    }
 
-                    rowIds.add(rowId);
-                    indexUpdateHandler.addToIndexes(row, rowId);
+    private Entry<UUID, TimedBinaryRow> processEntriesUntilBatchLimit(
+            Entry<UUID, TimedBinaryRow> lastUnprocessedEntry,
+            UUID txId,
+            boolean trackWriteIntent,
+            @Nullable HybridTimestamp commitTs,
+            int commitTblId,
+            int commitPartId,
+            Iterator<Entry<UUID, TimedBinaryRow>> it,
+            int maxBatchLength,
+            boolean useTryLock
+    ) {
+        return storage.runConsistently(locker -> {
+            List<RowId> processedRowIds = new ArrayList<>();
+            int batchLength = 0;
+            Entry<UUID, TimedBinaryRow> entryToProcess = lastUnprocessedEntry;
+            while (entryToProcess != null) {
+                RowId rowId = new RowId(partitionId, entryToProcess.getKey());
+                BinaryRow row = entryToProcess.getValue() == null ? null : 
entryToProcess.getValue().binaryRow();
+                if (row != null) {
+                    batchLength += row.tupleSliceLength();

Review Comment:
   What size would you suggest?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to