sanpwc commented on code in PR #2679:
URL: https://github.com/apache/ignite-3/pull/2679#discussion_r1361883729


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -3016,13 +3062,19 @@ private void scheduleTransactionRowAsyncCleanup(UUID txId, RowId rowId, Transact
 
         // Both normal cleanup and single row cleanup are using txsPendingRowIds map to store write intents.
         // So we don't need a separate method to handle single row case.
-        txManager.executeCleanupAsync(() ->
-                inBusyLock(busyLock, () -> storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, commitTimestamp))
-        ).exceptionally(e -> {
-            LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);
+        CompletableFuture<?> future = rowCleanupMap.compute(rowId, (k, v) -> {
+            CompletableFuture<Void> rowCleanup = txManager.executeCleanupAsync(() ->
+                    inBusyLock(busyLock, () -> storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, commitTimestamp))
+            ).exceptionally(e -> {
+                LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);

Review Comment:
   Why do we expect handleTransactionCleanup to fail? In other words, why is this
   logged at warn rather than error level?
   I believe it shouldn't be possible to proceed with the given tx if we
   failed to clean up the write intent, so we should roll back the current transaction.
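
   To illustrate the concern: the hunk above is cut off, so I can't see what the handler returns. Assuming it logs and returns null, the resulting future completes normally and callers never learn that the cleanup failed. A minimal sketch of the difference, using hypothetical names (CleanupFailureSketch, swallow, propagate) and java.util.logging as a stand-in for the project's logger:

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Sketch only: none of these names come from the PR. */
final class CleanupFailureSketch {
    private static final Logger LOG = Logger.getLogger(CleanupFailureSketch.class.getName());

    /** Logs and returns null, so the returned future completes normally even if cleanup failed. */
    static CompletableFuture<Void> swallow(CompletableFuture<Void> cleanup) {
        return cleanup.exceptionally(e -> {
            LOG.log(Level.WARNING, "Failed to complete transaction cleanup", e);
            return null;
        });
    }

    /** Logs at error level and keeps the returned future failed, so a caller can react (e.g. roll back). */
    static CompletableFuture<Void> propagate(CompletableFuture<Void> cleanup) {
        return cleanup.whenComplete((ignored, e) -> {
            if (e != null) {
                LOG.log(Level.SEVERE, "Failed to complete transaction cleanup", e);
            }
        });
    }
}
```

   With the second variant, whoever waits on the cleanup future sees the failure and can decide to abort the transaction; whether that is the right place to trigger the rollback is a separate question.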



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2628,6 +2652,19 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
         }
     }
 
+    private <T> CompletableFuture<T> checkCleanup(@Nullable RowId rowId, T result) {

Review Comment:
   Could you please add javadoc?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1763,12 +1768,15 @@ private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiR
                 return allOf(deleteExactLockFuts).thenCompose(ignore -> {
                     Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
                     Collection<BinaryRow> result = new ArrayList<>();
+                    List<RowId> rows = new ArrayList<>();
 
                     for (int i = 0; i < searchRows.size(); i++) {
                         RowId lockedRowId = deleteExactLockFuts[i].join();
 
                         if (lockedRowId != null) {
                             rowIdsToDelete.put(lockedRowId.uuid(), null);
+
+                            rows.add(lockedRowId);

Review Comment:
   Why is there no such logic in the insert_all case?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2628,6 +2652,19 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
         }
     }
 
+    private <T> CompletableFuture<T> checkCleanup(@Nullable RowId rowId, T result) {
+        return (rowId == null ? COMPLETED_EMPTY : rowCleanupMap.getOrDefault(rowId, COMPLETED_EMPTY))
+                .thenApply(ignored -> result);
+    }
+
+    private <T> CompletableFuture<T> checkCleanup(Collection<RowId> rowIds, T result) {
+        return allOf(rowIds.stream()
+                .map(rowCleanupMap::get)

Review Comment:
   As we agreed previously, let's avoid the Stream API on the hot path.
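
   A minimal sketch of a loop-based variant, in case it helps. String keys stand in for RowId, the class name is hypothetical, and rowCleanupMap here is only a stand-in for the field from the diff; rows with no pending cleanup are skipped:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: String keys replace RowId, and the map is a stand-in for the PR's rowCleanupMap. */
final class CheckCleanupLoopSketch {
    static final Map<String, CompletableFuture<?>> rowCleanupMap = new ConcurrentHashMap<>();

    /** Waits for the pending cleanups of the given rows, then yields the result. No streams involved. */
    static <T> CompletableFuture<T> checkCleanup(Collection<String> rowIds, T result) {
        List<CompletableFuture<?>> pending = null; // allocated only if some row has a cleanup in flight

        for (String rowId : rowIds) {
            CompletableFuture<?> cleanup = rowCleanupMap.get(rowId);

            if (cleanup != null) {
                if (pending == null) {
                    pending = new ArrayList<>(rowIds.size());
                }

                pending.add(cleanup);
            }
        }

        if (pending == null) {
            return CompletableFuture.completedFuture(result); // fast path: nothing to wait for
        }

        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> result);
    }
}
```

   The lazy allocation is optional; the point is only that a plain loop avoids the stream pipeline and lets the common "no pending cleanup" case return without extra allocations.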



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2606,6 +2629,7 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
 
                     return takeLocksForDelete(row, rowId, txId)
                             .thenCompose(ignored -> validateOperationAgainstSchema(request.transactionId()))
+                            .thenCompose(catalogVersion -> checkCleanup(rowId, catalogVersion))

Review Comment:
   It seems that validateOperationAgainstSchema and checkCleanup can be awaited in
   parallel, right? If so, I'd rather consider the following approach:
   ```
   f = validateOperationAgainstSchema(request.transactionId())
   allOf(f, checkCleanup(rowId)).thenCompose(ignored -> { catalogVersion = f.join(); ... })
   ```
   It would also allow removing the T result param from checkCleanup().
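
   Spelled out a bit more, the approach above could look roughly like this. It is a sketch under assumptions: the class and method names are hypothetical, the two futures are passed in rather than produced by the real validateOperationAgainstSchema and checkCleanup, and the final string stands in for whatever the operation actually returns:

```java
import java.util.concurrent.CompletableFuture;

/** Sketch only: the parameters stand in for the futures returned by the PR's methods. */
final class ParallelAwaitSketch {
    /**
     * Waits for schema validation and row cleanup together, then continues with the catalog version.
     * Both futures are assumed to have been started by the caller, so they progress in parallel.
     */
    static CompletableFuture<String> awaitBoth(
            CompletableFuture<Integer> schemaFut,
            CompletableFuture<Void> cleanupFut
    ) {
        return CompletableFuture.allOf(schemaFut, cleanupFut).thenCompose(ignored -> {
            // schemaFut is already complete here, so join() does not block.
            int catalogVersion = schemaFut.join();

            return CompletableFuture.completedFuture("applied at catalog version " + catalogVersion);
        });
    }
}
```

   Since the catalog version is read from schemaFut directly, the cleanup future no longer needs to carry a result through.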



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
