Phillippko commented on code in PR #3087:
URL: https://github.com/apache/ignite-3/pull/3087#discussion_r1466109280
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
+
+ List<RowId> rowIds = new ArrayList<>();
+ for (Map.Entry<UUID, TimedBinaryRow> entry : batch) {
+ RowId rowId = new RowId(partitionId, entry.getKey());
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
+
+ locker.lock(rowId);
+
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row,
txId, commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ assert commitTs == null :
String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed
from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
+ }
- performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+ rowIds.add(rowId);
+ indexUpdateHandler.addToIndexes(row, rowId);
+ }
- if (commitTs != null) {
- storage.addWriteCommitted(rowId, row, commitTs);
- } else {
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (trackWriteIntent) {
+ pendingRows.addPendingRowIds(txId, rowIds);
+ }
- if (oldRow != null) {
- assert commitTs == null : String.format("Expecting
explicit txn: [txId=%s]", txId);
- // Previous uncommitted row should be removed from
indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
- }
+ if (onApplication != null) {
+ onApplication.run();
}
- rowIds.add(rowId);
- indexUpdateHandler.addToIndexes(row, rowId);
- }
+ return null;
+ }));
+ }
- if (trackWriteIntent) {
- pendingRows.addPendingRowIds(txId, rowIds);
- }
+ //draft, TODO get from config
+ private long getBatchSize() {
+ return 10;
+ }
- if (onApplication != null) {
- onApplication.run();
- }
+ private List<List<Map.Entry<UUID, TimedBinaryRow>>>
getTimedBinaryRowBatches(
+ Map<UUID, TimedBinaryRow> rows,
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Map.Entry<UUID,
TimedBinaryRow>> isFull
+ ) {
+ // Sort IDs to prevent deadlock. Natural UUID order matches RowId
order within the same partition.
+ List<Entry<UUID, TimedBinaryRow>> sortedRows =
rows.entrySet().stream().sorted().collect(Collectors.toList());
+ List<List<Entry<UUID, TimedBinaryRow>>> batches = new ArrayList<>();
+ List<Entry<UUID, TimedBinaryRow>> lastBatch = new ArrayList<>();
+ batches.add(lastBatch);
+ for (Entry<UUID, TimedBinaryRow> row : sortedRows) {
+ if (isFull.test(lastBatch, row)) {
+ lastBatch = new ArrayList<>();
+ batches.add(lastBatch);
}
-
- return null;
- });
+ lastBatch.add(row);
+ }
+ return batches;
Review Comment:
A new batch is started only when adding the next row would overfill the current one.
Changed the naming to improve readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]