http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c7bf091..b2e7490 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -23,6 +23,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import java.util.ArrayList; +import java.util.Map; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; @@ -46,12 +47,14 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; @@ -82,6 +85,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** */ private static final byte EXPIRED_FLAG = 0b1000; + /** */ + private static final long BACKUP_ACK_FREQ = 5000; + /** Listeners. */ private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>(); @@ -108,6 +114,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { @Override protected void start0() throws IgniteCheckedException { // Append cache name to the topic. topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); + + cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, + new CI2<UUID, CacheContinuousQueryBatchAck>() { + @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { + CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); + + if (lsnr != null) + lsnr.cleanupBackupQueue(msg.updateCntrs()); + } + }); + + cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); } /** {@inheritDoc} */ @@ -137,25 +155,55 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** - * @param e Cache entry. + * @param partId Partition id. + * @param updCntr Updated counter. + * @param topVer Topology version. + */ + public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) { + if (lsnrCnt.get() > 0) { + CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( + cctx.cacheId(), + UPDATED, + key, + null, + null, + partId, + updCntr, + topVer); + + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); + + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.skipUpdateEvent(evt, topVer); + } + } + + /** * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param internal Internal entry (internal key or not user cache), + * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. + * @param updateCntr Update counter. + * @param topVer Topology version. * @throws IgniteCheckedException In case of error. */ - public void onEntryUpdated(GridCacheEntryEx e, + public void onEntryUpdated( KeyCacheObject key, CacheObject newVal, CacheObject oldVal, - boolean preload) + boolean internal, + int partId, + boolean primary, + boolean preload, + long updateCntr, + AffinityTopologyVersion topVer) throws IgniteCheckedException { - assert e != null; assert key != null; - boolean internal = e.isInternal() || !e.context().userCache(); - if (preload && !internal) return; @@ -179,8 +227,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean initialized = false; - boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE); - boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { if (preload && !lsnr.notifyExisting()) @@ -205,7 +252,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { evtType, key, newVal, - lsnr.oldValueRequired() ? oldVal : null); + lsnr.oldValueRequired() ? oldVal : null, + partId, + updateCntr, + topVer); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -250,12 +300,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { initialized = true; } - CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( - cctx.cacheId(), - EXPIRED, - key, - null, - lsnr.oldValueRequired() ? oldVal : null); + CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( + cctx.cacheId(), + EXPIRED, + key, + null, + lsnr.oldValueRequired() ? oldVal : null, + e.partition(), + -1, + null); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -373,6 +426,27 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param topVer Topology version. + */ + public void beforeExchange(AffinityTopologyVersion topVer) { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.flushBackupQueue(cctx.kernalContext(), topVer); + } + + /** + * Partition evicted callback. + * + * @param part Partition number. + */ + public void onPartitionEvicted(int part) { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.onPartitionEvicted(part); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.onPartitionEvicted(part); + } + + /** * @param locLsnr Local listener. * @param rmtFilter Remote filter. * @param bufSize Buffer size. @@ -417,7 +491,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { sync, ignoreExpired, taskNameHash, - skipPrimaryCheck); + skipPrimaryCheck, + cctx.isLocal()); IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); @@ -469,10 +544,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { GridCacheEntryEx e = it.next(); + CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( + cctx.cacheId(), + CREATED, + e.key(), + e.rawGet(), + null, + 0, + -1, + null); + next = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), - cctx, - new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null)); + cctx, entry); if (rmtFilter != null && !rmtFilter.evaluate(next)) next = null; @@ -590,10 +674,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheEntryEventFilter fltr = null; if (cfg.getCacheEntryEventFilterFactory() != null) { - fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create(); + fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create(); if (!(fltr instanceof Serializable)) - throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + fltr); + throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + + fltr); } CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types); @@ -637,6 +722,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param impl Listener. + * @param log Logger. */ JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) { assert impl != null; @@ -789,4 +875,29 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } } + + /** + * Task flash backup queue. + */ + private static final class BackupCleaner implements Runnable { + /** Listeners. */ + private final Map<UUID, CacheContinuousQueryListener> lsnrs; + + /** Context. */ + private final GridKernalContext ctx; + + /** + * @param lsnrs Listeners. + */ + public BackupCleaner(Map<UUID, CacheContinuousQueryListener> lsnrs, GridKernalContext ctx) { + this.lsnrs = lsnrs; + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void run() { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.acknowledgeBackupOnTimeout(ctx); + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 23f83be..ff413d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -181,6 +181,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { */ private byte flags; + /** Partition update counter. */ + @GridDirectTransient + private long partUpdateCntr; + /** */ private GridCacheVersion serReadVer; @@ -373,6 +377,22 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** + * Sets partition counter. + * + * @param partCntr Partition counter. + */ + public void updateCounter(long partCntr) { + this.partUpdateCntr = partCntr; + } + + /** + * @return Partition index. + */ + public long updateCounter() { + return partUpdateCntr; + } + + /** * @param val Value to set. */ void setAndMarkValid(CacheObject val) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index a9846ef..63a4cbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -964,6 +964,9 @@ public class IgniteTxHandler { // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); + tx.setPartitionUpdateCounters( + req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); + tx.commit(); } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index ecb0595..cff62d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1012,7 +1012,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); + + if (updRes.success()) + txEntry.updateCounter(updRes.updatePartitionCounter()); if (nearCached != null && updRes.success()) { nearCached.innerSet( @@ -1032,7 +1036,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); } } else if (op == DELETE) { @@ -1049,7 +1054,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); + + if (updRes.success()) + txEntry.updateCounter(updRes.updatePartitionCounter()); if (nearCached != null && updRes.success()) { nearCached.innerRemove( @@ -1065,7 +1074,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); } } else if (op == RELOAD) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index b80909f..8ceca3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -34,4 +34,9 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx { Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers); -} \ No newline at end of file + + /** + * @param cntrs Partition update indexes. + */ + public void setPartitionUpdateCounters(long[] cntrs); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java new file mode 100644 index 0000000..67b8c82 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.Collection; + +/** + * Continuous routine batch. + */ +public interface GridContinuousBatch { + /** + * Adds element to this batch. + * + * @param obj Element to add. + */ + public void add(Object obj); + + /** + * Collects elements that are currently in this batch. + * + * @return Elements in this batch. + */ + public Collection<Object> collect(); + + /** + * @return Current batch size. + */ + public int size(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java new file mode 100644 index 0000000..4540de1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.Collection; +import org.jsr166.ConcurrentLinkedDeque8; + +/** + * Continuous routine batch adapter. + */ +public class GridContinuousBatchAdapter implements GridContinuousBatch { + /** Buffer. */ + private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); + + /** {@inheritDoc} */ + @Override public void add(Object obj) { + assert obj != null; + + buf.add(obj); + } + + /** {@inheritDoc} */ + @Override public Collection<Object> collect() { + return buf; + } + + /** {@inheritDoc} */ + @Override public int size() { + return buf.sizex(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 30e596a..d8698b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.continuous; import java.io.Externalizable; import java.util.Collection; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; @@ -98,6 +99,22 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException; /** + * Creates new batch. + * + * @return New batch. + */ + public GridContinuousBatch createBatch(); + + /** + * Called when ack for a batch is received from client. + * + * @param routineId Routine ID. + * @param batch Acknowledged batch. + * @param ctx Kernal context. + */ + public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx); + + /** * @return Topic for ordered notifications. If {@code null}, notifications * will be sent in non-ordered messages. */ @@ -129,4 +146,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @return Cache name if this is a continuous query handler. */ public String cacheName(); + + /** + * @param cntrs Init state for partition counters. + */ + public void updateCounters(Map<Integer, Long> cntrs); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d1cb3a9..c07cc13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -52,17 +53,21 @@ import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; @@ -70,7 +75,6 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -203,8 +207,38 @@ public class GridContinuousProcessor extends GridProcessorAdapter { StartFuture fut = startFuts.remove(msg.routineId()); if (fut != null) { - if (msg.errs().isEmpty()) + if (msg.errs().isEmpty()) { + LocalRoutineInfo routine = locInfos.get(msg.routineId()); + + if (routine != null) { + try { + Map<Integer, Long> cntrs = msg.updateCounters(); + + GridCacheAdapter<Object, Object> interCache = + ctx.cache().internalCache(routine.handler().cacheName()); + + if (interCache != null && cntrs != null && interCache.context() != null + && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { + Map<Integer, Long> map = interCache.context().topology().updateCounters(); + + for (Map.Entry<Integer, Long> e : map.entrySet()) { + Long cntr0 = cntrs.get(e.getKey()); + Long cntr1 = e.getValue(); + + if (cntr0 == null || cntr1 > cntr0) + cntrs.put(e.getKey(), cntr1); + } + } + } + catch (Exception e) { + U.warn(log, "Failed to load update counters.", e); + } + + routine.handler().updateCounters(msg.updateCounters()); + } + fut.onRemoteRegistered(); + } else { IgniteCheckedException firstEx = F.first(msg.errs().values()); @@ -651,6 +685,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId ID of the node that started routine. * @param routineId Routine ID. + * @param objs Notification objects. + * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + public void addBackupNotification(UUID nodeId, + final UUID routineId, + Collection<?> objs, + @Nullable Object orderedTopic) + throws IgniteCheckedException { + if (processorStopped) + return; + + final RemoteRoutineInfo info = rmtInfos.get(routineId); + + if (info != null) { + final GridContinuousBatch batch = info.addAll(objs); + + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null); + } + } + + /** + * @param nodeId ID of the node that started routine. + * @param routineId Routine ID. * @param obj Notification object. * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. * @param sync If {@code true} then waits for event acknowledgment. @@ -658,7 +716,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ public void addNotification(UUID nodeId, - UUID routineId, + final UUID routineId, @Nullable Object obj, @Nullable Object orderedTopic, boolean sync, @@ -673,7 +731,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (processorStopped) return; - RemoteRoutineInfo info = rmtInfos.get(routineId); + final RemoteRoutineInfo info = rmtInfos.get(routineId); if (info != null) { assert info.interval == 0 || !sync; @@ -686,7 +744,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { syncMsgFuts.put(futId, fut); try { - sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg); + sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null); } catch (IgniteCheckedException e) { syncMsgFuts.remove(futId); @@ -697,10 +755,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { fut.get(); } else { - Collection<Object> toSnd = info.add(obj); + final GridContinuousBatch batch = info.add(obj); + + if (batch != null) { + CI1<IgniteException> ackC = new CI1<IgniteException>() { + @Override public void apply(IgniteException e) { + if (e == null) + info.hnd.onBatchAcknowledged(routineId, batch, ctx); + } + }; - if (toSnd != null) - sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg); + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC); + } } } } @@ -725,6 +791,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. * @param msg If {@code true} then sent data is collection of messages. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ private void sendNotification(UUID nodeId, @@ -732,7 +799,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Nullable IgniteUuid futId, Collection<Object> toSnd, @Nullable Object orderedTopic, - boolean msg) throws IgniteCheckedException { + boolean msg, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; assert toSnd != null; @@ -740,7 +808,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg), - orderedTopic); + orderedTopic, + ackC); } /** @@ -819,6 +888,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } + try { + if (ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) { + Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName()) + .context().topology().updateCounters(); + + req.addUpdateCounters(cntrs); + } + } + catch (Exception e) { + U.warn(log, "Failed to load partition counters."); + } + if (err != null) req.addError(ctx.localNodeId(), err); @@ -859,6 +940,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false), + null, null); } catch (IgniteCheckedException e) { @@ -922,15 +1004,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { break; } - IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval(); + IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval(); - Collection<Object> toSnd = t.get1(); + final GridContinuousBatch batch = t.get1(); - if (toSnd != null && !toSnd.isEmpty()) { + if (batch != null && batch.size() > 0) { try { + Collection<Object> toSnd = batch.collect(); + boolean msg = toSnd.iterator().next() instanceof Message; - sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg); + CI1<IgniteException> ackC = new CI1<IgniteException>() { + @Override public void apply(IgniteException e) { + if (e == null) + info.hnd.onBatchAcknowledged(routineId, batch, ctx); + } + }; + + sendNotification(nodeId, + routineId, + null, + toSnd, + hnd.orderedTopic(), + msg, + ackC); } catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) @@ -1013,9 +1110,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ - private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic) + private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert nodeId != null; assert msg != null; @@ -1023,7 +1122,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ClusterNode node = ctx.discovery().node(nodeId); if (node != null) - sendWithRetries(node, msg, orderedTopic); + sendWithRetries(node, msg, orderedTopic, ackC); else throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); } @@ -1033,14 +1132,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ - private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic) - throws IgniteCheckedException { + private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert node != null; assert msg != null; - sendWithRetries(F.asList(node), msg, orderedTopic); + sendWithRetries(F.asList(node), msg, orderedTopic, ackC); } /** @@ -1048,10 +1148,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, - @Nullable Object orderedTopic) throws IgniteCheckedException { + @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert !F.isEmpty(nodes); assert msg != null; @@ -1074,10 +1175,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { msg, SYSTEM_POOL, 0, - true); + true, + ackC); } else - ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); + ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC); break; } @@ -1178,8 +1280,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Lock. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); - /** Buffer. */ - private ConcurrentLinkedDeque8<Object> buf; + /** Batch. */ + private GridContinuousBatch batch; /** Last send time. */ private long lastSndTime = U.currentTimeMillis(); @@ -1210,7 +1312,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { this.interval = interval; this.autoUnsubscribe = autoUnsubscribe; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); } /** @@ -1238,21 +1340,53 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param objs Objects to add. + * @return Batch to send. + */ + GridContinuousBatch addAll(Collection<?> objs) { + assert objs != null; + assert objs.size() > 0; + + GridContinuousBatch toSnd = null; + + lock.writeLock().lock(); + + try { + for (Object obj : objs) + batch.add(obj); + + toSnd = batch; + + batch = hnd.createBatch(); + + if (interval > 0) + lastSndTime = U.currentTimeMillis(); + } + finally { + lock.writeLock().unlock(); + } + + return toSnd; + } + + /** * @param obj Object to add. - * @return Object to send or {@code null} if there is nothing to send for now. + * @return Batch to send or {@code null} if there is nothing to send for now. */ - @Nullable Collection<Object> add(@Nullable Object obj) { - ConcurrentLinkedDeque8 buf0 = null; + @Nullable GridContinuousBatch add(Object obj) { + assert obj != null; + + GridContinuousBatch toSnd = null; - if (buf.sizex() >= bufSize - 1) { + if (batch.size() >= bufSize - 1) { lock.writeLock().lock(); try { - buf.add(obj); + batch.add(obj); - buf0 = buf; + toSnd = batch; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); if (interval > 0) lastSndTime = U.currentTimeMillis(); @@ -1265,34 +1399,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter { lock.readLock().lock(); try { - buf.add(obj); + batch.add(obj); } finally { lock.readLock().unlock(); } } - Collection<Object> toSnd = null; - - if (buf0 != null) { - toSnd = new ArrayList<>(buf0.sizex()); - - for (Object o : buf0) - toSnd.add(o); - } - return toSnd; } /** - * @return Tuple with objects to sleep (or {@code null} if there is nothing to + * @return Tuple with batch to send (or {@code null} if there is nothing to * send for now) and time interval after next check is needed. */ @SuppressWarnings("TooBroadScope") - IgniteBiTuple<Collection<Object>, Long> checkInterval() { + IgniteBiTuple<GridContinuousBatch, Long> checkInterval() { assert interval > 0; - Collection<Object> toSnd = null; + GridContinuousBatch toSnd = null; long diff; long now = U.currentTimeMillis(); @@ -1302,10 +1427,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { diff = now - lastSndTime; - if (diff >= interval && !buf.isEmpty()) { - toSnd = buf; + if (diff >= interval && batch.size() > 0) { + toSnd = batch; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); lastSndTime = now; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index bd4aae3..9644372 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -35,14 +35,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private final Map<UUID, IgniteCheckedException> errs; + /** */ + private final Map<Integer, Long> updateCntrs; + /** * @param routineId Routine id. * @param errs Errs. */ - public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) { + public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs, + Map<Integer, Long> cntrs) { super(routineId); this.errs = new HashMap<>(errs); + this.updateCntrs = cntrs; } /** {@inheritDoc} */ @@ -51,6 +56,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { } /** + * @return Update counters for partitions. + */ + public Map<Integer, Long> updateCounters() { + return updateCntrs; + } + + /** * @return Errs. */ public Map<UUID, IgniteCheckedException> errs() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 892adac..82c0377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -37,6 +37,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private final Map<UUID, IgniteCheckedException> errs = new HashMap<>(); + /** */ + private Map<Integer, Long> updateCntrs; + /** * @param routineId Routine id. * @param startReqData Start request data. @@ -63,6 +66,22 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { } /** + * @param cntrs Update counters. + */ + public void addUpdateCounters(Map<Integer, Long> cntrs) { + if (updateCntrs == null) + updateCntrs = new HashMap<>(); + + for (Map.Entry<Integer, Long> e : cntrs.entrySet()) { + Long cntr0 = updateCntrs.get(e.getKey()); + Long cntr1 = e.getValue(); + + if (cntr0 == null || cntr1 > cntr0) + updateCntrs.put(e.getKey(), cntr1); + } + } + + /** * @return Errs. */ public Map<UUID, IgniteCheckedException> errs() { @@ -76,7 +95,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { - return new StartRoutineAckDiscoveryMessage(routineId, errs); + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index b93acf5..97696bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -479,7 +479,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Nullable GridCacheVersion drVer, UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer) + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr) throws IgniteCheckedException, GridCacheEntryRemovedException { return new GridCacheUpdateTxResult(true, rawPut(val, ttl)); } @@ -529,7 +530,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean conflictResolve, boolean intercept, UUID subjId, - String taskName) throws IgniteCheckedException, + String taskName, + @Nullable CacheObject prevVal, + @Nullable Long updateCntr) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; @@ -550,7 +553,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Nullable GridCacheVersion drVer, UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException { obsoleteVer = ver;