IGNITE-2083 EntryProcessor is called twice on primary node in transactional cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a14d643 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a14d643 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a14d643 Branch: refs/heads/ignite-843-rc2 Commit: 9a14d6432932fc1a1fdf2ddd77dea920382efe8c Parents: c10b112 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 7 15:05:09 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 7 15:05:09 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 51 +- .../cache/transactions/IgniteTxAdapter.java | 5 + .../cache/transactions/IgniteTxEntry.java | 8 +- .../IgniteCacheEntryProcessorCallTest.java | 497 +++++++++++++++++++ ...idCachePartitionedHitsAndMissesSelfTest.java | 4 +- .../testframework/junits/GridAbstractTest.java | 7 + .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 7 files changed, 558 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 9f1f8a1..3829e28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -337,6 +337,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter cacheCtx.config().isLoadPreviousValue() && !txEntry.skipStore(); + boolean evt = retVal || txEntry.op() == TRANSFORM; + + EntryProcessor entryProc = null; + + if (evt && txEntry.op() == TRANSFORM) + entryProc = F.first(txEntry.entryProcessors()).get1(); + CacheObject val = cached.innerGet( tx, /*swap*/true, @@ -344,11 +351,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /*fail fast*/false, /*unmarshal*/true, /*metrics*/retVal, - /*event*/retVal, + /*event*/evt, /*tmp*/false, - null, - null, - null, + tx.subjectId(), + entryProc, + tx.resolveTaskName(), null, txEntry.keepBinary()); @@ -364,11 +371,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter Object procRes = null; Exception err = null; + boolean modified = false; + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - try { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( - txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary()); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( + txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary()); + try { EntryProcessor<Object, Object, Object> processor = t.get1(); procRes = processor.process(invokeEntry, t.get2()); @@ -380,9 +389,27 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter break; } + + modified |= invokeEntry.modified(); } - txEntry.entryProcessorCalculatedValue(val); + if (modified) + val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); + + GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; + + if (op == NOOP) { + if (expiry != null) { + long ttl = CU.toTtl(expiry.getExpiryForAccess()); + + txEntry.ttl(ttl); + + if (ttl == CU.TTL_ZERO) + op = DELETE; + } + } + + txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : val)); if (retVal) { if (err != null || procRes != null) @@ -1301,10 +1328,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.cached().partition()); if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { - CacheObject procVal = entry.entryProcessorCalculatedValue(); + T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue(); + + assert procVal != null : entry; - entry.op(procVal == null ? DELETE : UPDATE); - entry.value(procVal, true, false); + entry.op(procVal.get1()); + entry.value(procVal.get2(), true, false); entry.entryProcessors(null); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 3065ac2..53f4f56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1233,6 +1233,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter if (F.isEmpty(txEntry.entryProcessors())) return F.t(txEntry.op(), txEntry.value()); else { + T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue(); + + if (calcVal != null) + return calcVal; + boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index fba1513..2c6c3df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -105,7 +105,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Transient field for calculated entry processor value. */ @GridDirectTransient - private CacheObject entryProcessorCalcVal; + private T2<GridCacheOperation, CacheObject> entryProcessorCalcVal; /** Transform closure bytes. */ @GridToStringExclude @@ -888,14 +888,16 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** * @return Entry processor calculated value. */ - public CacheObject entryProcessorCalculatedValue() { + public T2<GridCacheOperation, CacheObject> entryProcessorCalculatedValue() { return entryProcessorCalcVal; } /** * @param entryProcessorCalcVal Entry processor calculated value. */ - public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) { + public void entryProcessorCalculatedValue(T2<GridCacheOperation, CacheObject> entryProcessorCalcVal) { + assert entryProcessorCalcVal != null; + this.entryProcessorCalcVal = entryProcessorCalcVal; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java new file mode 100644 index 0000000..5163d96 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class IgniteCacheEntryProcessorCallTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + static final AtomicInteger callCnt = new AtomicInteger(); + + /** */ + private static final int SRV_CNT = 4; + + /** */ + private static final int NODES = 5; + + /** */ + private boolean client; + + /** */ + private static final int OP_UPDATE = 1; + + /** */ + private static final int OP_REMOVE = 2; + + /** */ + private static final int OP_GET = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRV_CNT); + + client = true; + + Ignite client = startGrid(SRV_CNT); + + assertTrue(client.configuration().isClientMode()); + } + + /** + * @throws Exception If failed. + */ + public void testEntryProcessorCall() throws Exception { + { + CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>(); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + checkEntryProcessorCallCount(ccfg, 1); + } + + { + CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>(); + ccfg.setBackups(0); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + checkEntryProcessorCallCount(ccfg, 1); + } + + { + CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>(); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(TRANSACTIONAL); + + checkEntryProcessorCallCount(ccfg, 2); + } + + { + CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>(); + ccfg.setBackups(0); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(TRANSACTIONAL); + + checkEntryProcessorCallCount(ccfg, 1); + } + } + + /** + * @param ccfg Cache configuration. + * @param expCallCnt Expected entry processor calls count. + * @throws Exception If failed. + */ + private void checkEntryProcessorCallCount(CacheConfiguration<Integer, TestValue> ccfg, + int expCallCnt) throws Exception { + Ignite client1 = ignite(SRV_CNT); + + IgniteCache<Integer, TestValue> clientCache1 = client1.createCache(ccfg); + + IgniteCache<Integer, TestValue> srvCache = ignite(0).cache(ccfg.getName()); + + awaitPartitionMapExchange(); + + int key = 0; + + checkEntryProcessCall(key++, clientCache1, null, null, expCallCnt); + + if (ccfg.getAtomicityMode() == TRANSACTIONAL) { + checkEntryProcessCall(key++, clientCache1, OPTIMISTIC, REPEATABLE_READ, expCallCnt + 1); + checkEntryProcessCall(key++, clientCache1, PESSIMISTIC, REPEATABLE_READ, expCallCnt + 1); + checkEntryProcessCall(key++, clientCache1, OPTIMISTIC, SERIALIZABLE, expCallCnt + 1); + } + + for (int i = 100; i < 110; i++) { + checkEntryProcessCall(key++, srvCache, null, null, expCallCnt); + + if (ccfg.getAtomicityMode() == TRANSACTIONAL) { + checkEntryProcessCall(key++, srvCache, OPTIMISTIC, REPEATABLE_READ, expCallCnt + 1); + checkEntryProcessCall(key++, srvCache, PESSIMISTIC, REPEATABLE_READ, expCallCnt + 1); + checkEntryProcessCall(key++, srvCache, OPTIMISTIC, SERIALIZABLE, expCallCnt + 1); + } + } + + for (int i = 0; i < NODES; i++) + ignite(i).destroyCache(ccfg.getName()); + } + + /** + * + * @param key Key. + * @param cache Cache. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param expCallCnt Expected entry processor calls count. + */ + private void checkEntryProcessCall(Integer key, + IgniteCache<Integer, TestValue> cache, + @Nullable TransactionConcurrency concurrency, + @Nullable TransactionIsolation isolation, + int expCallCnt) { + Ignite ignite = cache.unwrap(Ignite.class); + + ClusterNode primary = ignite.affinity(cache.getName()).mapKeyToNode(key); + + assertNotNull(primary); + + log.info("Check call [key=" + key + + ", primary=" + primary.attribute(ATTR_GRID_NAME) + + ", concurrency=" + concurrency + + ", isolation=" + isolation + "]"); + + Transaction tx; + TestReturnValue retVal; + + log.info("Invoke: " + key); + + // Update. + callCnt.set(0); + + tx = startTx(cache, concurrency, isolation); + + retVal = cache.invoke(key, new TestEntryProcessor(OP_UPDATE), new TestValue(Integer.MIN_VALUE)); + + if (tx != null) + tx.commit(); + + assertEquals(expCallCnt, callCnt.get()); + + checkReturnValue(retVal, "null"); + checkCacheValue(cache.getName(), key, new TestValue(0)); + + log.info("Invoke: " + key); + + // Get. + callCnt.set(0); + + tx = startTx(cache, concurrency, isolation); + + retVal = cache.invoke(key, new TestEntryProcessor(OP_GET), new TestValue(Integer.MIN_VALUE)); + + if (tx != null) + tx.commit(); + + assertEquals(expCallCnt, callCnt.get()); + + checkReturnValue(retVal, "0"); + checkCacheValue(cache.getName(), key, new TestValue(0)); + + log.info("Invoke: " + key); + + // Update. + callCnt.set(0); + + tx = startTx(cache, concurrency, isolation); + + retVal = cache.invoke(key, new TestEntryProcessor(OP_UPDATE), new TestValue(Integer.MIN_VALUE)); + + if (tx != null) + tx.commit(); + + assertEquals(expCallCnt, callCnt.get()); + + checkReturnValue(retVal, "0"); + checkCacheValue(cache.getName(), key, new TestValue(1)); + + log.info("Invoke: " + key); + + // Remove. + callCnt.set(0); + + tx = startTx(cache, concurrency, isolation); + + retVal = cache.invoke(key, new TestEntryProcessor(OP_REMOVE), new TestValue(Integer.MIN_VALUE)); + + if (tx != null) + tx.commit(); + + assertEquals(expCallCnt, callCnt.get()); + + checkReturnValue(retVal, "1"); + checkCacheValue(cache.getName(), key, null); + } + + /** + * @param retVal Return value. + * @param expVal Expected value. + */ + private void checkReturnValue(TestReturnValue retVal, String expVal) { + assertNotNull(retVal); + + TestValue arg = (TestValue)retVal.argument(); + assertNotNull(arg); + assertEquals(Integer.MIN_VALUE, (Object)arg.value()); + + assertEquals(expVal, retVal.value()); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param expVal Expected value. + */ + private void checkCacheValue(String cacheName, Integer key, TestValue expVal) { + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + IgniteCache<Integer, TestValue> cache = ignite.cache(cacheName); + + assertEquals(expVal, cache.get(key)); + } + } + + /** + * @param cache Cache. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @return Started transaction. + */ + @Nullable private Transaction startTx(IgniteCache<Integer, TestValue> cache, + @Nullable TransactionConcurrency concurrency, + @Nullable TransactionIsolation isolation) { + if (concurrency != null) { + assert isolation != null; + + return cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation); + } + + return null; + } + + /** + * + */ + static class TestEntryProcessor implements EntryProcessor<Integer, TestValue, TestReturnValue> { + /** */ + private int op; + + /** + * @param op Operation. + */ + public TestEntryProcessor(int op) { + this.op = op; + } + + /** {@inheritDoc} */ + @Override public TestReturnValue process(MutableEntry<Integer, TestValue> entry, + Object... args) { + Ignite ignite = entry.unwrap(Ignite.class); + + ignite.log().info("TestEntryProcessor called [op=" + op + ", entry=" + entry + ']'); + + callCnt.incrementAndGet(); + + assertEquals(1, args.length); + + TestReturnValue retVal; + + TestValue val = entry.getValue(); + + if (val == null) + retVal = new TestReturnValue("null", args[0]); + else + retVal = new TestReturnValue(String.valueOf(val.value()), args[0]); + + switch (op) { + case OP_GET: + return retVal; + + case OP_UPDATE: { + if (val == null) + val = new TestValue(0); + else + val = new TestValue(val.val + 1); + + entry.setValue(val); + + break; + } + + case OP_REMOVE: + entry.remove(); + + break; + + default: + assert false; + } + + return retVal; + } + } + + /** + * + */ + static class TestValue { + /** */ + private Integer val; + + /** + * @param val Value. + */ + public TestValue(Integer val) { + this.val = val; + } + + /** + * @return Value. + */ + public Integer value() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue testVal = (TestValue) o; + + return val.equals(testVal.val); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * + */ + static class TestReturnValue { + /** */ + private String val; + + /** */ + private Object arg; + + /** + * @param val Value. + * @param arg Entry processor argument. + */ + public TestReturnValue(String val, Object arg) { + this.val = val; + this.arg = arg; + } + + /** + * @return Value. + */ + public String value() { + return val; + } + + /** + * @return Entry processor argument. + */ + public Object argument() { + return arg; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestReturnValue testVal = (TestReturnValue) o; + + return val.equals(testVal.val); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestReturnValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java index a2ae2e1..02eb9d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java @@ -121,8 +121,8 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac } // Check that invoke and loader updated metrics - assertEquals(CNT, hits); - assertEquals(CNT, misses); + assertEquals(CNT / 2, hits); + assertEquals(CNT / 2, misses); } finally { stopAllGrids(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 95661cb..eaf63d7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.portable.BinaryMarshaller; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.GridClassLoaderCache; import org.apache.ignite.internal.portable.BinaryEnumCache; @@ -119,6 +120,9 @@ public abstract class GridAbstractTest extends TestCase { /** Null name for execution map. */ private static final String NULL_NAME = UUID.randomUUID().toString(); + /** */ + private static final boolean BINARY_MARSHALLER = false; + /** Ip finder for TCP discovery. */ public static final TcpDiscoveryIpFinder LOCAL_IP_FINDER = new TcpDiscoveryVmIpFinder(false) {{ setAddresses(Collections.singleton("127.0.0.1:47500..47509")); @@ -155,6 +159,9 @@ public abstract class GridAbstractTest extends TestCase { System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000"); System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false"); + if (BINARY_MARSHALLER) + GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); + Thread timer = new Thread(new GridTestClockTimer(), "ignite-clock-for-tests"); timer.setDaemon(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index ca31c28..7e45470 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerEager import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxLocalTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxReplicatedTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorCallTest; import org.apache.ignite.internal.processors.cache.IgniteCacheManyAsyncOperationsTest; import org.apache.ignite.internal.processors.cache.IgniteCacheNearLockValueSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheTransactionalStopBusySelfTest; @@ -167,6 +168,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class); suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class); suite.addTestSuite(IgniteCacheTxInvokeTest.class); + suite.addTestSuite(IgniteCacheEntryProcessorCallTest.class); suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class); suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class); suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class);