tkalkirill commented on code in PR #13264:
URL: https://github.com/apache/ignite/pull/13264#discussion_r3479374562


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java:
##########
@@ -1345,6 +1345,61 @@ public boolean lock(K key, long timeout)
      */
     public boolean isLocked(K key);
 
+    /**
+     * Acquires a transactional lock for the cached object represented by the 
given entry if the current cached version
+     * matches the entry version. This method works only in a {@link 
TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entry Entry whose key, value and version should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for lock to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      lock cannot be acquired immediately).
+     * @return {@code True} if lock was acquired with the same entry version.
+     * @throws IgniteCheckedException If lock acquisition resulted in an error.
+     */
+    public boolean lockTxEntry(CacheEntry<K, V> entry, long waitTimeout) 
throws IgniteCheckedException;
+
+    /**
+     * Acquires transactional locks for the cached objects represented by the 
given entries if all current cached
+     * versions match the corresponding entry versions. This method works only 
in a
+     * {@link TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entries Entries whose keys, values and versions should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for locks to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      locks cannot be acquired immediately).
+     * @return {@code True} if all locks were acquired with the same entry 
versions.
+     * @throws IgniteCheckedException If lock acquisition resulted in an error.
+     */
+    public boolean lockTxEntries(Collection<CacheEntry<K, V>> entries, long 
waitTimeout) throws IgniteCheckedException;
+
+    /**
+     * Asynchronously acquires a transactional lock for the cached object 
represented by the given entry if the current
+     * cached version matches the entry version. This method works only in a
+     * {@link TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entry Entry whose key, value and version should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for lock to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      lock cannot be acquired immediately).
+     * @return Future that resolves to {@code true} if the lock was acquired 
and the versions matched, or to
+     *      {@code false} otherwise.
+     */
+    public IgniteInternalFuture<Boolean> lockTxEntryAsync(CacheEntry<K, V> 
entry, long waitTimeout);
+
+    /**
+     * Asynchronously acquires transactional locks for the cached objects 
represented by the given entries if all
+     * current cached versions match the corresponding entry versions. This 
method works only in a
+     * {@link TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entries Entries whose keys, values and versions should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for locks to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      locks cannot be acquired immediately).
+     * @return Future that resolves to {@code true} if all locks were acquired 
and all versions matched, or to
+     *      {@code false} otherwise.
+     */
+    public IgniteInternalFuture<Boolean> 
lockTxEntriesAsync(Collection<CacheEntry<K, V>> entries, long waitTimeout);

Review Comment:
   What about errors? If it fails, will the method throw an error or will the 
future complete with an error? Please reflect this in the documentation.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationContextTransactionalLockTest.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/** Tests operation context propagation when a versioned cache entry is locked 
in a transaction. */
+public class CacheOperationContextTransactionalLockTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final int KEY = 1;
+
+    /** */
+    private static Ignite ignite;
+
+    /** */
+    private static IgniteCache<Integer, Integer> cache;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        ignite = startGrid(0);
+
+        cache = ignite.createCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        cache.put(KEY, KEY);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        ignite = null;
+        cache = null;
+
+        super.afterTestsStopped();
+    }
+
+    /** Checks the defaults used when there is no operation context. */
+    @Test
+    public void testDefaultOperationContext() throws Exception {
+        assertTxEntryFlags(internalCache(), false, false, false, false);
+    }
+
+    /** Checks propagation of the skip-store flag. */
+    @Test
+    public void testSkipStoreOperationContext() throws Exception {
+        assertTxEntryFlags(internalCache().withSkipStore(), true, false, 
false, false);
+    }
+
+    /** Checks propagation of the skip-read-through flag. */
+    @Test
+    public void testSkipReadThroughOperationContext() throws Exception {
+        assertTxEntryFlags(internalCache().withSkipReadThrough(), false, true, 
false, false);
+    }
+
+    /** Checks propagation of the keep-binary-in-interceptor flag. */
+    @Test
+    public void testKeepBinaryInInterceptorOperationContext() throws Exception 
{
+        assertTxEntryFlags(internalCache().withKeepBinaryInInterceptor(), 
false, false, true, false);
+    }
+
+    /**
+     * Locks a versioned entry and checks flags stored in its transaction 
entry.
+     *
+     * @param internalCache Internal cache projection used to acquire the lock.
+     * @param skipStore Expected skip-store flag.
+     * @param skipReadThrough Expected skip-read-through flag.
+     * @param keepBinaryInInterceptor Expected keep-binary-in-interceptor flag.
+     * @param keepBinary Expected keep-binary flag.
+     * @throws Exception If failed.
+     */
+    private void assertTxEntryFlags(
+        IgniteInternalCache<Integer, Integer> internalCache,
+        boolean skipStore,
+        boolean skipReadThrough,
+        boolean keepBinaryInInterceptor,
+        boolean keepBinary
+    ) throws Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        assertNotNull(entry);
+        assertNotNull(entry.version());
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            assertTrue(internalCache.lockTxEntry(entry, 0));
+
+            GridCacheContext<Integer, Integer> ctx = internalCache.context();
+            IgniteTxKey txKey = ctx.txKey(ctx.toCacheKeyObject(KEY));
+            IgniteTxEntry txEntry = internalCache.tx().entry(txKey);
+
+            assertNotNull(txEntry);
+            assertEquals(skipStore, txEntry.skipStore());

Review Comment:
   Could you explain what these flags mean and why they need to be checked? 
   I tried looking at the documentation for them, but I still didn't quite 
understand.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java:
##########
@@ -1345,6 +1345,61 @@ public boolean lock(K key, long timeout)
      */
     public boolean isLocked(K key);
 
