korlov42 commented on code in PR #1191:
URL: https://github.com/apache/ignite-3/pull/1191#discussion_r1004206463


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -856,335 +886,218 @@ private CompletableFuture<Object> processSingleEntryAction(ReadWriteSingleRowRep
 
         ByteBuffer searchKey = searchRow.keySlice();
 
-        UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
-
         UUID txId = request.transactionId();
 
         switch (request.requestType()) {
             case RW_GET: {
-                CompletableFuture<RowId> lockFut = takeLocksForGet(searchKey, 
indexId, txId);
-
-                return lockFut.thenApply(lockedRowId -> {
-                    BinaryRow result = lockedRowId != null
-                            ? 
resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), 
txId) : null;
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
 
-                    return result;
+                    return takeLocksForGet(rowId, txId)
+                            .thenApply(ignored -> row);
                 });
             }
             case RW_DELETE: {
-                CompletableFuture<RowId> lockFut = 
takeLocksForDelete(searchKey, indexId, txId);
-
-                return lockFut.thenCompose(lockedRowId -> {
-                    boolean removed = lockedRowId != null;
-
-                    CompletableFuture raftFut = removed ? 
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, txId)) :
-                            CompletableFuture.completedFuture(null);
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId == null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
 
-                    return raftFut.thenApply(ignored -> removed);
+                    return takeLocksForDelete(searchRow, rowId, txId)
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId, txId)))
+                            .thenApply(ignored -> true);
                 });
             }
             case RW_GET_AND_DELETE: {
-                CompletableFuture<RowId> lockFut = 
takeLocksForDelete(searchKey, indexId, txId);
-
-                return lockFut.thenCompose(lockedRowId -> {
-                    BinaryRow lockedRow = lockedRowId != null
-                            ? 
resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), 
txId) : null;
-
-                    CompletableFuture raftFut = lockedRowId != null ? 
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, txId)) :
-                            CompletableFuture.completedFuture(null);
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
 
