sanpwc commented on code in PR #2679:
URL: https://github.com/apache/ignite-3/pull/2679#discussion_r1361883729
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -3016,13 +3062,19 @@ private void scheduleTransactionRowAsyncCleanup(UUID txId, RowId rowId, Transact
// Both normal cleanup and single row cleanup are using txsPendingRowIds map to store write intents.
// So we don't need a separate method to handle single row case.
- txManager.executeCleanupAsync(() ->
- inBusyLock(busyLock, () -> storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, commitTimestamp))
- ).exceptionally(e -> {
- LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);
+ CompletableFuture<?> future = rowCleanupMap.compute(rowId, (k, v) -> {
+ CompletableFuture<Void> rowCleanup = txManager.executeCleanupAsync(() ->
+ inBusyLock(busyLock, () -> storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, commitTimestamp))
+ ).exceptionally(e -> {
+ LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);
Review Comment:
Why do we expect handleTransactionCleanup to fail? In other words, why is this
logged at warn and not error?
I believe it shouldn't be possible to proceed with the given tx if we failed
to clean up the write intent, so we should roll back the current transaction.
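To illustrate the concern: `exceptionally` turns a failed future into a successfully completed one, so anything waiting on the cleanup future will proceed as if the cleanup had succeeded. Below is a minimal sketch of the difference, not the PR's code. The class and method names are hypothetical, and a `StringBuilder` stands in for the logger.

```java
import java.util.concurrent.CompletableFuture;

public class CleanupFailureSketch {
    /** Swallows the failure: the returned future completes normally with null. */
    static CompletableFuture<Void> swallow(CompletableFuture<Void> cleanup) {
        return cleanup.exceptionally(e -> null);
    }

    /** Observes the failure (for example, to log it) but keeps the returned future failed. */
    static CompletableFuture<Void> propagate(CompletableFuture<Void> cleanup, StringBuilder log) {
        return cleanup.whenComplete((res, e) -> {
            if (e != null) {
                // Hypothetical logging stand-in; the real code would use LOG.error(...).
                log.append("cleanup failed: ").append(e.getMessage());
            }
        });
    }
}
```

With `propagate`, a caller that chains on the cleanup future sees the exception and can fail or roll back the transaction instead of continuing.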
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2628,6 +2652,19 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
}
}
+ private <T> CompletableFuture<T> checkCleanup(@Nullable RowId rowId, T result) {
Review Comment:
Could you please add javadoc?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1763,12 +1768,15 @@ private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiR
return allOf(deleteExactLockFuts).thenCompose(ignore -> {
Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
Collection<BinaryRow> result = new ArrayList<>();
+ List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRows.size(); i++) {
RowId lockedRowId = deleteExactLockFuts[i].join();
if (lockedRowId != null) {
rowIdsToDelete.put(lockedRowId.uuid(), null);
+
+ rows.add(lockedRowId);
Review Comment:
Why is there no such logic in the insert_all case?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2628,6 +2652,19 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
}
}
+ private <T> CompletableFuture<T> checkCleanup(@Nullable RowId rowId, T result) {
+ return (rowId == null ? COMPLETED_EMPTY : rowCleanupMap.getOrDefault(rowId, COMPLETED_EMPTY))
+ .thenApply(ignored -> result);
+ }
+
+ private <T> CompletableFuture<T> checkCleanup(Collection<RowId> rowIds, T result) {
+ return allOf(rowIds.stream()
+ .map(rowCleanupMap::get)
Review Comment:
As we agreed previously, let's avoid the Stream API on the hot path.
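For reference, a loop-based variant could look roughly like the sketch below. It is a self-contained illustration, not the PR's code: `String` keys stand in for `RowId`, `rowCleanupMap` is a local stand-in for the listener's field, and the class name is hypothetical. Rows without a pending cleanup are skipped, since `allOf` does not accept null elements.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CleanupLoopSketch {
    // Stand-in for rowCleanupMap; String keys replace RowId here (assumption).
    static final Map<String, CompletableFuture<?>> rowCleanupMap = new ConcurrentHashMap<>();

    /** Waits for pending cleanups of the given rows, then completes with {@code result}. */
    static <T> CompletableFuture<T> checkCleanup(Collection<String> rowIds, T result) {
        List<CompletableFuture<?>> pending = new ArrayList<>(rowIds.size());

        // Plain loop instead of stream().map(...).filter(...): no pipeline objects per call.
        for (String rowId : rowIds) {
            CompletableFuture<?> cleanup = rowCleanupMap.get(rowId);

            if (cleanup != null) {
                pending.add(cleanup);
            }
        }

        if (pending.isEmpty()) {
            // Fast path: nothing to wait for.
            return CompletableFuture.completedFuture(result);
        }

        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> result);
    }
}
```

The early return for an empty `pending` list also avoids the `allOf` allocation in the common case where no cleanup is in flight.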
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2606,6 +2629,7 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
return takeLocksForDelete(row, rowId, txId)
.thenCompose(ignored -> validateOperationAgainstSchema(request.transactionId()))
+ .thenCompose(catalogVersion -> checkCleanup(rowId, catalogVersion))
Review Comment:
It seems that validateOperationAgainstSchema and checkCleanup can be awaited
in parallel, right? If so, I'd rather consider the following approach:
```
f = validateOperationAgainstSchema(request.transactionId());
allOf(f, checkCleanup(rowId)).thenCompose(ignored -> { catalogVersion = f.join(); ... });
```
It would also allow removing the T result param from checkCleanup().
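A minimal, self-contained sketch of that pattern, under stated assumptions: the class and method names are hypothetical, `Integer` is assumed as the catalog version type, and `thenApply` is used in place of `thenCompose` to keep the example short.

```java
import java.util.concurrent.CompletableFuture;

public class ParallelAwaitSketch {
    /** Completes when both futures complete, then exposes the first future's value. */
    static CompletableFuture<Integer> awaitBoth(
            CompletableFuture<Integer> catalogVersionFut,
            CompletableFuture<?> cleanupFut
    ) {
        return CompletableFuture.allOf(catalogVersionFut, cleanupFut)
                // join() does not block here: allOf guarantees catalogVersionFut is done.
                .thenApply(ignored -> catalogVersionFut.join());
    }
}
```

Because the value is read from the first future directly, the cleanup future no longer needs to carry a result through, which is what makes the `T result` parameter unnecessary.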