+    /**
+     * Acquires a transactional lock for the cached object represented by the 
given entry if the current cached version
+     * matches the entry version. This method works only in a {@link 
TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entry Entry whose key, value and version should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for lock to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      lock cannot be acquired immediately).
+     * @return {@code True} if lock was acquired with the same entry version.
+     * @throws IgniteCheckedException If lock acquisition resulted in an error.
+     */
+    public boolean lockTxEntry(CacheEntry<K, V> entry, long waitTimeout) 
throws IgniteCheckedException;
+
+    /**
+     * Acquires transactional locks for the cached objects represented by the 
given entries if all current cached
+     * versions match the corresponding entry versions. This method works only 
in a
+     * {@link TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entries Entries whose keys, values and versions should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for locks to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      locks cannot be acquired immediately).
+     * @return {@code True} if all locks were acquired with the same entry 
versions.
+     * @throws IgniteCheckedException If lock acquisition resulted in an error.
+     */
+    public boolean lockTxEntries(Collection<CacheEntry<K, V>> entries, long 
waitTimeout) throws IgniteCheckedException;
+
+    /**
+     * Asynchronously acquires a transactional lock for the cached object 
represented by the given entry if the current
+     * cached version matches the entry version. This method works only in a
+     * {@link TransactionConcurrency#PESSIMISTIC} transaction.
+     *
+     * @param entry Entry whose key, value and version should be used.
+     * @param waitTimeout Timeout in milliseconds to wait for lock to be 
acquired
+     *      ({@code 0} to use the transaction timeout, {@code -1} for 
immediate failure if
+     *      lock cannot be acquired immediately).
+     * @return Future that resolves to {@code true} if the lock was acquired 
and the versions matched, or to
+     *      {@code false} otherwise.
+     */
+    public IgniteInternalFuture<Boolean> lockTxEntryAsync(CacheEntry<K, V> 
entry, long waitTimeout);

Review Comment:
   What about errors? If it fails, will the method throw an error or will the 