-                    return raftFut.thenApply(ignored -> lockedRow);
+                    return takeLocksForDelete(searchRow, rowId, txId)
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId, txId)))
+                            .thenApply(ignored -> row);
                 });
             }
             case RW_DELETE_EXACT: {
-                CompletableFuture<RowId> lockFut = 
takeLocksForDeleteExact(searchKey, searchRow, indexId, txId);
-
-                return lockFut.thenCompose(lockedRow -> {
-                    boolean removed = lockedRow != null;
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId == null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
 
-                    CompletableFuture raftFut = removed ? 
applyCmdWithExceptionHandling(new UpdateCommand(lockedRow, txId)) :
-                            CompletableFuture.completedFuture(null);
+                    return takeLocksForDeleteExact(searchRow, rowId, row, txId)
+                            .thenCompose(validatedRowId -> {
+                                if (validatedRowId == null) {
+                                    return 
CompletableFuture.completedFuture(false);
+                                }
 
-                    return raftFut.thenApply(ignored -> removed);
+                                return applyCmdWithExceptionHandling(new 
UpdateCommand(validatedRowId, txId))
+                                        .thenApply(ignored -> true);
+                            });
                 });
             }
             case RW_INSERT: {
-                CompletableFuture<RowId> lockFut = 
takeLocksForInsert(searchKey, indexId, txId);
-
-                return lockFut.thenCompose(lockedRowId -> {
-                    boolean inserted = lockedRowId == null;
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId != null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
 
-                    CompletableFuture raftFut =
-                            lockedRowId == null ? 
applyCmdWithExceptionHandling(new UpdateCommand(new RowId(partId), searchRow, 
txId)) :
-                                    CompletableFuture.completedFuture(null);
+                    RowId rowId0 = new RowId(partId);
 
-                    return raftFut.thenApply(ignored -> inserted);
+                    return takeLocksForInsert(searchRow, rowId0, txId)
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId0, searchRow, txId)))
+                            .thenApply(ignored -> true);
                 });
             }
             case RW_UPSERT: {
-                CompletableFuture<RowId> lockFut = 
takeLocksForUpsert(searchKey, indexId, txId);
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    boolean insert = rowId == null;
+
+                    RowId rowId0 = insert ? new RowId(partId) : rowId;
 
-                return lockFut.thenCompose(lockedRowId -> {
-                    CompletableFuture raftFut =
-                            lockedRowId != null ? 
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
-                                    applyCmdWithExceptionHandling(new 
UpdateCommand(new RowId(partId), searchRow, txId));
+                    CompletableFuture<?> lockFut = insert
+                            ? takeLocksForInsert(searchRow, rowId0, txId)
+                            : takeLocksForUpdate(searchRow, rowId0, txId);
 
-                    return raftFut.thenApply(ignored -> null);
+                    return lockFut
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId0, searchRow, txId)))
+                            .thenApply(ignored -> null);
                 });
             }
             case RW_GET_AND_UPSERT: {
-                return lockManager.acquire(txId, new LockKey(indexId, 
searchKey), LockMode.X)
-                        .thenCompose(idxLock -> { // Index X lock
-                            RowId rowId = rowIdByKey(indexId, searchKey);
-
-                            return lockManager.acquire(txId, new 
LockKey(tableId), LockMode.IX)
-                                    .thenCompose(tblLock -> { // IX lock on 
table
-                                        CompletableFuture<Lock> rowLockFut = 
(rowId != null)
-                                                ? lockManager.acquire(txId, 
new LockKey(tableId, rowId), LockMode.X)
-                                                // X lock on RowId
-                                                : 
CompletableFuture.completedFuture(null);
-
-                                        return rowLockFut.thenCompose(rowLock 
-> {
-                                            BinaryRow result = rowId != null
-                                                    ? 
resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId) : 
null;
-
-                                            CompletableFuture raftFut =
-                                                    rowId != null ? 
applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId))
-                                                            : 
applyCmdWithExceptionHandling(
-                                                                    new 
UpdateCommand(new RowId(partId), searchRow, txId));
-
-                                            return raftFut.thenApply(ignored 
-> result);
-                                        });
-                                    });
-                        });
-            }
-            case RW_GET_AND_REPLACE: {
-                CompletableFuture<RowId> idxLockFut = 
lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S)
-                        .thenCompose(sharedIdxLock -> { // Index S lock
-                            RowId rowId = rowIdByKey(indexId, searchKey);
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    boolean insert = rowId == null;
 
-                            if (rowId != null) {
-                                return lockManager.acquire(txId, new 
LockKey(indexId, searchKey), LockMode.X)
-                                        .thenApply(exclusiveIdxLock -> rowId); 
// Index X lock
-                            }
-
-                            return CompletableFuture.completedFuture(null);
-                        });
-
-                return idxLockFut.thenCompose(lockedRowId -> {
-                    return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.IX)
-                            .thenCompose(tblLock -> { // IX lock on table
-                                CompletableFuture<BinaryRow> rowLockFut;
+                    RowId rowId0 = insert ? new RowId(partId) : rowId;
 
-                                if (lockedRowId != null) {
-                                    rowLockFut = lockManager.acquire(txId, new 
LockKey(tableId, lockedRowId), LockMode.X)
-                                            .thenApply(rowLock -> // X lock on 
RowId
-                                                    
resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), 
txId)
-                                            );
-                                } else {
-                                    rowLockFut = 
CompletableFuture.completedFuture(null);
-                                }
+                    CompletableFuture<?> lockFut = insert
+                            ? takeLocksForInsert(searchRow, rowId0, txId)
+                            : takeLocksForUpdate(searchRow, rowId0, txId);
 
-                                return rowLockFut.thenCompose(lockedRow -> {
-                                    CompletableFuture raftFut = lockedRow == 
null ? CompletableFuture.completedFuture(null) :
-                                            applyCmdWithExceptionHandling(new 
UpdateCommand(lockedRowId, searchRow, txId));
+                    return lockFut
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId0, searchRow, txId)))
+                            .thenApply(ignored -> row);
+                });
+            }
+            case RW_GET_AND_REPLACE: {
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
 
-                                    return raftFut.thenApply(ignored -> 
lockedRow);
-                                });
-                            });
+                    return takeLocksForUpdate(searchRow, rowId, txId)
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId)))
+                            .thenApply(ignored0 -> row);
                 });
             }
             case RW_REPLACE_IF_EXIST: {
-                CompletableFuture<RowId> lockFut = 
takeLocksForReplaceIfExist(searchKey, indexId, txId);
-
-                return lockFut.thenCompose(lockedRowId -> {
-                    boolean replaced = lockedRowId != null;
-
-                    CompletableFuture raftFut = replaced ? 
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
-                            CompletableFuture.completedFuture(null);
+                return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+                    if (rowId == null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
 
-                    return raftFut.thenApply(ignored -> replaced);
+                    return takeLocksForUpdate(searchRow, rowId, txId)
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId)))
+                            .thenApply(ignored -> true);
                 });
             }
             default: {
                 throw new 
IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                        IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+                        format("Unknown single request [actionType={}]", 
request.requestType()));
             }
         }
     }
 
