Repository: ignite Updated Branches: refs/heads/ignite-2.7 35a2c800b -> 8a85823b5
IGNITE-9927: Reverted changes to CacheContinuousQueryOperationFromCallbackTest until flaky failures are fixed. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a85823b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a85823b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a85823b Branch: refs/heads/ignite-2.7 Commit: 8a85823b567a61deb85bf12a9b9d72b5eed7dbbf Parents: 35a2c80 Author: devozerov <[email protected]> Authored: Thu Oct 18 12:19:38 2018 +0300 Committer: devozerov <[email protected]> Committed: Thu Oct 18 12:22:03 2018 +0300 ---------------------------------------------------------------------- ...ontinuousQueryOperationFromCallbackTest.java | 205 ++++--------------- 1 file changed, 39 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8a85823b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java index 3cb13bf..0540b43 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -63,13 +63,10 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * @@ -200,60 +197,6 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs } /** - * @throws Exception If failed. - */ - public void testMvccTxTwoBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC); - - doTest(ccfg, false); - } - - /** - * @throws Exception If failed. - */ - public void testMvccTxTwoBackupsFilterPrimary() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC); - - doTest(ccfg, false); - } - - /** - * @throws Exception If failed. - */ - public void testMvccTxReplicatedFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL_SNAPSHOT, FULL_SYNC); - - doTest(ccfg, false); - } - - /** - * @throws Exception If failed. - */ - public void testMvccTxTwoBackup() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC); - - doTest(ccfg, true); - } - - /** - * @throws Exception If failed. - */ - public void testMvccTxReplicated() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC); - - doTest(ccfg, true); - } - - /** - * @throws Exception If failed. - */ - public void testMvccTxReplicatedPrimary() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC); - - doTest(ccfg, true); - } - - /** * @param ccfg Cache configuration. * @throws Exception If failed. */ @@ -309,51 +252,34 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); - boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != - ATOMIC && rnd.nextBoolean(); + boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == + TRANSACTIONAL && rnd.nextBoolean(); Transaction tx = null; - boolean committed = false; - - while (!committed && !Thread.currentThread().isInterrupted()) { - try { - if (startTx) - tx = cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + if (startTx) + tx = cache.unwrap(Ignite.class).transactions().txStart(); - if ((cache.get(key) == null) || rnd.nextBoolean()) - cache.invoke(key, new IncrementTestEntryProcessor()); - else { - QueryTestValue val; - QueryTestValue newVal; + try { + if ((cache.get(key) == null) || rnd.nextBoolean()) + cache.invoke(key, new IncrementTestEntryProcessor()); + else { + QueryTestValue val; + QueryTestValue newVal; - do { - val = cache.get(key); + do { + val = cache.get(key); - newVal = val == null ? - new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); - } - while (!cache.replace(key, val, newVal)); + newVal = val == null ? + new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); } - - if (tx != null) - tx.commit(); - - committed = true; - } - catch (Exception e) { - assertTrue(e.getMessage(), e.getMessage() != null && - (e.getMessage().contains("Transaction has been rolled back") || - e.getMessage().contains("Cannot serialize transaction due to write conflict"))); - - // Wait MVCC updates become visible. - doSleep(50); - } - finally { - if (tx != null) - tx.close(); + while (!cache.replace(key, val, newVal)); } } + finally { + if (tx != null) + tx.commit(); + } } } }, threadCnt, "put-thread"); @@ -376,7 +302,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs @Override public boolean apply() { return cbCntr.get() >= expCnt; } - }, TimeUnit.SECONDS.toMillis(120)); + }, TimeUnit.SECONDS.toMillis(60)); assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", res); @@ -393,7 +319,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs @Override public boolean apply() { return filterCbCntr.get() >= expInvkCnt; } - }, TimeUnit.SECONDS.toMillis(120)); + }, TimeUnit.SECONDS.toMillis(60)); assertEquals(expInvkCnt, filterCbCntr.get()); @@ -481,45 +407,17 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) { IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName); - boolean committed = false; - Transaction tx = null; - boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; - - while (!committed && !Thread.currentThread().isInterrupted()) { - try { - if (startTx) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - - if (ThreadLocalRandom.current().nextBoolean()) { - Set<QueryTestKey> keys = new LinkedHashSet<>(); + if (ThreadLocalRandom.current().nextBoolean()) { + Set<QueryTestKey> keys = new LinkedHashSet<>(); - for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) - keys.add(new QueryTestKey(key)); + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + keys.add(new QueryTestKey(key)); - cache.invokeAll(keys, new IncrementTestEntryProcessor()); - } - else { - for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) - cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); - } - - if (tx != null) - tx.commit(); - - committed = true; - } - catch (Exception ex) { - assertTrue(ex.getMessage(), ex.getMessage() != null && - (ex.getMessage().contains("Transaction has been rolled back") || - ex.getMessage().contains("Cannot serialize transaction due to write conflict"))); - - // Wait MVCC updates become visible. - doSleep(50); - } - finally { - if (tx != null) - tx.close(); - } + cache.invokeAll(keys, new IncrementTestEntryProcessor()); + } + else { + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); } filterCbCntr.incrementAndGet(); @@ -579,42 +477,17 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs cntr.incrementAndGet(); if (cache != null) { - boolean committed = false; - Transaction tx = null; - boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; - - while (!committed && !Thread.currentThread().isInterrupted()) { - try { - if (startTx) - tx = cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - - if (ThreadLocalRandom.current().nextBoolean()) { - Set<QueryTestKey> keys = new LinkedHashSet<>(); - - for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) - keys.add(new QueryTestKey(key)); - - cache.invokeAll(keys, new IncrementTestEntryProcessor()); - } - else { - for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) - cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); - } + if (ThreadLocalRandom.current().nextBoolean()) { + Set<QueryTestKey> keys = new LinkedHashSet<>(); - if (tx != null) - tx.commit(); + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + keys.add(new QueryTestKey(key)); - committed = true; - } - catch (Exception ex) { - assertTrue(ex.getMessage(), ex.getMessage() != null && - (ex.getMessage().contains("Transaction has been rolled back") || - ex.getMessage().contains("Cannot serialize transaction due to write conflict"))); - } - finally { - if (tx != null) - tx.close(); - } + cache.invokeAll(keys, new IncrementTestEntryProcessor()); + } + else { + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); } } }
