http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 767697a..9ae2972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -48,6 +48,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -58,7 +59,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; +import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; @@ -66,13 +70,14 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -159,6 +164,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private transient boolean ignoreClsNotFound; + /** */ + private transient boolean asyncCallback; + + /** */ + private transient UUID nodeId; + + /** */ + private transient UUID routineId; + + /** */ + private transient GridKernalContext ctx; + + /** */ + private transient IgniteLogger log; + /** * Required by {@link Externalizable}. 
*/ @@ -283,13 +303,36 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert routineId != null; assert ctx != null; - if (locLsnr != null) - ctx.resource().injectGeneric(locLsnr); + if (locLsnr != null) { + if (locLsnr instanceof JCacheQueryLocalListener) { + ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl); + + asyncCallback = ((JCacheQueryLocalListener)locLsnr).async(); + } + else { + ctx.resource().injectGeneric(locLsnr); + + asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class); + } + } final CacheEntryEventFilter filter = getEventFilter(); - if (filter != null) - ctx.resource().injectGeneric(filter); + if (filter != null) { + if (filter instanceof JCacheQueryRemoteFilter) { + if (((JCacheQueryRemoteFilter)filter).impl != null) + ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl); + + if (!asyncCallback) + asyncCallback = ((JCacheQueryRemoteFilter)filter).async(); + } + else { + ctx.resource().injectGeneric(filter); + + if (!asyncCallback) + asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class); + } + } entryBufs = new ConcurrentHashMap<>(); @@ -299,10 +342,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler rcvs = new ConcurrentHashMap<>(); + this.nodeId = nodeId; + + this.routineId = routineId; + + this.ctx = ctx; + final boolean loc = nodeId.equals(ctx.localNodeId()); assert !skipPrimaryCheck || loc; + log = ctx.log(CacheContinuousQueryHandler.class); + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -324,15 +375,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - /** {@inheritDoc} */ @Override public boolean keepBinary() { return keepBinary; } - @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, - boolean 
recordIgniteEvt) { + @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt, + boolean primary, + final boolean recordIgniteEvt, + GridDhtAtomicUpdateFuture fut) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) - return; + return ; final GridCacheContext<K, V> cctx = cacheContext(ctx); @@ -343,110 +395,33 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler // skipPrimaryCheck is set only when listen locally for replicated cache events. assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); - boolean notify = !evt.entry().isFiltered(); + if (asyncCallback) { + ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure( + primary, + evt, + recordIgniteEvt, + fut); - if (notify && filter != null) { - try { - notify = filter.evaluate(evt); - } - catch (Exception e) { - U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e); - } + ctx.asyncCallbackPool().execute(clsr, evt.partitionId()); } - - try { - final CacheContinuousQueryEntry entry = evt.entry(); - - if (!notify) - entry.markFiltered(); + else { + final boolean notify = filter(evt, primary); if (primary || skipPrimaryCheck) { - if (loc) { - if (!locCache) { - Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry); - - if (!entries.isEmpty()) { - final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override public CacheEntryEvent<? extends K, ? 
extends V> apply( - CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); - } - } - ); - - if (!F.isEmpty(evts)) - locLsnr.onUpdated(evts); - - if (!internal && !skipPrimaryCheck) - sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); - } - } - else { - if (!entry.isFiltered()) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); - } - } + if (fut == null) + onEntryUpdate(evt, notify, loc, recordIgniteEvt); else { - if (!entry.isFiltered()) - prepareEntry(cctx, nodeId, entry); + fut.addContinuousQueryClosure(new CI1<Boolean>() { + @Override public void apply(Boolean suc) { + if (!suc) + evt.entry().markFiltered(); - CacheContinuousQueryEntry e = handleEntry(entry); - - if (e != null) - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); - } - } - else { - if (!internal) { - // Skip init query and expire entries. - if (entry.updateCounter() != -1L) { - entry.markBackup(); - - backupQueue.add(entry); - } + onEntryUpdate(evt, notify, loc, recordIgniteEvt); + } + }); } } } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); - - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } - - if (recordIgniteEvt && notify) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - filter instanceof CacheEntryEventSerializableFilter ? 
- (CacheEntryEventSerializableFilter)filter : null, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); - } } @Override public void onUnregister() { @@ -492,15 +467,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); } - @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, - boolean primary) { + @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, + AffinityTopologyVersion topVer, boolean primary) { assert evt != null; CacheContinuousQueryEntry e = evt.entry(); e.markFiltered(); - onEntryUpdated(evt, primary, false); + onEntryUpdated(evt, primary, false, null); } @Override public void onPartitionEvicted(int part) { @@ -597,17 +572,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + @Override public void notifyCallback(final UUID nodeId, + final UUID routineId, + Collection<?> objs, + final GridKernalContext ctx) { assert nodeId != null; assert routineId != null; assert objs != null; assert ctx != null; - Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; + final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs; + + if (entries.isEmpty()) + return; + + if (asyncCallback) { + IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool(); + + int threadId = asyncPool.threadId(entries.get(0).partition()); + + int startIdx = 0; + + if (entries.size() != 1) { + for (int i = 1; i < entries.size(); i++) { + int curThreadId = asyncPool.threadId(entries.get(i).partition()); + // If all entries from one partition avoid creation new collections. 
+ if (curThreadId == threadId) + continue; + + final int i0 = i; + final int startIdx0 = startIdx; + + asyncPool.execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0)); + } + }, threadId); + + startIdx = i0; + threadId = curThreadId; + } + } + + final int startIdx0 = startIdx; + + asyncPool.execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, + startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size())); + } + }, threadId); + } + else + notifyCallback0(nodeId, ctx, entries); + } + + /** + * @param nodeId Node id. + * @param ctx Kernal context. + * @param entries Entries. + */ + private void notifyCallback0(UUID nodeId, + final GridKernalContext ctx, + Collection<CacheContinuousQueryEntry> entries) { final GridCacheContext cctx = cacheContext(ctx); - Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); + final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size()); for (CacheContinuousQueryEntry e : entries) { GridCacheDeploymentManager depMgr = cctx.deploy(); @@ -626,7 +657,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler try { e.unmarshal(cctx, ldr); - entries0.addAll(handleEvent(ctx, e)); + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e); + + if (evts != null && !evts.isEmpty()) + entries0.addAll(evts); } catch (IgniteCheckedException ex) { if (ignoreClsNotFound) @@ -636,24 +670,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - - if (!entries0.isEmpty()) { - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override public CacheEntryEvent<? extends K, ? 
extends V> apply(CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); - } - } - ); - - locLsnr.onUpdated(evts); - } + if (!entries0.isEmpty()) + locLsnr.onUpdated(entries0); } /** @@ -661,24 +679,142 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param e entry. * @return Entry collection. */ - private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx, + private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e) { assert e != null; + GridCacheContext<K, V> cctx = cacheContext(ctx); + + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + if (internal) { if (e.isFiltered()) return Collections.emptyList(); else - return F.asList(e); + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, e)); } // Initial query entry or evicted entry. These events should be fired immediately. - if (e.updateCounter() == -1L) - return F.asList(e); + if (e.updateCounter() == -1L) { + return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends V>>asList( + new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) : + Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); + } PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); - return rec.collectEntries(e); + return rec.collectEntries(e, cctx, cache); + } + + /** + * @param primary Primary. + * @param evt Query event. + * @return {@code True} if event passed filter otherwise {@code false}. 
+ */ + public boolean filter(CacheContinuousQueryEvent evt, boolean primary) { + CacheContinuousQueryEntry entry = evt.entry(); + + boolean notify = !entry.isFiltered(); + + try { + if (notify && getEventFilter() != null) + notify = getEventFilter().evaluate(evt); + } + catch (Exception e) { + U.error(log, "CacheEntryEventFilter failed: " + e); + } + + if (!notify) + entry.markFiltered(); + + if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) { + entry.markBackup(); + + backupQueue.add(entry); + } + + return notify; + } + + /** + * @param evt Continuous query event. + * @param notify Notify flag. + * @param loc Listener deployed on this node. + * @param recordIgniteEvt Record ignite event. + */ + private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) { + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx == null) + return; + + final CacheContinuousQueryEntry entry = evt.entry(); + + if (loc) { + if (!locCache) { + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); + + if (!evts.isEmpty()) { + locLsnr.onUpdated(evts); + + if (!internal && !skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { + if (!entry.isFiltered()) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); + } + } + else { + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); + + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + } + } + catch (ClusterTopologyCheckedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + + if (recordIgniteEvt && notify) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + getEventFilter() instanceof CacheEntryEventSerializableFilter ? + (CacheEntryEventSerializableFilter)getEventFilter() : null, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + + /** + * @return Task name. + */ + private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } /** @@ -710,9 +846,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } } - else if (initUpdCntrs != null) { + else if (initUpdCntrs != null) partCntr = initUpdCntrs.get(partId); - } } rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr); @@ -802,19 +937,26 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * Add continuous entry. * + * @param cctx Cache context. + * @param cache Cache. * @param entry Cache continuous query entry. - * @return Collection entries which will be fired. + * @return Collection entries which will be fired. This collection should contain only non-filtered events. 
*/ - public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { + <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( + CacheContinuousQueryEntry entry, + GridCacheContext cctx, + IgniteCache cache + ) { assert entry != null; if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. assert entry.updateCounter() == 0L : entry; - return F.asList(entry); + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); } - List<CacheContinuousQueryEntry> entries; + List<CacheEntryEvent<? extends K, ? extends V>> entries; synchronized (pendingEvts) { // Received first event. @@ -823,7 +965,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler curTop = entry.topologyVersion(); - return F.asList(entry); + return !entry.isFiltered() ? + F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : + Collections.<CacheEntryEvent<? extends K, ? 
extends V>>emptyList(); } if (curTop.compareTo(entry.topologyVersion()) < 0) { @@ -832,7 +977,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler for (CacheContinuousQueryEntry evt : pendingEvts.values()) { if (evt != HOLE && !evt.isFiltered()) - entries.add(evt); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); } pendingEvts.clear(); @@ -841,7 +986,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler lastFiredEvt = entry.updateCounter(); - entries.add(entry); + if (!entry.isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); return entries; } @@ -880,7 +1026,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(e.getValue()); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); lastFiredEvt = e.getKey(); @@ -896,7 +1042,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler ++lastFiredEvt; if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(e.getValue()); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); iter.remove(); } @@ -1258,6 +1404,87 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * + */ + private class ContinuousQueryAsyncClosure implements Runnable { + /** */ + private final CacheContinuousQueryEvent<K, V> evt; + + /** */ + private final boolean primary; + + /** */ + private final boolean recordIgniteEvt; + + /** */ + private final IgniteInternalFuture<?> fut; + + /** + * @param primary Primary flag. + * @param evt Event. + * @param recordIgniteEvt Fired event. + * @param fut Dht future. 
+ */ + ContinuousQueryAsyncClosure( + boolean primary, + CacheContinuousQueryEvent<K, V> evt, + boolean recordIgniteEvt, + IgniteInternalFuture<?> fut) { + this.primary = primary; + this.evt = evt; + this.recordIgniteEvt = recordIgniteEvt; + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void run() { + final boolean notify = filter(evt, primary); + + if (!primary()) + return; + + if (fut == null) { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + + return; + } + + if (fut.isDone()) { + if (fut.error() != null) + evt.entry().markFiltered(); + + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + else { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + if (f.error() != null) + evt.entry().markFiltered(); + + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + }, evt.entry().partition()); + } + }); + } + } + + /** + * @return {@code True} if event fired on this node. + */ + private boolean primary() { + return primary || skipPrimaryCheck; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(ContinuousQueryAsyncClosure.class, this); + } + } + + /** * Deployable object. */ protected static class DeployableObject implements Externalizable {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 83ff32c..8eca81c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; +import org.jetbrains.annotations.Nullable; /** * Continuous query listener. @@ -36,8 +38,10 @@ public interface CacheContinuousQueryListener<K, V> { * @param evt Event * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. + * @param fut Dht atomic future. */ - public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); + public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut); /** * Listener unregistered callback. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index f9c33c1..fafb830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -54,11 +54,13 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; @@ -166,23 +168,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param key Entry key. * @param partId Partition id. * @param updCntr Updated counter. - * @param topVer Topology version. 
- */ - public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, - KeyCacheObject key, - int partId, - long updCntr, - AffinityTopologyVersion topVer) { - skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer); - } - - /** - * @param lsnrs Listeners to notify. - * @param key Entry key. - * @param partId Partition id. - * @param updCntr Updated counter. - * @param topVer Topology version. * @param primary Primary. + * @param topVer Topology version. */ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, KeyCacheObject key, @@ -241,6 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. * @param updateCntr Update counter. + * @param fut Dht atomic future. * @param topVer Topology version. * @throws IgniteCheckedException In case of error. */ @@ -253,7 +241,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean primary, boolean preload, long updateCntr, - AffinityTopologyVersion topVer) throws IgniteCheckedException { + @Nullable GridDhtAtomicUpdateFuture fut, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload); if (lsnrCol != null) { @@ -267,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { primary, preload, updateCntr, + fut, topVer); } } @@ -282,6 +273,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param preload Whether update happened during preloading. * @param updateCntr Update counter. * @param topVer Topology version. + * @param fut Dht atomic future. * @throws IgniteCheckedException In case of error. 
*/ public void onEntryUpdated( @@ -294,6 +286,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean primary, boolean preload, long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut, AffinityTopologyVersion topVer) throws IgniteCheckedException { @@ -347,7 +340,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut); } } @@ -401,7 +394,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null); } } } @@ -511,7 +504,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { return executeQuery0( locLsnr, new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { - @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) { + @Override public CacheContinuousQueryHandler apply(Boolean v2) { return new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -801,6 +794,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * */ private class JCacheQuery { /** */ @@ -932,9 +926,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * */ - private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { + static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { /** */ - private final CacheEntryListener<K, V> impl; + final CacheEntryListener<K, V> impl; /** */ private final IgniteLogger log; @@ -958,28 
+952,28 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { try { switch (evt.getEventType()) { case CREATED: - assert impl instanceof CacheEntryCreatedListener; + assert impl instanceof CacheEntryCreatedListener : evt; ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt)); break; case UPDATED: - assert impl instanceof CacheEntryUpdatedListener; + assert impl instanceof CacheEntryUpdatedListener : evt; ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt)); break; case REMOVED: - assert impl instanceof CacheEntryRemovedListener; + assert impl instanceof CacheEntryRemovedListener : evt; ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt)); break; case EXPIRED: - assert impl instanceof CacheEntryExpiredListener; + assert impl instanceof CacheEntryExpiredListener : evt; ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt)); @@ -1010,6 +1004,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { return evts; } + + /** + * @return {@code True} if listener should be executed in non-system thread. + */ + protected boolean async() { + return U.hasAnnotation(impl, IgniteAsyncCallback.class); + } } /** @@ -1020,7 +1021,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { private static final long serialVersionUID = 0L; /** */ - private CacheEntryEventFilter impl; + protected CacheEntryEventFilter impl; /** */ private byte types; @@ -1073,6 +1074,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @return {@code True} if filter should be executed in non-system thread. + */ + protected boolean async() { + return U.hasAnnotation(impl, IgniteAsyncCallback.class); + } + + /** * @param evtType Type. * @return Flag value. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java new file mode 100644 index 0000000..1e04ce6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * If callback has this annotation then it will be executing in another thread. 
+ * <p> + * Currently this annotation is supported for: + * <ol> + * <li>{@link ContinuousQuery} - {@link CacheEntryUpdatedListener} and {@link CacheEntryEventFilter}.</li> + * </ol> + * <p> + * For example, if {@link CacheEntryEventFilter filter} or {@link CacheEntryListener} + * has the annotation then callbacks will be executed in the asyncCallback thread pool. This allows the cache API + * to be used in callbacks. This thread pool can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}. + * <h1 class="header">Example</h1> + * As an example, suppose we have a cache with {@code 'Person'} objects and we need + * to query all persons with salary above 1000. The remote filter will also update some entries. + * <p> + * Here is the {@code Person} class: + * <pre name="code" class="java"> + * public class Person { + * // Name. + * private String name; + * + * // Salary. + * private double salary; + * + * ... + * } + * </pre> + * <p> + * Here is the {@code ExampleCacheEntryFilter} class: + * <pre name="code" class="java"> + * @IgniteAsyncCallback + * public class ExampleCacheEntryFilter implements CacheEntryEventFilter<Integer, Person> { + * @IgniteInstanceResource + * private Ignite ignite; + * + * // Continuous listener will be notified for persons with salary above 1000. + * // For all other persons the filter increases the salary by 100. Without the @IgniteAsyncCallback annotation + * // this operation is not safe. + * public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Person> evt) throws CacheEntryListenerException { + * Person p = evt.getValue(); + * + * if (p.getSalary() > 1000) + * return true; + * + * ignite.cache("Person").put(evt.getKey(), new Person(p.getName(), p.getSalary() + 100)); + * + * return false; + * } + * } + * </pre> + * <p> + * A query with an asynchronous callback is executed as usual: + * <pre name="code" class="java"> + * // Create new continuous query.
+ * ContinuousQuery<Long, Person> qry = new ContinuousQuery<>(); + * + * // Callback that is called locally when update notifications are received. + * // It simply prints out information about all created persons. + * qry.setLocalListener((evts) -> { + * for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) { + * Person p = e.getValue(); + * + * System.out.println(p.getFirstName() + " " + p.getLastName() + "'s salary is " + p.getSalary()); + * } + * }); + * + * // Sets remote filter. + * qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter()); + * + * // Execute query. + * QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry); + * </pre> + * + * @see IgniteConfiguration#getAsyncCallbackPoolSize + * @see ContinuousQuery#getRemoteFilterFactory() + * @see ContinuousQuery#getLocalListener() + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface IgniteAsyncCallback { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 35882b9..9f7c381 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -17,62 +17,63 @@ package org.apache.ignite.thread; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import 
java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; /** * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. */ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** */ - public static final int DFLT_SEG_POOL_SIZE = 8; - - /** */ - public static final int DFLT_CONCUR_LVL = 16; - - /** */ private final ExecutorService[] execs; - /** */ - private final int segShift; - - /** */ - private final int segMask; - /** + * Create striped thread pool. * + * @param concurrentLvl Concurrency level. + * @param gridName Node name. + * @param threadNamePrefix Thread name prefix. */ - public IgniteStripedThreadPoolExecutor() { - execs = new ExecutorService[DFLT_CONCUR_LVL]; - - ThreadFactory factory = new IgniteThreadFactory(null); - - for (int i = 0; i < DFLT_CONCUR_LVL; i++) - execs[i] = Executors.newFixedThreadPool(DFLT_SEG_POOL_SIZE, factory); + public IgniteStripedThreadPoolExecutor(int concurrentLvl, String gridName, String threadNamePrefix) { + execs = new ExecutorService[concurrentLvl]; - // Find power-of-two sizes best matching arguments - int sshift = 0; - int ssize = 1; + ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix); - while (ssize < DFLT_CONCUR_LVL) { - ++sshift; - - ssize <<= 1; - } + for (int i = 0; i < concurrentLvl; i++) + execs[i] = Executors.newSingleThreadExecutor(factory); + } - segShift = 32 - sshift; - segMask = ssize - 1; + /** + * Executes the given command at some time in the future. The command with the same {@code index} + * will be executed in the same thread. + * + * @param task the runnable task + * @param idx Striped index. + * @throws RejectedExecutionException if this task cannot be + * accepted for execution. 
+ * @throws NullPointerException If command is null + */ + public void execute(Runnable task, int idx) { + execs[threadId(idx)].execute(task); + } + /** + * @param idx Index. + * @return Striped thread ID. + */ + public int threadId(int idx) { + return idx < execs.length ? idx : idx % execs.length; } /** {@inheritDoc} */ @@ -83,7 +84,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** {@inheritDoc} */ @Override public List<Runnable> shutdownNow() { - List<Runnable> res = new LinkedList<>(); + if (execs.length == 0) + return Collections.emptyList(); + + List<Runnable> res = new ArrayList<>(execs.length); for (ExecutorService exec : execs) { for (Runnable r : exec.shutdownNow()) @@ -124,105 +128,45 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { } /** {@inheritDoc} */ - @Override public <T> Future<T> submit(Callable<T> task) { - return execForTask(task).submit(task); + @NotNull @Override public <T> Future<T> submit(Callable<T> task) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> Future<T> submit(Runnable task, T result) { - return execForTask(task).submit(task, result); + @NotNull @Override public <T> Future<T> submit(Runnable task, T res) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public Future<?> submit(Runnable task) { - return execForTask(task).submit(task); + @NotNull @Override public Future<?> submit(Runnable task) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - List<Future<T>> futs = new LinkedList<>(); - - for (Callable<T> task : tasks) - futs.add(execForTask(task).submit(task)); - - boolean done = false; - - try { - for (Future<T> fut : futs) { - try { - fut.get(); - } - catch (ExecutionException | InterruptedException ignored) { - // No-op.
- } - } - - done = true; - - return futs; - } - finally { - if (!done) { - for (Future<T> fut : futs) - fut.cancel(true); - } - } + @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, - TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, + TimeUnit unit) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, - ExecutionException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new RuntimeException("Not implemented."); + @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void execute(Runnable cmd) { - execForTask(cmd).execute(cmd); - } - - /** - * Applies a supplemental hash function to a given hashCode, which - * defends against poor quality hash functions. This is critical - * because ConcurrentHashMap uses power-of-two length hash tables, - * that otherwise encounter collisions for hashCodes that do not - * differ in lower or upper bits. - * - * @param h Hash code. - * @return Enhanced hash code. 
- */ - private int hash(int h) { - // Spread bits to regularize both segment and index locations, - // using variant of single-word Wang/Jenkins hash. - h += (h << 15) ^ 0xffffcd7d; - h ^= (h >>> 10); - h += (h << 3); - h ^= (h >>> 6); - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); - } - - /** - * @param cmd Command. - * @return Service. - */ - private <T> ExecutorService execForTask(T cmd) { - assert cmd != null; - - //return execs[ThreadLocalRandom8.current().nextInt(DFLT_CONCUR_LVL)]; - return execs[(hash(cmd.hashCode()) >>> segShift) & segMask]; + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 4455b46..09cf7c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -26,6 +26,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -540,7 +541,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr 
UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr) throws IgniteCheckedException, + @Nullable Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java new file mode 100644 index 0000000..62fd984 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest + extends CacheContinuousQueryFailoverAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode writeOrderMode() { + return PRIMARY; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java new file mode 100644 index 0000000..4460498 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java new file mode 100644 index 0000000..8f0bd0e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +}
