Repository: ignite Updated Branches: refs/heads/ignite-1537 0b5306fb9 -> dc8ab8afe
ignite-1.5 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc8ab8af Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc8ab8af Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc8ab8af Branch: refs/heads/ignite-1537 Commit: dc8ab8afec41945c201e5a677920ec74670a443c Parents: 0b5306f Author: sboikov <sboi...@gridgain.com> Authored: Tue Dec 22 22:42:32 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Dec 22 22:42:32 2015 +0300 ---------------------------------------------------------------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 128 ++++++++++--------- 1 file changed, 66 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8ab8af/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 5a4ba14..283da80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -1009,107 +1009,111 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, boolean lostAllow, boolean wait) throws Exception { - if (wait) + if (wait) { GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { + @Override + public boolean apply() { return expEvts.size() == lsnr.size(); } }, 2000L); + } - Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size()); + synchronized (lsnr) { + Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size()); - for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) - prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); + for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) + prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); - List<T3<Object, Object, Object>> lostEvts = new ArrayList<>(); + List<T3<Object, Object, Object>> lostEvts = new ArrayList<>(); - for (T3<Object, Object, Object> exp : expEvts) { - List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); + for (T3<Object, Object, Object> exp : expEvts) { + List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); - if (F.eq(exp.get2(), exp.get3())) - continue; + if (F.eq(exp.get2(), exp.get3())) + continue; - if (rcvdEvts == null || rcvdEvts.isEmpty()) { - lostEvts.add(exp); + if (rcvdEvts == null || rcvdEvts.isEmpty()) { + lostEvts.add(exp); - continue; - } + continue; + } - Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator(); + Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator(); - boolean found = false; + boolean found = false; - while (iter.hasNext()) { - CacheEntryEvent<?, ?> e = iter.next(); + while (iter.hasNext()) { + CacheEntryEvent<?, ?> e = iter.next(); - if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) - && equalOldValue(e, exp)) { - found = true; + if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) + && equalOldValue(e, exp)) { + found = true; - iter.remove(); + iter.remove(); - break; + break; + } } - } - // Lost event is acceptable. - if (!found) - lostEvts.add(exp); - } + // Lost event is acceptable. + if (!found) + lostEvts.add(exp); + } - boolean dup = false; + boolean dup = false; - // Check duplicate. - if (!lsnr.evts.isEmpty()) { - for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) { - if (!evts.isEmpty()) { - for (CacheEntryEvent<?, ?> e : evts) { - boolean found = false; + // Check duplicate. + if (!lsnr.evts.isEmpty()) { + for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) { + if (!evts.isEmpty()) { + for (CacheEntryEvent<?, ?> e : evts) { + boolean found = false; - for (T3<Object, Object, Object> lostEvt : lostEvts) { - if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { - found = true; + for (T3<Object, Object, Object> lostEvt : lostEvts) { + if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { + found = true; - lostEvts.remove(lostEvt); + lostEvts.remove(lostEvt); - break; + break; + } } - } - if (!found) { - dup = true; + if (!found) { + dup = true; - break; + break; + } } } } - } - if (dup) { - for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) { - if (!e.isEmpty()) { - for (CacheEntryEvent<?, ?> event : e) - log.error("Got duplicate event: " + event); + if (dup) { + for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) { + if (!e.isEmpty()) { + for (CacheEntryEvent<?, ?> event : e) + log.error("Got duplicate event: " + event); + } } } } - } - if (!lostAllow && lostEvts.size() > 100) { - log.error("Lost event cnt: " + lostEvts.size()); + if (!lostAllow && lostEvts.size() > 100) { + log.error("Lost event cnt: " + lostEvts.size()); - for (T3<Object, Object, Object> e : lostEvts) - log.error("Lost event: " + e); + for (T3<Object, Object, Object> e : lostEvts) + log.error("Lost event: " + e); - fail("Lose events, see log for details."); - } + fail("Lose events, see log for details."); + } - log.error("Lost event cnt: " + lostEvts.size()); + log.error("Lost event cnt: " + lostEvts.size()); - expEvts.clear(); + expEvts.clear(); - lsnr.evts.clear(); - lsnr.vals.clear(); + lsnr.evts.clear(); + lsnr.vals.clear(); + } } /** @@ -2111,7 +2115,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @return Count events. */ - public int size() { + public synchronized int size() { int size = 0; for (List<CacheEntryEvent<?, ?>> e : evts.values())