rpuch commented on code in PR #3087:
URL: https://github.com/apache/ignite-3/pull/3087#discussion_r1464805612
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
+
+ List<RowId> rowIds = new ArrayList<>();
+ for (Map.Entry<UUID, TimedBinaryRow> entry : batch) {
+ RowId rowId = new RowId(partitionId, entry.getKey());
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
+
+ locker.lock(rowId);
+
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row,
txId, commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ assert commitTs == null :
String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed
from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
+ }
- performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+ rowIds.add(rowId);
+ indexUpdateHandler.addToIndexes(row, rowId);
+ }
- if (commitTs != null) {
- storage.addWriteCommitted(rowId, row, commitTs);
- } else {
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (trackWriteIntent) {
+ pendingRows.addPendingRowIds(txId, rowIds);
+ }
- if (oldRow != null) {
- assert commitTs == null : String.format("Expecting
explicit txn: [txId=%s]", txId);
- // Previous uncommitted row should be removed from
indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
- }
+ if (onApplication != null) {
+ onApplication.run();
}
- rowIds.add(rowId);
- indexUpdateHandler.addToIndexes(row, rowId);
- }
+ return null;
+ }));
+ }
- if (trackWriteIntent) {
- pendingRows.addPendingRowIds(txId, rowIds);
- }
+ //draft, TODO get from config
+ private long getBatchSize() {
+ return 10;
+ }
- if (onApplication != null) {
- onApplication.run();
- }
+ private List<List<Map.Entry<UUID, TimedBinaryRow>>>
getTimedBinaryRowBatches(
+ Map<UUID, TimedBinaryRow> rows,
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Map.Entry<UUID,
TimedBinaryRow>> isFull
+ ) {
+ // Sort IDs to prevent deadlock. Natural UUID order matches RowId
order within the same partition.
+ List<Entry<UUID, TimedBinaryRow>> sortedRows =
rows.entrySet().stream().sorted().collect(Collectors.toList());
+ List<List<Entry<UUID, TimedBinaryRow>>> batches = new ArrayList<>();
+ List<Entry<UUID, TimedBinaryRow>> lastBatch = new ArrayList<>();
+ batches.add(lastBatch);
+ for (Entry<UUID, TimedBinaryRow> row : sortedRows) {
+ if (isFull.test(lastBatch, row)) {
+ lastBatch = new ArrayList<>();
+ batches.add(lastBatch);
}
-
- return null;
- });
+ lastBatch.add(row);
+ }
+ return batches;
Review Comment:
It seems that the last batch might remain empty. This looks strange.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
+
+ List<RowId> rowIds = new ArrayList<>();
+ for (Map.Entry<UUID, TimedBinaryRow> entry : batch) {
+ RowId rowId = new RowId(partitionId, entry.getKey());
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
+
+ locker.lock(rowId);
+
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row,
txId, commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ assert commitTs == null :
String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed
from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
+ }
- performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+ rowIds.add(rowId);
+ indexUpdateHandler.addToIndexes(row, rowId);
+ }
- if (commitTs != null) {
- storage.addWriteCommitted(rowId, row, commitTs);
- } else {
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (trackWriteIntent) {
+ pendingRows.addPendingRowIds(txId, rowIds);
+ }
- if (oldRow != null) {
- assert commitTs == null : String.format("Expecting
explicit txn: [txId=%s]", txId);
- // Previous uncommitted row should be removed from
indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
- }
+ if (onApplication != null) {
+ onApplication.run();
}
- rowIds.add(rowId);
- indexUpdateHandler.addToIndexes(row, rowId);
- }
+ return null;
+ }));
+ }
- if (trackWriteIntent) {
- pendingRows.addPendingRowIds(txId, rowIds);
- }
+ //draft, TODO get from config
+ private long getBatchSize() {
+ return 10;
+ }
- if (onApplication != null) {
- onApplication.run();
- }
+ private List<List<Map.Entry<UUID, TimedBinaryRow>>>
getTimedBinaryRowBatches(
Review Comment:
I would suggest calling it differently (as `get something` is not too
informative about the properties of what we get). Maybe `splitToBatches`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
Review Comment:
This can be either done once (before splitting to batches), or not done at
all (but our colleagues like to spare method accesses).
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
Review Comment:
I would replace this with a foreach loop: the readability is the same, but
we spare a capturing lambda.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
+
+ List<RowId> rowIds = new ArrayList<>();
+ for (Map.Entry<UUID, TimedBinaryRow> entry : batch) {
+ RowId rowId = new RowId(partitionId, entry.getKey());
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
+
+ locker.lock(rowId);
+
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row,
txId, commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ assert commitTs == null :
String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed
from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
+ }
- performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+ rowIds.add(rowId);
+ indexUpdateHandler.addToIndexes(row, rowId);
+ }
- if (commitTs != null) {
- storage.addWriteCommitted(rowId, row, commitTs);
- } else {
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (trackWriteIntent) {
+ pendingRows.addPendingRowIds(txId, rowIds);
+ }
- if (oldRow != null) {
- assert commitTs == null : String.format("Expecting
explicit txn: [txId=%s]", txId);
- // Previous uncommitted row should be removed from
indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
- }
+ if (onApplication != null) {
+ onApplication.run();
}
- rowIds.add(rowId);
- indexUpdateHandler.addToIndexes(row, rowId);
- }
+ return null;
+ }));
+ }
- if (trackWriteIntent) {
- pendingRows.addPendingRowIds(txId, rowIds);
- }
+ //draft, TODO get from config
+ private long getBatchSize() {
+ return 10;
+ }
- if (onApplication != null) {
- onApplication.run();
- }
+ private List<List<Map.Entry<UUID, TimedBinaryRow>>>
getTimedBinaryRowBatches(
+ Map<UUID, TimedBinaryRow> rows,
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Map.Entry<UUID,
TimedBinaryRow>> isFull
+ ) {
+ // Sort IDs to prevent deadlock. Natural UUID order matches RowId
order within the same partition.
+ List<Entry<UUID, TimedBinaryRow>> sortedRows =
rows.entrySet().stream().sorted().collect(Collectors.toList());
Review Comment:
`Stream#sorted()` sorts by a natural order, but `Map.Entry` is not
`Comparable`, so this will cause a `ClassCastException` (this is probably why
the tests failed).
You could supply a `Comparator` (to compare by keys), or switch back to the
`TreeMap`-based approach.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
Review Comment:
I would create an interface to represent this predicate. Its job is not too
trivial (for example, it should probably produce only non-empty batches), it
needs to be defined somewhere. Also, having it as a `BiPredicate<...>` is a
mouthful, it's difficult to parse when reading.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
+
+ List<RowId> rowIds = new ArrayList<>();
+ for (Map.Entry<UUID, TimedBinaryRow> entry : batch) {
+ RowId rowId = new RowId(partitionId, entry.getKey());
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
+
+ locker.lock(rowId);
+
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row,
txId, commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ assert commitTs == null :
String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed
from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
+ }
- performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+ rowIds.add(rowId);
+ indexUpdateHandler.addToIndexes(row, rowId);
+ }
- if (commitTs != null) {
- storage.addWriteCommitted(rowId, row, commitTs);
- } else {
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (trackWriteIntent) {
+ pendingRows.addPendingRowIds(txId, rowIds);
+ }
- if (oldRow != null) {
- assert commitTs == null : String.format("Expecting
explicit txn: [txId=%s]", txId);
- // Previous uncommitted row should be removed from
indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
- }
+ if (onApplication != null) {
+ onApplication.run();
}
- rowIds.add(rowId);
- indexUpdateHandler.addToIndexes(row, rowId);
- }
+ return null;
+ }));
+ }
- if (trackWriteIntent) {
- pendingRows.addPendingRowIds(txId, rowIds);
- }
+ //draft, TODO get from config
+ private long getBatchSize() {
+ return 10;
+ }
- if (onApplication != null) {
- onApplication.run();
- }
+ private List<List<Map.Entry<UUID, TimedBinaryRow>>>
getTimedBinaryRowBatches(
+ Map<UUID, TimedBinaryRow> rows,
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Map.Entry<UUID,
TimedBinaryRow>> isFull
+ ) {
+ // Sort IDs to prevent deadlock. Natural UUID order matches RowId
order within the same partition.
+ List<Entry<UUID, TimedBinaryRow>> sortedRows =
rows.entrySet().stream().sorted().collect(Collectors.toList());
Review Comment:
Also, this collection contains entries, not rows, so the naming is probably
not perfect. How about `sortedEntries`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -157,51 +158,77 @@ public void handleUpdateAll(
) {
indexUpdateHandler.waitIndexes();
- storage.runConsistently(locker -> {
- int commitTblId = commitPartitionId.tableId();
- int commitPartId = commitPartitionId.partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- List<RowId> rowIds = new ArrayList<>();
-
- // Sort IDs to prevent deadlock. Natural UUID order matches
RowId order within the same partition.
- SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new
TreeMap<>(rowsToUpdate);
-
- for (Map.Entry<UUID, TimedBinaryRow> entry :
sortedRowsToUpdateMap.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
-
- locker.lock(rowId);
+ if (nullOrEmpty(rowsToUpdate)) {
+ return;
+ }
+ long batchSize = getBatchSize();
+ BiPredicate<List<Entry<UUID, TimedBinaryRow>>, Entry<UUID,
TimedBinaryRow>> isFullByRowCount = (batch, row) ->
+ batch.size() + 1 > batchSize;
+ List<List<Map.Entry<UUID, TimedBinaryRow>>> batches =
getTimedBinaryRowBatches(rowsToUpdate, isFullByRowCount);
+ batches.forEach((batch) ->
+ storage.runConsistently(locker -> {
+ int commitTblId = commitPartitionId.tableId();
+ int commitPartId = commitPartitionId.partitionId();
+
+ List<RowId> rowIds = new ArrayList<>();
+ for (Map.Entry<UUID, TimedBinaryRow> entry : batch) {
+ RowId rowId = new RowId(partitionId, entry.getKey());
+ BinaryRow row = entry.getValue() == null ? null :
entry.getValue().binaryRow();
+
+ locker.lock(rowId);
+
+ performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row,
txId, commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ assert commitTs == null :
String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed
from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
+ }
- performStorageCleanupIfNeeded(txId, rowId,
entry.getValue() == null ? null : entry.getValue().commitTimestamp());
+ rowIds.add(rowId);
+ indexUpdateHandler.addToIndexes(row, rowId);
+ }
- if (commitTs != null) {
- storage.addWriteCommitted(rowId, row, commitTs);
- } else {
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (trackWriteIntent) {
+ pendingRows.addPendingRowIds(txId, rowIds);
Review Comment:
Before your change, `rowIds` contained all rowIds of this operation; now it
only contains IDs related to one batch. I would ask the guys if it is ok that
the pending IDs collection is updated in chunks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]