Repository: ignite Updated Branches: refs/heads/master 71836d95a -> dab050acc
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index af74996..4d1145c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -41,12 +41,15 @@ import java.util.stream.Collectors; import javax.cache.Cache; import javax.cache.expiry.Duration; import javax.cache.expiry.TouchedExpiryPolicy; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ScanQuery; @@ -283,6 +286,56 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } /** + * @throws Exception If failed. + */ + public void testPessimisticTx3() throws Exception { + checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() { + @Override public void apply(IgniteCache<Integer, Integer> cache) { + try { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer val = cache.get(key); + + assertNull(val); + + Integer res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() { + @Override public Integer process(MutableEntry<Integer, Integer> entry, + Object... arguments) throws EntryProcessorException { + + entry.setValue(key); + + return -key; + } + }); + + assertEquals(Integer.valueOf(-key), res); + + val = (Integer)checkAndGet(true, cache, key, GET, SCAN); + + assertEquals(key, val); + + tx.commit(); + } + + Integer val = (Integer)checkAndGet(false, cache, key, SCAN, GET); + + assertEquals(key, val); + } + } + catch (Exception e) { + throw new IgniteException(e); + } + } + }); + } + + /** * @param c Closure to run. * @throws Exception If failed. */ @@ -3055,6 +3108,34 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { assertEquals(size, cache.size()); } + // Check rollback create. + for (int i = 0; i < KEYS; i++) { + if (i % 2 == 0) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.rollback(); + } + + assertEquals(size, cache.size()); + } + } + + // Check rollback update. + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, -1); + + tx.rollback(); + } + + assertEquals(size, cache.size()); + } + // Check rollback remove. for (int i = 0; i < KEYS; i++) { final Integer key = i; http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java index 76f8013..dcd46ff 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java @@ -263,33 +263,6 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche return arg.getClass(); } - /** - * Test that attempting to perform a cache PUT operation from within an SQL transaction fails. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testCacheOperationsFromSqlTransaction() { - checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAsync", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAsync", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAll", Collections.singletonMap(1, CACHE_ENTRY_PROC), X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAll", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAll", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAllAsync", Collections.singletonMap(1, CACHE_ENTRY_PROC), - X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - - checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); - } - /** */ private final static EntryProcessor<Integer, Integer, Object> ENTRY_PROC = new EntryProcessor<Integer, Integer, Object>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java index 4ea53e0..bcbfbc2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java @@ -151,7 +151,7 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT * @throws Exception If failed. */ private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9540"); + fail("https://issues.apache.org/jira/browse/IGNITE-9470"); final int VALS = 100; http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java index 46aeaa1..5ec96e4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; @@ -26,12 +28,22 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.testframework.GridTestUtils; @@ -41,6 +53,7 @@ import org.apache.ignite.transactions.TransactionIsolation; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.INVOKE; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT; @@ -103,6 +116,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testRepeatableReadIsolationInvoke() throws Exception { + checkOperations(GET, GET, WriteMode.INVOKE, true); + checkOperations(GET, GET, WriteMode.INVOKE, false); + } + + /** + * @throws Exception If failed. + */ public void testRepeatableReadIsolationSqlPut() throws Exception { checkOperations(SQL, SQL, PUT, true); checkOperations(SQL, SQL, PUT, false); @@ -111,6 +132,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testRepeatableReadIsolationSqlInvoke() throws Exception { + checkOperations(SQL, SQL, WriteMode.INVOKE, true); + checkOperations(SQL, SQL, WriteMode.INVOKE, false); + } + + /** + * @throws Exception If failed. + */ public void testRepeatableReadIsolationSqlDml() throws Exception { checkOperations(SQL, SQL, DML, true); checkOperations(SQL, SQL, DML, false); @@ -130,6 +159,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { public void testRepeatableReadIsolationMixedPut() throws Exception { checkOperations(SQL, GET, PUT, false); checkOperations(SQL, GET, PUT, true); + checkOperations(SQL, GET, WriteMode.INVOKE, false); + checkOperations(SQL, GET, WriteMode.INVOKE, true); } /** @@ -138,6 +169,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { public void testRepeatableReadIsolationMixedPut2() throws Exception { checkOperations(GET, SQL, PUT, false); checkOperations(GET, SQL, PUT, true); + checkOperations(GET, SQL, WriteMode.INVOKE, false); + checkOperations(GET, SQL, WriteMode.INVOKE, true); } /** @@ -162,15 +195,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { public void testOperationConsistency() throws Exception { checkOperationsConsistency(PUT, false); checkOperationsConsistency(DML, false); + checkOperationsConsistency(WriteMode.INVOKE, false); checkOperationsConsistency(PUT, true); checkOperationsConsistency(DML, true); + checkOperationsConsistency(WriteMode.INVOKE, true); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeConsistency() throws Exception { + Ignite node = grid(/*requestFromClient ? nodesCount() - 1 :*/ 0); + + TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME)); + + final Set<Integer> keys1 = new HashSet<>(3); + final Set<Integer> keys2 = new HashSet<>(3); + + Set<Integer> allKeys = generateKeySet(cache.cache, keys1, keys2); + + final Map<Integer, MvccTestAccount> map1 = keys1.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + final Map<Integer, MvccTestAccount> map2 = keys2.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + assertEquals(0, cache.cache.size()); + + updateEntries(cache, map1, WriteMode.INVOKE); + assertEquals(3, cache.cache.size()); + + updateEntries(cache, map1, WriteMode.INVOKE); + assertEquals(3, cache.cache.size()); + + getEntries(cache, allKeys, INVOKE); + assertEquals(3, cache.cache.size()); + + updateEntries(cache, map2, WriteMode.INVOKE); + assertEquals(6, cache.cache.size()); + + getEntries(cache, keys2, INVOKE); + assertEquals(6, cache.cache.size()); + + removeEntries(cache, keys1, WriteMode.INVOKE); + assertEquals(3, cache.cache.size()); + + removeEntries(cache, keys1, WriteMode.INVOKE); + assertEquals(3, cache.cache.size()); + + getEntries(cache, allKeys, INVOKE); + assertEquals(3, cache.cache.size()); + + updateEntries(cache, map1, WriteMode.INVOKE); + assertEquals(6, cache.cache.size()); + + removeEntries(cache, allKeys, WriteMode.INVOKE); + assertEquals(0, cache.cache.size()); + + getEntries(cache, allKeys, INVOKE); + assertEquals(0, cache.cache.size()); } /** * Checks SQL and CacheAPI operation isolation consistency. * * @param readModeBefore read mode used before value updated. - * @param readModeBefore read mode used after value updated. + * @param readModeAfter read mode used after value updated. * @param writeMode write mode used for update. * @throws Exception If failed. */ @@ -206,20 +296,23 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { @Override public Void call() throws Exception { updateStart.await(); + assertEquals(initialMap.size(), cache2.cache.size()); + try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + tx.timeout(TX_TIMEOUT); updateEntries(cache2, updateMap, writeMode); removeEntries(cache2, keysForRemove, writeMode); - checkContains(cache2, true, updateMap.keySet()); - checkContains(cache2, false, keysForRemove); - assertEquals(updateMap, cache2.cache.getAll(allKeys)); tx.commit(); } + finally { + updateFinish.countDown(); + } - updateFinish.countDown(); + assertEquals(updateMap.size(), cache2.cache.size()); return null; } @@ -270,7 +363,7 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { * @return All keys. * @throws IgniteCheckedException If failed. */ - protected Set<Integer> generateKeySet(IgniteCache<Object, Object> cache, Set<Integer> keySet1, + protected Set<Integer> generateKeySet(IgniteCache<?, ?> cache, Set<Integer> keySet1, Set<Integer> keySet2) throws IgniteCheckedException { LinkedHashSet<Integer> allKeys = new LinkedHashSet<>(); @@ -302,50 +395,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME)); - final Set<Integer> keysForUpdate = new HashSet<>(3); - final Set<Integer> keysForRemove = new HashSet<>(3); - final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove); + final Set<Integer> keysForUpdate = new HashSet<>(3); + final Set<Integer> keysForRemove = new HashSet<>(3); - int updCnt = 1; + final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove); - final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect( - Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + try { + int updCnt = 1; + + final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect( + Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1))); + + updateEntries(cache, initialVals, writeMode); + + assertEquals(initialVals.size(), cache.cache.size()); + + IgniteTransactions txs = node.transactions(); + + Map<Integer, MvccTestAccount> updatedVals = null; - cache.cache.putAll(initialVals); + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET); + Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL); + Map<Integer, MvccTestAccount> vals3 = getEntries(cache, allKeys, ReadMode.INVOKE); - IgniteTransactions txs = node.transactions(); + assertEquals(initialVals, vals1); + assertEquals(initialVals, vals2); + assertEquals(initialVals, vals3); - Map<Integer, MvccTestAccount> updatedVals = null; + assertEquals(initialVals.size(), cache.cache.size()); - try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { - Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET); - Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL); + for (ReadMode readMode : new ReadMode[] {GET, SQL, INVOKE}) { + int updCnt0 = ++updCnt; - assertEquals(initialVals, vals1); - assertEquals(initialVals, vals2); + updatedVals = allKeys.stream().collect(Collectors.toMap(Function.identity(), + k -> new MvccTestAccount(k, updCnt0))); - for (ReadMode readMode : new ReadMode[] {GET, SQL}) { - int updCnt0 = ++updCnt; + updateEntries(cache, updatedVals, writeMode); + assertEquals(allKeys.size(), cache.cache.size()); - updatedVals = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(), - k -> new MvccTestAccount(k, updCnt0))); + removeEntries(cache, keysForRemove, writeMode); - updateEntries(cache, updatedVals, writeMode); - removeEntries(cache, keysForRemove, writeMode); + for (Integer key : keysForRemove) + updatedVals.remove(key); - assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode)); + assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode)); + } + + tx.commit(); } - tx.commit(); - } + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + assertEquals(updatedVals, getEntries(cache, allKeys, GET)); + assertEquals(updatedVals, getEntries(cache, allKeys, SQL)); + assertEquals(updatedVals, getEntries(cache, allKeys, INVOKE)); - try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { - assertEquals(updatedVals, getEntries(cache, allKeys, GET)); - assertEquals(updatedVals, getEntries(cache, allKeys, SQL)); + tx.commit(); + } - tx.commit(); + assertEquals(updatedVals.size(), cache.cache.size()); + } + finally { + cache.cache.removeAll(keysForUpdate); } + + assertEquals(0, cache.cache.size()); } /** @@ -365,6 +480,18 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { return cache.cache.getAll(keys); case SQL: return getAllSql(cache); + case INVOKE: { + Map<Integer, MvccTestAccount> res = new HashMap<>(); + + CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor<>(); + + Map<Integer, EntryProcessorResult<MvccTestAccount>> invokeRes = cache.cache.invokeAll(keys, ep); + + for (Map.Entry<Integer, EntryProcessorResult<MvccTestAccount>> e : invokeRes.entrySet()) + res.put(e.getKey(), e.getValue().get()); + + return res; + } default: fail(); } @@ -395,6 +522,19 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { break; } + case INVOKE: { + CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = + new GetAndPutEntryProcessor<Integer, MvccTestAccount>(){ + /** {@inheritDoc} */ + @Override MvccTestAccount newValForKey(Integer key) { + return entries.get(key); + } + }; + + cache.cache.invokeAll(entries.keySet(), ep); + + break; + } default: fail(); } @@ -423,6 +563,13 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { break; } + case INVOKE: { + CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>(); + + cache.cache.invokeAll(keys, ep); + + break; + } default: fail(); } @@ -438,4 +585,52 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest { protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) { assertEquals(expected, cache.cache.containsKeys(keys)); } + + /** + * Applies get operation. + */ + static class GetEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> { + /** {@inheritDoc} */ + @Override public V process(MutableEntry<K, V> entry, + Object... arguments) throws EntryProcessorException { + return entry.getValue(); + } + } + + /** + * Applies remove operation. + */ + static class RemoveEntryProcessor<K, V, R> implements CacheEntryProcessor<K, V, R> { + /** {@inheritDoc} */ + @Override public R process(MutableEntry<K, V> entry, + Object... arguments) throws EntryProcessorException { + entry.remove(); + + return null; + } + } + + /** + * Applies get and put operation. + */ + static class GetAndPutEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> { + /** {@inheritDoc} */ + @Override public V process(MutableEntry<K, V> entry, + Object... args) throws EntryProcessorException { + V newVal = !F.isEmpty(args) ? (V)args[0] : newValForKey(entry.getKey()); + + V oldVal = entry.getValue(); + entry.setValue(newVal); + + return oldVal; + } + + /** + * @param key Key. + * @return New value. + */ + V newValForKey(K key){ + throw new UnsupportedOperationException(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java index c782f98..618d910 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java @@ -22,9 +22,12 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -55,8 +58,24 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT return res; } + case SQL: return getAllSql(cache); + + case INVOKE: { + Map<Integer, MvccTestAccount> res = new HashMap<>(); + + CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor(); + + for (Integer key : keys) { + MvccTestAccount val = cache.cache.invoke(key, ep); + + if(val != null) + res.put(key, val); + } + + return res; + } default: fail(); } @@ -65,7 +84,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT } /** {@inheritDoc} */ - protected void updateEntries( + @Override protected void updateEntries( TestCache<Integer, MvccTestAccount> cache, Map<Integer, MvccTestAccount> entries, WriteMode writeMode) { @@ -79,6 +98,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT break; } + case DML: { for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) { if (e.getValue() == null) @@ -88,13 +108,23 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT } break; } + + case INVOKE: { + GetAndPutEntryProcessor<Integer, MvccTestAccount> ep = new GetAndPutEntryProcessor<>(); + + for (final Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) + cache.cache.invoke(e.getKey(), ep, e.getValue()); + + break; + } + default: fail(); } } /** {@inheritDoc} */ - protected void removeEntries( + @Override protected void removeEntries( TestCache<Integer, MvccTestAccount> cache, Set<Integer> keys, WriteMode writeMode) { @@ -111,6 +141,14 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT break; } + case INVOKE: { + CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>(); + + for (Integer key : keys) + cache.cache.invoke(key, ep); + + break; + } default: fail(); }