Merge branch 'master' into ignite-2004
Conflicts:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcf0c95f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcf0c95f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcf0c95f
Branch: refs/heads/ignite-2004
Commit: fcf0c95f17ae227c1a95aa45ec65e1070fe3aac6
Parents: 3e449b0 f970c11
Author: nikolay_tikhonov <[email protected]>
Authored: Thu Apr 7 17:38:16 2016 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Thu Apr 7 17:38:39 2016 +0300
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 4 +
.../discovery/GridDiscoveryManager.java | 2 +-
.../affinity/GridAffinityAssignment.java | 15 +
.../affinity/GridAffinityAssignmentCache.java | 72 +++-
.../cache/CacheAffinitySharedManager.java | 10 -
.../cache/GridCacheAffinityManager.java | 11 -
.../processors/cache/GridCacheMapEntry.java | 7 +-
.../GridCachePartitionExchangeManager.java | 16 -
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../continuous/CacheContinuousQueryHandler.java | 111 +++--
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../continuous/GridContinuousProcessor.java | 13 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 46 +--
.../spi/discovery/tcp/TcpDiscoverySpi.java | 23 +-
.../affinity/AffinityHistoryCleanupTest.java | 414 +++++++++++++++++++
.../CacheJdbcPojoStoreAbstractSelfTest.java | 28 +-
...eJdbcStoreAbstractMultithreadedSelfTest.java | 25 +-
.../ignite/cache/store/jdbc/model/Person.java | 25 ++
.../IgniteClientReconnectAbstractTest.java | 7 +-
.../BinaryObjectOffHeapUnswapTemporaryTest.java | 362 ++++++++++++++++
.../GridCacheBinaryObjectsAbstractSelfTest.java | 181 +++++---
...ffinityAssignmentNodeJoinValidationTest.java | 134 ++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 8 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 39 +-
.../TcpDiscoverySpiFailureTimeoutSelfTest.java | 23 +-
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 5 +-
.../IgniteBinaryObjectsTestSuite.java | 6 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
.../testsuites/IgniteCacheTestSuite5.java | 2 +
30 files changed, 1320 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcf0c95f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcf0c95f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4bf22e7,16513b0..56c02d6
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -547,112 -590,70 +547,99 @@@ public class CacheContinuousQueryHandle
final GridCacheContext cctx = cacheContext(ctx);
- final Collection<CacheContinuousQueryEntry> entries0 = new
ArrayList<>();
- Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new
ArrayList<>();
++ final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0
= new ArrayList<>();
- for (CacheContinuousQueryEntry e : entries) {
- GridCacheDeploymentManager depMgr = cctx.deploy();
+ final List<PartitionRecovery> rcvs = new ArrayList<>();
- ClassLoader ldr = depMgr.globalLoader();
+ try {
+ for (CacheContinuousQueryEntry e : entries) {
+ GridCacheDeploymentManager depMgr = cctx.deploy();
+
+ ClassLoader ldr = depMgr.globalLoader();
- if (ctx.config().isPeerClassLoadingEnabled()) {
- GridDeploymentInfo depInfo = e.deployInfo();
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ GridDeploymentInfo depInfo = e.deployInfo();
- if (depInfo != null) {
- depMgr.p2pContext(nodeId, depInfo.classLoaderId(),
depInfo.userVersion(), depInfo.deployMode(),
- depInfo.participants(),
depInfo.localDeploymentOwner());
+ if (depInfo != null) {
+ depMgr.p2pContext(nodeId, depInfo.classLoaderId(),
depInfo.userVersion(), depInfo.deployMode(),
+ depInfo.participants(),
depInfo.localDeploymentOwner());
+ }
}
- }
- try {
- e.unmarshal(cctx, ldr);
+ try {
+ e.unmarshal(cctx, ldr);
+
+ if (!asyncCallback) {
- T2<Collection<CacheContinuousQueryEntry>,
PartitionRecovery> evts = handleEvent(ctx, e, false);
++ T2<Collection<CacheEntryEvent<? extends K, ? extends
V>>, PartitionRecovery> evts =
++ handleEvent(ctx, e, false);
- entries0.addAll(handleEvent(ctx, e));
+ if (evts.get2() != null)
+ rcvs.add(evts.get2());
+
+ entries0.addAll(evts.get1());
+ }
+ }
+ catch (IgniteCheckedException ex) {
+ if (ignoreClsNotFound)
+ assert internal;
+ else
+ U.error(ctx.log(getClass()), "Failed to unmarshal
entry.", ex);
+ }
}
- catch (IgniteCheckedException ex) {
- if (ignoreClsNotFound)
- assert internal;
- else
- U.error(ctx.log(getClass()), "Failed to unmarshal
entry.", ex);
+
- final IgniteCache cache =
cctx.kernalContext().cache().jcache(cctx.name());
-
+ if (asyncCallback) {
+ for (final CacheContinuousQueryEntry e : entries) {
+ ctx.continuousQueryPool().execute(new Runnable() {
+ @Override public void run() {
- T2<Collection<CacheContinuousQueryEntry>,
PartitionRecovery> evts =
++ T2<Collection<CacheEntryEvent<? extends K, ?
extends V>>, PartitionRecovery> evts =
+ handleEvent(ctx, e, false);
+
- for (CacheContinuousQueryEntry entry :
evts.get1()) {
- CacheContinuousQueryEvent evt =
- new CacheContinuousQueryEvent<>(cache,
cctx, entry);
-
-
locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>
- singleton(evt));
- }
++ locLsnr.onUpdated(evts.get1());
+ }
+ }, e.partition());
+ }
}
- else if (!entries0.isEmpty()) {
- Iterable<CacheEntryEvent<? extends K, ? extends V>> evts =
F.viewReadOnly(entries0,
- new C1<CacheContinuousQueryEntry, CacheEntryEvent<?
extends K, ? extends V>>() {
- @Override public CacheEntryEvent<? extends K, ?
extends V> apply(CacheContinuousQueryEntry e) {
- return new CacheContinuousQueryEvent<>(cache,
cctx, e);
- }
- },
- new IgnitePredicate<CacheContinuousQueryEntry>() {
- @Override public boolean
apply(CacheContinuousQueryEntry entry) {
- return !entry.isFiltered();
- }
- }
- );
-
- locLsnr.onUpdated(evts);
- }
++ else if (!entries0.isEmpty())
++ locLsnr.onUpdated(entries0);
+ }
+ finally {
+ for (PartitionRecovery rec : rcvs)
+ rec.unlock();
}
-
- if (!entries0.isEmpty())
- locLsnr.onUpdated(entries0);
}
/**
* @param ctx Context.
* @param e entry.
+ * @param async Async.
* @return Entry collection.
*/
- private T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery>
handleEvent(GridKernalContext ctx,
- CacheContinuousQueryEntry e, boolean async) {
- private Collection<CacheEntryEvent<? extends K, ? extends V>>
handleEvent(GridKernalContext ctx,
- CacheContinuousQueryEntry e) {
++ private T2<Collection<CacheEntryEvent<? extends K, ? extends V>>,
PartitionRecovery>
++ handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e,
boolean async) {
assert e != null;
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ final IgniteCache cache =
cctx.kernalContext().cache().jcache(cctx.name());
+
if (internal) {
if (e.isFiltered())
- return Collections.emptyList();
+ return new T2(Collections.emptyList(), null);
else
- return new T2(F.asList(e), null);
- return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
- new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
++ return new T2(F.<CacheEntryEvent<? extends K, ? extends
V>>asList(
++ new CacheContinuousQueryEvent<K, V>(cache, cctx, e)),
null);
}
// Initial query entry or evicted entry. These events should be fired
immediately.
- if (e.updateCounter() == -1L)
- return new T2(F.asList(e), null);
+ if (e.updateCounter() == -1L) {
- return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ?
extends V>>asList(
- new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) :
- Collections.<CacheEntryEvent<? extends K, ? extends
V>>emptyList();
++ return !e.isFiltered() ? new T2(F.<CacheEntryEvent<? extends K, ?
extends V>>asList(
++ new CacheContinuousQueryEvent<K, V>(cache, cctx, e)),
null) :
++ new T2(Collections.<CacheEntryEvent<? extends K, ? extends
V>>emptyList(), null);
+ }
PartitionRecovery rec = getOrCreatePartitionRecovery(ctx,
e.partition());
- return new T2<>(rec.collectEntries(e, async), async ? null : rec);
- return rec.collectEntries(cctx, cache, e);
++ return new T2<>(rec.<K, V>collectEntries(e, cctx, async, cache), rec);
}
/**
@@@ -779,24 -776,26 +765,30 @@@
/**
* Add continuous entry.
*
+ * @param cctx Cache context.
+ * @param cache Cache.
* @param entry Cache continuous query entry.
- * @return Collection entries which will be fired.
+ * @return Collection entries which will be fired. This collection
should contains only non-filtered events.
*/
- public Collection<CacheContinuousQueryEntry>
collectEntries(CacheContinuousQueryEntry entry, boolean async) {
- public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>>
collectEntries(GridCacheContext cctx,
- IgniteCache cache,
- CacheContinuousQueryEntry entry) {
- assert entry != null;
++ <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>>
collectEntries(CacheContinuousQueryEntry entry,
++ GridCacheContext cctx,
++ boolean async,
++ IgniteCache cache) {
+ if (!async)
+ lock.lock();
- if (entry.topologyVersion() == null) { // Possible if entry is
sent from old node.
- assert entry.updateCounter() == 0L : entry;
+ try {
+ assert entry != null;
- return F.<CacheEntryEvent<? extends K, ? extends V>>
- asList(new CacheContinuousQueryEvent<K, V>(cache, cctx,
entry));
- }
+ if (entry.topologyVersion() == null) { // Possible if entry
is sent from old node.
+ assert entry.updateCounter() == 0L : entry;
- return F.asList(entry);
- List<CacheEntryEvent<? extends K, ? extends V>> entries;
++ return F.<CacheEntryEvent<? extends K, ? extends V>>
++ asList(new CacheContinuousQueryEvent<K, V>(cache,
cctx, entry));
+ }
+
- List<CacheContinuousQueryEntry> entries;
++ List<CacheEntryEvent<? extends K, ? extends V>> entries;
- synchronized (pendingEvts) {
// Received first event.
if (curTop == AffinityTopologyVersion.NONE) {
lastFiredEvt = entry.updateCounter();
@@@ -1251,276 -1241,6 +1247,261 @@@
}
/**
+ *
+ */
+ private class ContinuousQueryClosureImpl implements
CacheContinuousQueryClosure {
+ /** */
+ private final IgniteCache cache;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final boolean fireEvent;
+
+ /** */
+ private CacheContinuousQueryEvent<K, V> evt;
+
+ /** */
+ private CacheEntryEventFilter filter;
+
+ /** */
+ private final GridCacheContext<K, V> cctx;
+
+ /** */
+ private boolean primary;
+
+ /** */
+ private boolean loc;
+
+ /** */
+ private GridKernalContext ctx;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private UUID routineId;
+
+ /** */
+ private boolean recordIgniteEvt;
+
+ /** */
+ private final String taskName;
+
+ /** */
+ private boolean notify;
+
+ /** */
+ private boolean backup;
+
+ /** */
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ /**
+ * @param taskName Task name.
+ * @param recordIgniteEvt Fired event.
+ * @param routineId Routine id.
+ * @param nodeId Node id.
+ * @param ctx Kernal context.
+ * @param loc Local.
+ * @param primary Primary flag.
+ * @param cctx Cache context.
+ * @param filter Filter.
+ * @param evt Event.
+ * @param fireEvent Immediately fire event.
+ * @param cache Cache.
+ */
+ ContinuousQueryClosureImpl(String taskName,
+ boolean recordIgniteEvt,
+ UUID routineId,
+ UUID nodeId,
+ GridKernalContext ctx,
+ boolean loc,
+ boolean primary,
+ GridCacheContext<K, V> cctx,
+ CacheEntryEventFilter filter,
+ CacheContinuousQueryEvent<K, V> evt,
+ boolean fireEvent, IgniteCache cache) {
+ this.taskName = taskName;
+ this.recordIgniteEvt = recordIgniteEvt;
+ this.routineId = routineId;
+ this.nodeId = nodeId;
+ this.ctx = ctx;
+ this.loc = loc;
+ this.primary = primary;
+ this.cctx = cctx;
+ this.filter = filter;
+ this.evt = evt;
+ this.cache = cache;
+ this.fireEvent = fireEvent;
+
+ log = ctx.log(CacheContinuousQueryHandler.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ filter();
+
+ if (fireEvent || waitIfAsync())
+ onEntryUpdate0();
+ }
+
+ /**
+ * @return {@code True} if event fired on this node.
+ */
+ private boolean primary() {
+ return primary || skipPrimaryCheck;
+ }
+
+ /**
+ * @return {@code False} if filter sync.
+ */
+ private boolean waitIfAsync() {
+ if (backup)
+ return false;
+
+ try {
+ U.await(latch);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ log.error("Failed to wait latch.");
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void skipEvent() {
+ if (evt != null && evt.entry() != null)
+ evt.entry().markFiltered();
+
+ onEntryUpdate();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEntryUpdate() {
+ if (backup)
+ return;
+
+ if (!fireEvent && asyncCallback) {
+ latch.countDown();
+
+ return;
+ }
+
+ onEntryUpdate0();
+ }
+
+ /**
+ *
+ */
+ private void onEntryUpdate0() {
+ try {
+ final CacheContinuousQueryEntry entry = evt.entry();
+
+ if (loc) {
+ if (!locCache) {
- T2<Collection<CacheContinuousQueryEntry>,
PartitionRecovery> events =
++ T2<Collection<CacheEntryEvent<? extends K, ? extends
V>>, PartitionRecovery> events =
+ handleEvent(ctx, entry, asyncCallback);
+
+ try {
- Collection<CacheContinuousQueryEntry> entries =
events.get1();
-
- if (!entries.isEmpty()) {
- Iterable<CacheEntryEvent<? extends K, ?
extends V>> evts = F.viewReadOnly(entries,
- new C1<CacheContinuousQueryEntry,
CacheEntryEvent<? extends K, ? extends V>>() {
- @Override public CacheEntryEvent<?
extends K, ? extends V> apply(
- CacheContinuousQueryEntry e) {
- return new
CacheContinuousQueryEvent<>(cache, cctx, e);
- }
- },
- new
IgnitePredicate<CacheContinuousQueryEntry>() {
- @Override public boolean
apply(CacheContinuousQueryEntry entry) {
- return !entry.isFiltered();
- }
- }
- );
-
- if (!F.isEmpty(evts))
- locLsnr.onUpdated(evts);
++ Collection<CacheEntryEvent<? extends K, ? extends
V>> evts = events.get1();
++
++ if (!evts.isEmpty()) {
++ locLsnr.onUpdated(evts);
+
+ if (!internal && !skipPrimaryCheck)
+
sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ }
+ }
+ finally {
+ if (events.get2() != null)
+ events.get2().unlock();
+ }
+ }
+ else {
+ if (!entry.isFiltered())
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K,
? extends V>>asList(evt));
+ }
+ }
+ else {
+ if (!entry.isFiltered())
+ prepareEntry(cctx, nodeId, entry);
+
+ CacheContinuousQueryEntry e = handleEntry(entry);
+
+ if (e != null)
+ ctx.continuous().addNotification(nodeId, routineId,
entry, topic, sync, true);
+ }
+ }
+ catch (ClusterTopologyCheckedException ex) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node,
node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event
notification to node: " + nodeId, ex);
+ }
+
+ if (recordIgniteEvt && notify) {
+ ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.discovery().localNode(),
+ "Continuous query executed.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.CONTINUOUS.name(),
+ cacheName,
+ null,
+ null,
+ null,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
+ null,
+ nodeId,
+ taskName,
+ evt.getKey(),
+ evt.getValue(),
+ evt.getOldValue(),
+ null
+ ));
+ }
+ }
+
+ /**
+ *
+ */
+ public void filter() {
+ CacheContinuousQueryEntry entry = evt.entry();
+
+ notify = !entry.isFiltered();
+
+ try {
+ if (notify && filter != null)
+ notify = filter.evaluate(evt);
+ }
+ catch (Exception e) {
+ U.error(log, "CacheEntryEventFilter failed: " + e);
+ }
+
+ if (!notify)
+ entry.markFiltered();
+
+ if (!primary()) {
+ if (!internal) {
+ // Skip init query and expire entries.
+ if (entry.updateCounter() != -1L) {
+ entry.markBackup();
+
+ backupQueue.add(entry);
+ }
+ }
+
+ backup = true;
+ }
+ }
+ }
+
+ /**
* Deployable object.
*/
protected static class DeployableObject implements Externalizable {