-    /**
-     * Takes all required locks on a key, before replacing.
-     *
-     * @param searchKey Key to search.
-     * @param indexId   Index id.
-     * @param txId      Transaction id.
-     * @return Future completes with {@link RowId} or {@code null} if there is 
no entry.
-     */
-    private CompletableFuture<RowId> takeLocksForReplaceIfExist(ByteBuffer 
searchKey, UUID indexId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(indexId, searchKey), 
LockMode.S).thenCompose(shareIdxLock -> { // Index R lock
-            RowId rowId = rowIdByKey(indexId, searchKey);
-
-            CompletableFuture<Lock> idxLockFut = rowId != null
-                    ? lockManager.acquire(txId, new LockKey(indexId, 
searchKey), LockMode.X) // Index X lock
-                    : CompletableFuture.completedFuture(null);
-
-            return idxLockFut.thenCompose(exclusiveIdxLock -> 
lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
-                    .thenCompose(tblLock -> { // IX lock on table
-                        if (rowId != null) {
-                            RowId rowIdToLock = rowId;
-
-                            return lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.X)
-                                    .thenApply(rowLock -> rowIdToLock); // X 
lock on RowId
-                        }
-
-                        return CompletableFuture.completedFuture(null);
-                    }));
-        });
-    }
-
     /**
      * Takes all required locks on a key, before upserting.
      *
-     * @param searchKey Key to search.
-     * @param indexId   Index id.
      * @param txId      Transaction id.
     * @return Future completes with {@link RowId} or {@code null} if there is no value.
      */
-    private CompletableFuture<RowId> takeLocksForUpsert(ByteBuffer searchKey, 
UUID indexId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(indexId, searchKey), 
LockMode.X).thenCompose(idxLock -> { // Index X lock
-            RowId rowId = rowIdByKey(indexId, searchKey);
-
-            return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
-                    .thenCompose(tblLock -> { // IX lock on table
-                        if (rowId != null) {
-                            return lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.X)
-                                    .thenApply(rowLock -> rowId); // X lock on 
RowId
-                        }
-
-                        return CompletableFuture.completedFuture(null);
-                    });
-        });
+    private CompletableFuture<RowId> takeLocksForUpdate(BinaryRow tableRow, 
RowId rowId, UUID txId) {
+        return lockManager.acquire(txId, new LockKey(tableId, 
tableRow.keySlice()), LockMode.X) // Index X lock
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId), LockMode.IX))
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.X))
+                .thenCompose(ignored -> takePutLockOnIndexes(tableRow, rowId, 
txId))
+                .thenApply(ignored -> rowId);
     }
 
     /**
      * Takes all required locks on a key, before inserting the value.
      *
-     * @param searchKey Key to search.
-     * @param indexId   Index id.
-     * @param txId      Transaction id.
+     * @param tableRow Table row.
+     * @param txId Transaction id.
     * @return Future completes with {@link RowId} or {@code null} if there is no value.
      */
