Repository: ignite Updated Branches: refs/heads/ignite-3163 [created] 374779ca3
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index 89ead35..3d1ec94 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -45,7 +45,6 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(BinarySerializationQueryWithReflectiveSerializerSelfTest.class); suite.addTestSuite(IgniteCacheBinaryObjectsScanSelfTest.class); - suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); //Should be adjusted. Not ready to be used with BinaryMarshaller. //suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index fbb3091..e0e81b7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -20,9 +20,14 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationFromCallbackTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; @@ -87,12 +92,17 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTwoNodesTest.class); - suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class); suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class); + suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class); + suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class); + suite.addTestSuite(CacheContinuousQueryOrderingEventTest.class); + suite.addTestSuite(CacheContinuousQueryOperationFromCallbackTest.class); suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); suite.addTestSuite(CacheContinuousBatchAckTest.class); suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class); suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class); + suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java index fa4e642..c4fcdac 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java @@ -18,6 +18,9 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; @@ -44,6 +47,10 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java new file mode 100644 index 0000000..0ea66d4 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.lang.IgniteAsyncCallback; + +/** + * Probe which calculate continuous query events. + */ +public class CacheEntryEventAsyncProbe extends CacheEntryEventProbe { + /** */ + @Override protected CacheEntryUpdatedListener<Integer, Integer> localListener(AtomicLong cntr) { + return new CacheEntryEventListener(cntr); + } + + /** + * + */ + @IgniteAsyncCallback + private static final class CacheEntryEventListener implements CacheEntryUpdatedListener<Integer, Integer> { + /** */ + private AtomicLong cnt; + + /** + * @param cnt Counter. + */ + public CacheEntryEventListener(AtomicLong cnt) { + this.cnt = cnt; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events) + throws CacheEntryListenerException { + int size = 0; + + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) + ++size; + + cnt.addAndGet(size); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java index e42479a..a25f975 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java @@ -45,9 +45,6 @@ public class CacheEntryEventProbe implements BenchmarkProbe { /** */ private BenchmarkConfiguration cfg; - /** Counter. */ - private AtomicLong cnt = new AtomicLong(0); - /** Collected points. */ private Collection<BenchmarkProbePoint> collected = new ArrayList<>(); @@ -67,17 +64,9 @@ public class CacheEntryEventProbe implements BenchmarkProbe { if (drv0.cache() != null) { ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); - qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> - events) throws CacheEntryListenerException { - int size = 0; - - for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) - ++size; + final AtomicLong cnt = new AtomicLong(); - cnt.addAndGet(size); - } - }); + qry.setLocalListener(localListener(cnt)); qryCur = drv0.cache().query(qry); @@ -113,6 +102,24 @@ public class CacheEntryEventProbe implements BenchmarkProbe { + " probe. Probably, the driver doesn't provide \"cache()\" method."); } + /** + * @param cntr Received event counter. + * @return Local listener. + */ + protected CacheEntryUpdatedListener<Integer, Integer> localListener(final AtomicLong cntr) { + return new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events) + throws CacheEntryListenerException { + int size = 0; + + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) + ++size; + + cntr.addAndGet(size); + } + }; + } + /** {@inheritDoc} */ @Override public void stop() throws Exception { if (qryCur != null) {