future complete with an error? Please reflect this in the documentation.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationContextTransactionalLockTest.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/** Tests operation context propagation when a versioned cache entry is locked 
in a transaction. */
+public class CacheOperationContextTransactionalLockTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final int KEY = 1;
+
+    /** */
+    private static Ignite ignite;
+
+    /** */
+    private static IgniteCache<Integer, Integer> cache;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        ignite = startGrid(0);
+
+        cache = ignite.createCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        cache.put(KEY, KEY);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        ignite = null;
+        cache = null;
+
+        super.afterTestsStopped();
+    }
+
+    /** Checks the defaults used when there is no operation context. */
+    @Test
+    public void testDefaultOperationContext() throws Exception {
+        assertTxEntryFlags(internalCache(), false, false, false, false);
+    }
+
+    /** Checks propagation of the skip-store flag. */
+    @Test
+    public void testSkipStoreOperationContext() throws Exception {
+        assertTxEntryFlags(internalCache().withSkipStore(), true, false, 
false, false);
+    }
+
+    /** Checks propagation of the skip-read-through flag. */
+    @Test
+    public void testSkipReadThroughOperationContext() throws Exception {
+        assertTxEntryFlags(internalCache().withSkipReadThrough(), false, true, 
false, false);
+    }
+
+    /** Checks propagation of the keep-binary-in-interceptor flag. */
+    @Test
+    public void testKeepBinaryInInterceptorOperationContext() throws Exception 
{
+        assertTxEntryFlags(internalCache().withKeepBinaryInInterceptor(), 
false, false, true, false);
+    }
+
+    /**
+     * Locks a versioned entry and checks flags stored in its transaction 
entry.
+     *
+     * @param internalCache Internal cache projection used to acquire the lock.
+     * @param skipStore Expected skip-store flag.
+     * @param skipReadThrough Expected skip-read-through flag.
+     * @param keepBinaryInInterceptor Expected keep-binary-in-interceptor flag.
+     * @param keepBinary Expected keep-binary flag.
+     * @throws Exception If failed.
+     */
+    private void assertTxEntryFlags(
+        IgniteInternalCache<Integer, Integer> internalCache,
+        boolean skipStore,
+        boolean skipReadThrough,
+        boolean keepBinaryInInterceptor,
+        boolean keepBinary
+    ) throws Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        assertNotNull(entry);
+        assertNotNull(entry.version());
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            assertTrue(internalCache.lockTxEntry(entry, 0));
+
+            GridCacheContext<Integer, Integer> ctx = internalCache.context();
+            IgniteTxKey txKey = ctx.txKey(ctx.toCacheKeyObject(KEY));
+            IgniteTxEntry txEntry = internalCache.tx().entry(txKey);
+
+            assertNotNull(txEntry);
+            assertEquals(skipStore, txEntry.skipStore());
+            assertEquals(skipReadThrough, txEntry.skipReadThrough());
+            assertEquals(keepBinaryInInterceptor, 
txEntry.keepBinaryInInterceptor());
+            assertEquals(keepBinary, txEntry.keepBinary());
+
+            tx.commit();
+        }
+    }
+
+    /** Returns the cache's internal proxy. */
+    @SuppressWarnings("unchecked")
+    private IgniteInternalCache<Integer, Integer> internalCache() {

Review Comment:
   Why not static method?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java:
##########
@@ -1024,6 +1032,7 @@ private IgniteInternalFuture<Exception> lockAllAsync0(
                 txRead,
                 retval,
                 timeout,
+                waitTimeout == 0 ? timeout : waitTimeout,

Review Comment:
   Can a timeout be negative?



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryTransactionalLockTest.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests transactional locks acquired only for unchanged cache entry versions.
+ */
+@RunWith(Parameterized.class)
+public class CacheVersionedEntryTransactionalLockTest extends 
GridCommonAbstractTest {
+    /** Global test timeout. */
+    @ClassRule
+    public static Timeout globalTimeout = new Timeout(120, TimeUnit.SECONDS);

Review Comment:
   Tell me, what is the point of this if the `GridAbstractTest#getTestTimeout` 
exists?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java:
##########
@@ -1119,6 +1129,15 @@ else if (!b)
         }
     }
 
+    /**
+     * @param waitTimeout Lock wait timeout.
+     * @param timeout Transaction timeout.
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private static boolean waitTimeoutExpiresFirst(long waitTimeout, long 
timeout) {

Review Comment:
   Add the prefix "iS".



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryTransactionalLockTest.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests transactional locks acquired only for unchanged cache entry versions.
+ */
+@RunWith(Parameterized.class)
+public class CacheVersionedEntryTransactionalLockTest extends 
GridCommonAbstractTest {
+    /** Global test timeout. */
+    @ClassRule
+    public static Timeout globalTimeout = new Timeout(120, TimeUnit.SECONDS);
+
+    /** */
+    private static Ignite ignite0;
+
+    /** */
+    private static Ignite ignite1;
+
+    /** */
+    private static Ignite client;
+
+    /** */
+    @Parameterized.Parameter(0)
+    public boolean useNearCache;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public int backups;
+
+    /** */
+    @Parameterized.Parameter(2)
+    public boolean replicated;
+
+    /** */
+    @Parameterized.Parameter(3)
+    public boolean batch;
+
+    /**
+     * Returns data for test.
+     *
+     * @return Test parameters.
+     */
+    @Parameterized.Parameters(name = "useNearCache={0}, backups={1}, 
replicated={2}, batch={3}")
+    public static Collection<Object[]> testData() {
+        return List.of(new Object[][] {
+            {false, 0, false, false},
+            {false, 0, false, true},
+            {false, 0, true, false},
+            {false, 0, true, true},
+            {false, 1, false, false},
+            {false, 1, false, true},
+            {false, 1, true, false},
+            {false, 1, true, true},
+            {false, 2, false, false},
+            {false, 2, false, true},
+            {false, 2, true, false},
+            {false, 2, true, true},
+            {true, 0, false, false},
+            {true, 0, false, true},
+            {true, 0, true, false},
+            {true, 0, true, true},
+            {true, 1, false, false},
+            {true, 1, false, true},
+            {true, 1, true, false},
+            {true, 1, true, true},
+            {true, 2, false, false},
+            {true, 2, false, true},
+            {true, 2, true, false},
+            {true, 2, true, true}
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        ignite0 = startGridsMultiThreaded(4);
+
+        awaitPartitionMapExchange();
+
+        ignite1 = grid(1);
+        client = startClientGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        ignite0 = null;
+        ignite1 = null;
+        client = null;
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        ignite0.destroyCache(DEFAULT_CACHE_NAME);
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName);
+    }
+
+    /**
+     * Creates transactional cache.
+     *
+     * @param ignite Node.
+     * @return Transactional cache.
+     */
+    private IgniteCache<Integer, Integer> transactionalCache(Ignite ignite) {
+        CacheConfiguration<?, ?> ccfg =
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setNearConfiguration(useNearCache ? new 
NearCacheConfiguration<>() : null)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setCacheMode(replicated ? CacheMode.REPLICATED : 
CacheMode.PARTITIONED)
+                .setBackups(backups);
+
+        return (IgniteCache<Integer, Integer>)ignite.createCache(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockBeforePutFromClientTest() throws Exception {
+        IgniteCache<Integer, Integer> cache = transactionalCache(client);
+
+        int key = 42;
+
+        checkLockBeforePut(cache, key, READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockBeforePutLocalKeyTest() throws Exception {
+        IgniteCache<Integer, Integer> cache = transactionalCache(ignite0);
+
+        int key = primaryKey(cache);
+
+        checkLockBeforePut(cache, key, READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockBeforePutRemoteKeyTest() throws Exception {
+        IgniteCache<Integer, Integer> cache = transactionalCache(ignite0);
+
+        int key = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME));
+
+        checkLockBeforePut(cache, key, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockBeforePutLocalKeyRepeatableReadTest() throws Exception 
{
+        IgniteCache<Integer, Integer> cache = transactionalCache(ignite0);
+
+        int key = primaryKey(cache);
+
+        checkLockBeforePut(cache, key, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockBeforePutRemoteKeyRepeatableReadTest() throws 
Exception {
+        IgniteCache<Integer, Integer> cache = transactionalCache(ignite0);
+
+        int key = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME));
+
+        checkLockBeforePut(cache, key, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testEntryVersionDoesNotChangeWhenEntryIsNotUpdated() throws 
Exception {
+        IgniteCache<Integer, Integer> cache = transactionalCache(ignite0);
+
+        int locKey = primaryKey(cache);
+        int remoteKey = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME));
+
+        cache.put(locKey, 0);
+        cache.put(remoteKey, 0);
+
+        CacheEntry<Integer, Integer> locEntry = cache.getEntry(locKey);
+        CacheEntry<Integer, Integer> remoteEntry = cache.getEntry(remoteKey);
+
+        try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            if (batch) {
+                assertTrue(acquireLockForEntries(cache, List.of(locEntry, 
remoteEntry), 0));
+            }
+            else {
+                assertTrue(acquireLockForEntry(cache, locEntry, 0));
+                assertTrue(acquireLockForEntry(cache, remoteEntry, 0));
+            }
+
+            tx.commit();
+        }
+
+        assertEquals(locEntry.version(), cache.getEntry(locKey).version());
+        assertEquals(remoteEntry.version(), 
cache.getEntry(remoteKey).version());
+
+        try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            if (batch) {
+                assertTrue(acquireLockForEntries(cache, List.of(locEntry, 
remoteEntry), 0));
+            }
+            else {
+                assertTrue(acquireLockForEntry(cache, locEntry, 0));
+                assertTrue(acquireLockForEntry(cache, remoteEntry, 0));
+            }
+
+            tx.rollback();
+        }
+
+        assertEquals(locEntry.version(), cache.getEntry(locKey).version());
+        assertEquals(remoteEntry.version(), 
cache.getEntry(remoteKey).version());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void 
testVersionedEntryLockReturnsFalseWhenEntryIsLockedByAnotherTransactionNoWait() 
throws Exception {
+        checkReturningWhenCanNotWaitForLock(-1, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void 
testVersionedEntryLockReturnsFalseWhenEntryIsLockedByAnotherTransaction() 
throws Exception {
+        checkReturningWhenCanNotWaitForLock(200, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void 
testVersionedEntryLockReturnsFalseWhenLocalEntryIsLockedByAnotherTransaction() 
throws Exception {
+        checkReturningWhenCanNotWaitForLock(200, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void 
testVersionedEntryLockReturnsFalseWhenEntryIsLockedByAnotherTransactionAndCommit()
 throws Exception {
+        checkReturningWhenCanNotWaitForLock(200, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void 
testVersionedEntryLockReturnsFalseWhenLocalEntryIsLockedByAnotherTransactionAndCommit()
 throws Exception {
+        checkReturningWhenCanNotWaitForLock(200, true, true);
+    }
+
+    /**
+     * Checks that lock acquisition can be retried in the same transaction 
after the competing transaction finishes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testVersionedEntryLockCanBeRetriedAfterWaitTimeout() throws 
Exception {
+        Ignite holder = ignite0;
+        Ignite initiator = ignite1;
+
+        IgniteCache<Integer, Integer> holderCache = transactionalCache(holder);
+        IgniteCache<Integer, Integer> cache = 
initiator.cache(DEFAULT_CACHE_NAME);
+
+        int key = primaryKey(holderCache);
+
+        holderCache.put(key, 0);
+
+        CacheEntry<Integer, Integer> entry = cache.getEntry(key);
+
+        try (Transaction holderTx = holder.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            holderCache.put(key, 42);
+
+            try (Transaction tx = 
initiator.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                if (batch)
+                    assertFalse(acquireLockForEntries(cache, List.of(entry), 
200));
+                else
+                    assertFalse(acquireLockForEntry(cache, entry, 200));
+
+                holderTx.rollback();
+
+                if (batch)
+                    assertTrue(acquireLockForEntries(cache, List.of(entry), 
5_000));
+                else
+                    assertTrue(acquireLockForEntry(cache, entry, 5_000));
+
+                cache.put(key, 1);
+
+                tx.commit();
+            }
+        }
+
+        assertEquals(1, cache.get(key).intValue());
+    }
+
+    /**
+     * Checks that the lock entry method returns {@code false} when can not 
wait for lock.
+     *
+     * @param timeout Timeout.
+     * @param commit Whether to commit the transaction.
+     * @param locForLocal Whether the contended key should be local to the 
lock initiator.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkReturningWhenCanNotWaitForLock(long timeout, boolean 
commit, boolean locForLocal) throws IgniteCheckedException {
+        Ignite holder = ignite0;
+        Ignite initiator = ignite1;
+
+        IgniteCache<Integer, Integer> holderCache = transactionalCache(holder);
+        IgniteCache<Integer, Integer> cache = 
initiator.cache(DEFAULT_CACHE_NAME);
+
+        List<Integer> firstKeySet = locForLocal ? primaryKeys(cache, 2) : 
primaryKeys(holderCache, 2);
+        Integer concurentLockedKey = firstKeySet.get(0);
+        Integer txKey1 = firstKeySet.get(1);
+        Integer txKey2 = locForLocal ? primaryKey(holderCache) : 
primaryKey(cache);
+
+        holderCache.put(concurentLockedKey, 0);
+        holderCache.put(txKey1, 0);
+        holderCache.put(txKey2, 0);
+
+        CacheEntry<Integer, Integer> entry = 
cache.getEntry(concurentLockedKey);
+
+        try (Transaction holderTx = holder.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            holderCache.put(concurentLockedKey, 42);
+
+            try (Transaction tx = 
initiator.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                long startWaiting = System.currentTimeMillis();
+
+                if (batch) {
+                    assertFalse(acquireLockForEntries(cache, List.of(
+                        cache.getEntry(txKey1),
+                        entry,
+                        cache.getEntry(txKey2)
+                        ), timeout));
+
+                    assertTrue(acquireLockForEntries(cache, List.of(
+                        cache.getEntry(txKey1),
+                        cache.getEntry(txKey2)
+                    ), timeout));
+                }
+                else {
+                    assertTrue(acquireLockForEntry(cache, 
cache.getEntry(txKey1), timeout));
+                    assertTrue(acquireLockForEntry(cache, 
cache.getEntry(txKey2), timeout));
+
+                    assertFalse(acquireLockForEntry(cache, entry, timeout));
+                }
+
+                long waitTime = System.currentTimeMillis() - startWaiting;
+
+                assertTrue("Waited for lock for " + waitTime + " ms but 
timeout is " + timeout + " ms.",

Review Comment:
   Why specifically -50 and +300, rather than higher or lower values?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java:
##########
@@ -142,9 +142,12 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     @GridToStringExclude
     private volatile LockTimeoutObject timeoutObj;
 
-    /** Lock timeout. */
+    /** Transaction timeout. */
     private final long timeout;
 
+    /** Lock wait timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java:
##########
@@ -201,7 +204,8 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
      * @param cnt Number of keys to lock.
      * @param read Read flag.
      * @param needReturnVal Need return value flag.
-     * @param timeout Lock acquisition timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java:
##########
@@ -1165,6 +1193,20 @@ private void loadMissingFromStore() {
         }
     }
 
+    /**
+     * @return Timeout value for this lock future.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java:
##########
@@ -750,7 +752,8 @@ else if (txLockMsgLog.isDebugEnabled()) {
      * Acquires locks in partitioned cache.
      *
      * @param keys Keys to lock.
-     * @param timeout Lock timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java:
##########
@@ -182,7 +188,8 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
      * @param tx Transaction.
      * @param read Read flag.
      * @param retval Flag to return value or not.
-     * @param timeout Lock acquisition timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java:
##########
@@ -98,7 +102,8 @@ public GridNearLockRequest() {
      * @param retVal Return value flag.
      * @param isolation Transaction isolation.
      * @param isInvalidate Invalidation flag.
-     * @param timeout Lock timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java:
##########
@@ -1397,6 +1430,20 @@ private ClusterTopologyCheckedException 
newTopologyException(@Nullable Throwable
         return topEx;
     }
 
+    /**
+     * @return Timeout value for this lock future.
+     */
+    private long lockTimeout() {
+        return waitTimeoutExpiresFirst() ? waitTimeout : timeout;
+    }
+
+    /**
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private boolean waitTimeoutExpiresFirst() {

Review Comment:
   I don't quite understand this check; please leave a comment and add the 
prefix "iS".



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -4275,17 +4308,36 @@ public <K> IgniteInternalFuture<GridCacheReturn> 
lockAllAsync(GridCacheContext c
 
         return new GridEmbeddedFuture<>(
             fut,
-            new PLC1<GridCacheReturn>(ret, false) {
-                @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) {
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired transaction lock on keys: " + 
keys);
+            new PLC1<GridCacheReturn>(ret, false, 
!waitTimeoutExpiresFirst(waitTimeout, timeout)) {
+                @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) throws IgniteCheckedException {
+                    assert fut.error() == null;
+
+                    boolean success = Boolean.TRUE.equals(fut.get());

Review Comment:
   Same about fut.get() as above.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java:
##########
@@ -538,7 +540,8 @@ public void active(boolean active) {
 
     /**
      * @param keys Keys to lock.
-     * @param timeout Lock timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java:
##########
@@ -990,6 +996,7 @@ IgniteInternalFuture<Exception> lockAllAsync(
      * @param txRead Tx read.
      * @param retval Return value flag.
      * @param timeout Lock timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java:
##########
@@ -990,6 +996,7 @@ IgniteInternalFuture<Exception> lockAllAsync(
      * @param txRead Tx read.
      * @param retval Return value flag.
      * @param timeout Lock timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationContextTransactionalLockTest.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/** Tests operation context propagation when a versioned cache entry is locked 
in a transaction. */
+public class CacheOperationContextTransactionalLockTest extends 
GridCommonAbstractTest {

Review Comment:
   If a combination of flags is possible, should it add a case for when all of 
them are set?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java:
##########
@@ -1077,7 +1086,7 @@ private IgniteInternalFuture<Exception> lockAllAsync0(
                     @Override public Exception apply(Boolean b, Exception e) {
                         if (e != null)
                             e = U.unwrap(e);
-                        else if (!b)
+                        else if (!b && !waitTimeoutExpiresFirst(waitTimeout, 
timeout))

Review Comment:
   I don't quite understand this check; please leave a comment.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java:
##########
@@ -1119,6 +1129,15 @@ else if (!b)
         }
     }
 
+    /**
+     * @param waitTimeout Lock wait timeout.
+     * @param timeout Transaction timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java:
##########
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests transactional entry locking through internal cache API.
+ */
+@RunWith(Parameterized.class)
+public class LockTxEntryOneNodeTest extends GridCommonAbstractTest {
+    /** Global test timeout. */
+    @ClassRule
+    public static Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);

Review Comment:
   Tell me, what is the point of this if the `GridAbstractTest#getTestTimeout` 
exists?



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java:
##########
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests transactional entry locking through internal cache API.
+ */
+@RunWith(Parameterized.class)
+public class LockTxEntryOneNodeTest extends GridCommonAbstractTest {
+    /** Global test timeout. */
+    @ClassRule
+    public static Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+
+    /** */
+    private static final int KEY = 1;
+
+    /** */
+    private static final int INIT_VAL = 1;
+
+    /** */
+    private static final int UPDATED_VAL = 2;
+
+    /** */
+    private static Ignite ignite;
+
+    /** */
+    @Parameterized.Parameter(0)
+    public boolean commit;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean useNearCache;
+
+    /** */
+    @Parameterized.Parameter(2)
+    public CacheMode cacheMode;
+
+    /** Operation that enlists an entry in a transaction before a versioned 
lock is acquired. */
+    private enum TxOperation {
+        /** Read. */
+        GET,
+
+        /** Update. */
+        PUT,
+
+        /** Update with previous value returned. */
+        GET_AND_PUT,
+
+        /** Delete. */
+        REMOVE,
+
+        /** Delete with previous value returned. */
+        GET_AND_REMOVE,
+
+        /** Transform. */
+        INVOKE
+    }
+
+    /** Cache. */
+    private IgniteCache<Integer, Integer> cache;
+
+    /**
+     * Returns data for test.
+     *
+     * @return Test parameters.
+     */
+    @Parameterized.Parameters(name = "commit={0}, useNearCache={1}, 
cacheMode={2}")
+    public static Collection<Object[]> testData() {
+        return List.of(new Object[][] {
+            {false, false, PARTITIONED},
+            {false, false, REPLICATED},
+            {true, false, PARTITIONED},
+            {true, false, REPLICATED},
+            {false, true, PARTITIONED},
+            {false, true, REPLICATED},
+            {true, true, PARTITIONED},
+            {true, true, REPLICATED},
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        ignite = startGrid(0);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        ignite = null;
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cache = ignite.createCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setNearConfiguration(useNearCache ? new 
NearCacheConfiguration<>() : null)
+            .setCacheMode(cacheMode));
+
+        cache.put(KEY, INIT_VAL);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        ignite.destroyCache(DEFAULT_CACHE_NAME);
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockTxEntryInPessimisticTransaction() throws Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            assertTrue(acquireLockForEntry(entry, 0));
+
+            assertEquals(entry.getValue(), cache.get(KEY));
+
+            checkInaccessInOtherTx(cache);
+
+            if (commit)
+                tx.commit();
+        }
+
+        assertEquals(INIT_VAL, cache.get(KEY).intValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockAlreadyLockedTxEntry() throws Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            cache.put(KEY, UPDATED_VAL);
+
+            assertTrue(acquireLockForEntry(entry, 0));
+
+            assertEquals(UPDATED_VAL, cache.get(KEY).intValue());
+
+            checkInaccessInOtherTx(cache);
+
+            if (commit)
+                tx.commit();
+        }
+
+        assertEquals(commit ? UPDATED_VAL : INIT_VAL, 
cache.get(KEY).intValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockAlreadyEnlistedTxEntryAfterOperation() throws 
Exception {
+        for (TxOperation op : TxOperation.values()) {
+            int key = KEY + op.ordinal() + 1;
+
+            cache.put(key, INIT_VAL);
+
+            CacheEntry<Integer, Integer> entry = cache.getEntry(key);
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+                applyOperation(op, key);
+
+                try {
+                    assertTrue("Failed to lock already enlisted tx entry [op=" 
+ op + ']',
+                        acquireLockForEntry(entry, 0));
+                }
+                catch (AssertionError e) {
+                    throw new AssertionError("Failed to lock already enlisted 
tx entry [op=" + op + ']', e);
+                }
+
+                checkInaccessInOtherTx(cache, key);
+
+                if (commit)
+                    tx.commit();
+            }
+
+            assertEquals("Unexpected value after transaction [op=" + op + ']',
+                commit ? expectedValue(op) : Integer.valueOf(INIT_VAL),
+                cache.get(key));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailToLockTxEntryInPessimisticTransaction() throws 
Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        cache.put(KEY, UPDATED_VAL);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            assertFalse(acquireLockForEntry(entry, 0));
+
+            assertEquals(UPDATED_VAL, cache.get(KEY).intValue());
+
+            checkAccessInOtherTx(cache);
+
+            if (commit)
+                tx.commit();
+        }
+
+        assertEquals(UPDATED_VAL, cache.get(KEY).intValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void 
testLockTxEntryReturnsFalseOnTimeoutWhenLockedInOtherTransaction() throws 
Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            assertTrue(acquireLockForEntry(entry, 0));
+
+            IgniteInternalFuture<Boolean> lockFut = GridTestUtils.runAsync(new 
Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    boolean locked = true;
+
+                    try (Transaction tx = 
ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                        locked = acquireLockForEntry(entry, 100);
+
+                        assertFalse(locked);
+
+                        cache.put(2, 2);
+
+                        if (commit)
+                            tx.commit();
+                    }
+
+                    return locked;
+                }
+            });
+
+            assertFalse(lockFut.get(10_000));
+
+            if (commit)
+                tx.commit();
+        }
+
+        assertEquals(INIT_VAL, cache.get(KEY).intValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateAfterLock() throws Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            assertTrue(acquireLockForEntry(entry, 0));
+
+            checkInaccessInOtherTx(cache);
+
+            assertEquals(entry.getValue(), cache.get(KEY));
+
+            cache.put(KEY, UPDATED_VAL);
+
+            assertEquals(UPDATED_VAL, cache.get(KEY).intValue());
+
+            if (commit)
+                tx.commit();
+        }
+
+        assertEquals(commit ? UPDATED_VAL : INIT_VAL, 
cache.get(KEY).intValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockTxEntryFailsWithoutTransaction() throws Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                acquireLockForEntry(entry, 0);
+
+                return null;
+            }
+        }, IgniteCheckedException.class, "without transaction");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLockTxEntryAsyncFailsInOptimisticTransaction() throws 
Exception {
+        CacheEntry<Integer, Integer> entry = cache.getEntry(KEY);
+
+        try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, 
REPEATABLE_READ)) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return acquireLockForEntry(entry, 0);
+                }
+            }, IgniteCheckedException.class, "optimistic transaction");
+        }
+    }
+
+    /**
+     * Checks that another transaction can access the cache.
+     *
+     * @param cache Cache.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkAccessInOtherTx(IgniteCache<Integer, Integer> cache) 
throws IgniteCheckedException {
+        checkAccessInOtherTx(cache, KEY);
+    }
+
+    /**
+     * Checks that another transaction can access the cache.
+     *
+     * @param cache Cache.
+     * @param key Key.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkAccessInOtherTx(IgniteCache<Integer, Integer> cache, int 
key) throws IgniteCheckedException {
+        IgniteInternalFuture<Void> accessFut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() {
+                try (Transaction tx = 
ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 100, 1)) {
+                    cache.get(key);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        accessFut.get(10_000);
+    }
+
+    /**
+     * Checks that another transaction cannot access the cache.
+     *
+     * @param cache Cache.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkInaccessInOtherTx(IgniteCache<Integer, Integer> cache) 
throws IgniteCheckedException {
+        checkInaccessInOtherTx(cache, KEY);
+    }
+
+    /**
+     * Checks that another transaction cannot access the cache.
+     *
+     * @param cache Cache.
+     * @param key Key.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkInaccessInOtherTx(IgniteCache<Integer, Integer> cache, 
int key) throws IgniteCheckedException {
+        IgniteInternalFuture<Void> accessFut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() {
+                try (Transaction tx = 
ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 100, 1)) {
+                    cache.get(key);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return accessFut.get();

Review Comment:
   Missing timeoit for get()



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java:
##########
@@ -142,9 +142,12 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     @GridToStringExclude
     private volatile LockTimeoutObject timeoutObj;
 
-    /** Lock timeout. */
+    /** Transaction timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java:
##########
@@ -1477,6 +1500,20 @@ private boolean 
errorOrTimeoutOnTopologyVersion(IgniteCheckedException e, boolea
         return false;
     }
 
+    /**
+     * @return Timeout value for this lock future.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java:
##########
@@ -197,7 +200,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
      * @param tx Transaction.
      * @param read Read flag.
      * @param retval Flag to return value or not.
-     * @param timeout Lock acquisition timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java:
##########
@@ -154,9 +154,12 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
     @GridToStringExclude
     private LockTimeoutObject timeoutObj;
 
-    /** Lock timeout. */
+    /** Transaction timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java:
##########
@@ -1165,6 +1193,20 @@ private void loadMissingFromStore() {
         }
     }
 
+    /**
+     * @return Timeout value for this lock future.
+     */
+    private long lockTimeout() {
+        return waitTimeoutExpiresFirst() ? waitTimeout : timeout;
+    }
+
+    /**
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private boolean waitTimeoutExpiresFirst() {

Review Comment:
   I don't quite understand this check; please leave a comment and add the 
prefix "iS".



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java:
##########
@@ -1190,13 +1202,23 @@ private GridNearLockResponse 
sendClientLockRemapResponse(ClusterNode nearNode,
         return res;
     }
 
+    /**
+     * @param waitTimeout Lock wait timeout.
+     * @param timeout Transaction timeout.
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private static boolean waitTimeoutExpiresFirst(long waitTimeout, long 
timeout) {

Review Comment:
   I don't quite understand this check; please leave a comment and add the 
prefix "iS".



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java:
##########
@@ -154,9 +154,12 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
     @GridToStringExclude
     private LockTimeoutObject timeoutObj;
 
-    /** Lock timeout. */
+    /** Transaction timeout. */
     private final long timeout;
 
+    /** Lock wait timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java:
##########
@@ -1477,6 +1500,20 @@ private boolean 
errorOrTimeoutOnTopologyVersion(IgniteCheckedException e, boolea
         return false;
     }
 
+    /**
+     * @return Timeout value for this lock future.
+     */
+    private long lockTimeout() {
+        return waitTimeoutExpiresFirst() ? waitTimeout : timeout;
+    }
+
+    /**
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private boolean waitTimeoutExpiresFirst() {

Review Comment:
   I don't quite understand this check; please leave a comment and add the 
prefix "iS".



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java:
##########
@@ -767,27 +771,49 @@ private IgniteInternalFuture<GridCacheReturn> 
obtainLockAsync(
 
         return new GridEmbeddedFuture<>(
             fut,
-            new PLC1<GridCacheReturn>(ret) {
+            new PLC1<GridCacheReturn>(ret, true, 
!waitTimeoutExpiresFirst(waitTimeout, timeout)) {
                 @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) throws IgniteCheckedException {
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired transaction lock on keys: " + 
passedKeys);
-
-                    postLockWrite(cacheCtx,
-                        passedKeys,
-                        ret,
-                        /*remove*/false,
-                        /*retval*/false,
-                        /*read*/read,
-                        accessTtl,
-                        CU.empty0(),
-                        /*computeInvoke*/false);
+                    assert fut.error() == null;
+
+                    boolean success = Boolean.TRUE.equals(fut.get());

Review Comment:
   Is it safe use fut.get() ? Don't we need a timeout here?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java:
##########
@@ -767,27 +771,49 @@ private IgniteInternalFuture<GridCacheReturn> 
obtainLockAsync(
 
         return new GridEmbeddedFuture<>(
             fut,
-            new PLC1<GridCacheReturn>(ret) {
+            new PLC1<GridCacheReturn>(ret, true, 
!waitTimeoutExpiresFirst(waitTimeout, timeout)) {
                 @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) throws IgniteCheckedException {
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired transaction lock on keys: " + 
passedKeys);
-
-                    postLockWrite(cacheCtx,
-                        passedKeys,
-                        ret,
-                        /*remove*/false,
-                        /*retval*/false,
-                        /*read*/read,
-                        accessTtl,
-                        CU.empty0(),
-                        /*computeInvoke*/false);
+                    assert fut.error() == null;

Review Comment:
   Please output error.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java:
##########
@@ -1397,6 +1430,20 @@ private ClusterTopologyCheckedException 
newTopologyException(@Nullable Throwable
         return topEx;
     }
 
+    /**
+     * @return Timeout value for this lock future.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java:
##########
@@ -124,13 +124,19 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
     /** Timed out flag. */
     private volatile boolean timedOut;
 
+    /** Transaction lock timeout flag. */
+    private volatile boolean txLockTimedOut;
+
     /** Timeout object. */
     @GridToStringExclude
     private volatile LockTimeoutObject timeoutObj;
 
-    /** Lock timeout. */
+    /** Transaction timeout. */
     private final long timeout;
 
+    /** Lock wait timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java:
##########
@@ -124,13 +124,19 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
     /** Timed out flag. */
     private volatile boolean timedOut;
 
+    /** Transaction lock timeout flag. */
+    private volatile boolean txLockTimedOut;
+
     /** Timeout object. */
     @GridToStringExclude
     private volatile LockTimeoutObject timeoutObj;
 
-    /** Lock timeout. */
+    /** Transaction timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java:
##########
@@ -79,6 +79,10 @@ public class GridNearLockRequest extends 
GridDistributedLockRequest {
     @Order(7)
     String txLbl;
 
+    /** Lock wait timeout. */

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -4223,6 +4253,7 @@ public IgniteInternalFuture<IgniteInternalTx> 
rollbackAsyncLocal() {
      * @param skipReadThrough Skip read-through cache store flag.
      * @param keepBinaryInInterceptor Handle binary in interceptor operation 
flag.
      * @param keepBinary Keep binary flag.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java:
##########
@@ -767,27 +771,49 @@ private IgniteInternalFuture<GridCacheReturn> 
obtainLockAsync(
 
         return new GridEmbeddedFuture<>(
             fut,
-            new PLC1<GridCacheReturn>(ret) {
+            new PLC1<GridCacheReturn>(ret, true, 
!waitTimeoutExpiresFirst(waitTimeout, timeout)) {
                 @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) throws IgniteCheckedException {
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired transaction lock on keys: " + 
passedKeys);
-
-                    postLockWrite(cacheCtx,
-                        passedKeys,
-                        ret,
-                        /*remove*/false,
-                        /*retval*/false,
-                        /*read*/read,
-                        accessTtl,
-                        CU.empty0(),
-                        /*computeInvoke*/false);
+                    assert fut.error() == null;
+
+                    boolean success = Boolean.TRUE.equals(fut.get());
+
+                    ret.success(success);
+
+                    if (log.isDebugEnabled()) {
+                        if (ret.success())
+                            log.debug("Successfully acquired transaction lock 
on keys: " + passedKeys);
+                        else
+                            log.debug("Failed to acquire transaction lock on 
keys: " + passedKeys);
+                    }
+
+                    if (ret.success()) {
+                        postLockWrite(cacheCtx,
+                            passedKeys,
+                            ret,
+                            /*remove*/false,
+                            /*retval*/false,
+                            /*read*/read,
+                            accessTtl,
+                            CU.empty0(),
+                            /*computeInvoke*/false,
+                            
/*skipIfLockLost*/waitTimeoutExpiresFirst(waitTimeout, timeout));
+                    }
 
                     return ret;
                 }
             }
         );
     }
 
+    /**
+     * @param waitTimeout Lock wait timeout.
+     * @param timeout Transaction timeout.
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private static boolean waitTimeoutExpiresFirst(long waitTimeout, long 
timeout) {

Review Comment:
   I don't quite understand this check; please leave a comment and add the 
prefix "iS".



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java:
##########
@@ -207,6 +214,13 @@ public boolean firstClientRequest() {
         return isFlag(FIRST_CLIENT_REQ_FLAG_MASK);
     }
 
+    /**
+     * @return Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java:
##########
@@ -4275,17 +4308,36 @@ public <K> IgniteInternalFuture<GridCacheReturn> 
lockAllAsync(GridCacheContext c
 
         return new GridEmbeddedFuture<>(
             fut,
-            new PLC1<GridCacheReturn>(ret, false) {
-                @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) {
-                    if (log.isDebugEnabled())
-                        log.debug("Acquired transaction lock on keys: " + 
keys);
+            new PLC1<GridCacheReturn>(ret, false, 
!waitTimeoutExpiresFirst(waitTimeout, timeout)) {
+                @Override protected GridCacheReturn postLock(GridCacheReturn 
ret) throws IgniteCheckedException {
+                    assert fut.error() == null;
+
+                    boolean success = Boolean.TRUE.equals(fut.get());
+
+                    ret.success(success);
+
+                    if (log.isDebugEnabled()) {
+                        if (ret.success())
+                            log.debug("Successfully acquired transaction lock 
on keys: " + keys);
+                        else
+                            log.debug("Failed to acquire transaction lock on 
keys: " + keys);
+                    }
 
                     return ret;
                 }
             }
         );
     }
 
+    /**
+     * @param waitTimeout Lock wait timeout.
+     * @param timeout Transaction timeout.
+     * @return {@code True} if separate lock wait timeout expires before 
transaction timeout.
+     */
+    private static boolean waitTimeoutExpiresFirst(long waitTimeout, long 
timeout) {

Review Comment:
   I don't quite understand this check; please leave a comment and add the 
prefix "iS".



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java:
##########
@@ -133,7 +135,8 @@ protected GridDistributedCacheAdapter(GridCacheContext<K, 
V> ctx, GridCacheConcu
 
     /**
      * @param keys Keys to lock.
-     * @param timeout Timeout.
+     * @param timeout Transaction timeout.
+     * @param waitTimeout Lock wait timeout.

Review Comment:
   Please specify that it is in milliseconds.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java:
##########
@@ -3028,6 +3032,196 @@ public CacheMetricsImpl metrics0() {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean lockTxEntry(CacheEntry<K, V> entry, long 
waitTimeout) throws IgniteCheckedException {
+        A.notNull(entry, "entry");
+
+        return lockTxEntryAsync(entry, waitTimeout).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean lockTxEntries(Collection<CacheEntry<K, V>> 
entries, long waitTimeout)
+        throws IgniteCheckedException {
+        A.notNull(entries, "entries");
+
+        return lockTxEntriesAsync(entries, waitTimeout).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> 
lockTxEntryAsync(CacheEntry<K, V> entry, long waitTimeout) {
+        A.notNull(entry, "entry");
+
+        return lockTxEntriesAsync(Collections.singleton(entry), waitTimeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> lockTxEntriesAsync(
+        Collection<CacheEntry<K, V>> entries,
+        long waitTimeout
+    ) {
+        A.notNull(entries, "entries");
+
+        GridNearTxLocal tx = tx();
+
+        if (tx == null)
+            return new GridFinishedFuture<>(
+                new IgniteCheckedException("Failed to acquire transactional 
lock without transaction."));
+
+        if (!tx.pessimistic())
+            return new GridFinishedFuture<>(
+                new IgniteCheckedException("Failed to acquire transactional 
lock in optimistic transaction."));
+
+        // Wait for previous per-transaction async operations to finish.
+        tx.txState().awaitLastFuture();

Review Comment:
   I did't understand the reason for this wait.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to