-    private CompletableFuture<RowId> takeLocksForInsert(ByteBuffer searchKey, 
UUID indexId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(indexId, searchKey), 
LockMode.S) // Index S lock
-                .thenCompose(sharedIdxLock -> {
-                    RowId rowId = rowIdByKey(indexId, searchKey);
+    private CompletableFuture<RowId> takeLocksForInsert(BinaryRow tableRow, 
RowId rowId, UUID txId) {
+        return lockManager.acquire(txId, new LockKey(tableId, 
tableRow.keySlice()), LockMode.X)
+                .thenCompose(exclusiveIdxLock -> lockManager.acquire(txId, new 
LockKey(tableId), LockMode.IX)) // IX lock on table
+                .thenCompose(ignored -> takePutLockOnIndexes(tableRow, rowId, 
txId))
+                .thenApply(tblLock -> rowId);
+    }
 
-                    if (rowId == null) {
-                        return lockManager.acquire(txId, new LockKey(indexId, 
searchKey), LockMode.X) // Index X lock
-                                .thenCompose(exclusiveIdxLock ->
-                                        lockManager.acquire(txId, new 
LockKey(tableId), LockMode.IX) // IX lock on table
-                                                .thenApply(tblLock -> null));
-                    }
+    private CompletableFuture<?> takePutLockOnIndexes(BinaryRow tableRow, 
RowId rowId, UUID txId) {
+        List<IndexLocker> indexes = indexesLockers.get();
 
-                    return CompletableFuture.completedFuture(rowId);
-                });
+        if (nullOrEmpty(indexes)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
+        int idx = 0;
+
+        for (IndexLocker locker : indexes) {
+            locks[idx++] = locker.locksForInsert(txId, tableRow, rowId);
+        }
+
+        return CompletableFuture.allOf(locks);
     }
 
     /**
      * Takes all required locks on a key, before deleting the value.
      *
-     * @param searchKey Key to search.
-     * @param searchRow Row to remove.
-     * @param indexId   Index id.
      * @param txId      Transaction id.
     * @return Future completes with {@link RowId} or {@code null} if there is no value for remove.
      */
-    private CompletableFuture<RowId> takeLocksForDeleteExact(ByteBuffer 
searchKey, BinaryRow searchRow, UUID indexId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(indexId, searchKey), 
LockMode.X).thenCompose(idxLock -> { // Index X lock
-            RowId rowId = rowIdByKey(indexId, searchKey);
-
-            return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.IX) // IX lock on table
-                    .thenCompose(tblLock -> {
-                        CompletableFuture<RowId> rowLockFut;
-
-                        if (rowId != null) {
-                            rowLockFut = lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.S) // S lock on RowId
-                                    .thenCompose(sharedRowLock -> {
-                                        BinaryRow curVal = 
resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId);
-
-                                        if (equalValues(curVal, searchRow)) {
-                                            return lockManager.acquire(txId, 
new LockKey(tableId, rowId),
-                                                            LockMode.X) // X 
lock on RowId
-                                                    
.thenApply(exclusiveRowLock -> rowId);
-                                        }
-
-                                        return 
CompletableFuture.completedFuture(null);
-                                    });
-                        } else {
-                            rowLockFut = 
CompletableFuture.completedFuture(null);
-                        }
+    private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow 
expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
+        return lockManager.acquire(txId, new LockKey(tableId, 
expectedRow.keySlice()), LockMode.X)  // Index X lock

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to