ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5c04ecf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5c04ecf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5c04ecf Branch: refs/heads/ignite-5075 Commit: a5c04ecf1220070379220225d6b43e9d68169694 Parents: 14dba6e Author: sboikov <sboi...@gridgain.com> Authored: Wed May 3 12:09:01 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed May 3 21:35:03 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 49 ++++++++-------- .../cache/DynamicCacheDescriptor.java | 62 ++------------------ .../processors/cache/GridCacheProcessor.java | 9 +-- .../GridCacheAbstractLocalStoreSelfTest.java | 41 +++++++++---- .../IgniteCrossCacheTxStoreSelfTest.java | 13 +++- 5 files changed, 71 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a5c04ecf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 4d53361..0970b55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -540,15 +540,32 @@ class ClusterCachesInfo { locJoinStartCaches = new ArrayList<>(); if (!disconnectedState()) { - processJoiningNode(joinDiscoData, node.id()); + processJoiningNode(joinDiscoData, node.id(), topVer); for (DynamicCacheDescriptor desc : registeredCaches.values()) { CacheConfiguration cfg = desc.cacheConfiguration(); CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName()); - NearCacheConfiguration nearCfg = locCfg != null ? locCfg.config().getNearConfiguration() : - null; + NearCacheConfiguration nearCfg = null; + + if (locCfg != null) { + DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, + locCfg.config(), + desc.cacheType(), + desc.template(), + desc.deploymentId(), + desc.schema()); + + desc0.startTopologyVersion(desc.startTopologyVersion()); + desc0.clientCacheStartVersion(desc.clientCacheStartVersion()); + desc0.receivedFrom(desc.receivedFrom()); + desc0.staticallyConfigured(desc.staticallyConfigured()); + + desc = desc0; + + nearCfg = locCfg.config().getNearConfiguration(); + } if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) locJoinStartCaches.add(new T2<>(desc, nearCfg)); @@ -559,26 +576,8 @@ class ClusterCachesInfo { CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id()); if (discoData != null) - processJoiningNode(discoData, node.id()); + processJoiningNode(discoData, node.id(), topVer); } - - initStartVersionOnJoin(registeredCaches.values(), node, topVer); - - initStartVersionOnJoin(registeredTemplates.values(), node, topVer); - } - } - - /** - * @param descs Cache descriptors. - * @param joinedNode Joined node. - * @param topVer Current topology version. - */ - private void initStartVersionOnJoin(Collection<DynamicCacheDescriptor> descs, - ClusterNode joinedNode, - AffinityTopologyVersion topVer) { - for (DynamicCacheDescriptor cacheDesc : descs) { - if (cacheDesc.staticallyConfigured() && joinedNode.id().equals(cacheDesc.receivedFrom())) - cacheDesc.startTopologyVersion(topVer); } } @@ -621,7 +620,7 @@ class ClusterCachesInfo { 0, desc.cacheType(), desc.startTopologyVersion(), - null, + desc.deploymentId(), desc.schema(), desc.receivedFrom(), desc.staticallyConfigured(), @@ -743,7 +742,7 @@ class ClusterCachesInfo { * @param joinData Joined node discovery data. * @param nodeId Joined node ID. */ - private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { + private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) { for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { CacheConfiguration cfg = cacheInfo.config(); @@ -757,6 +756,7 @@ class ClusterCachesInfo { desc.staticallyConfigured(true); desc.receivedFrom(nodeId); + desc.startTopologyVersion(topVer); DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc); @@ -777,6 +777,7 @@ class ClusterCachesInfo { desc.staticallyConfigured(true); desc.receivedFrom(nodeId); + desc.startTopologyVersion(topVer); DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5c04ecf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 374bcb8..ccde2e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -47,9 +47,6 @@ public class DynamicCacheDescriptor { @GridToStringExclude private CacheConfiguration cacheCfg; - /** Locally configured flag. */ - private boolean locCfg; - /** Statically configured flag. */ private boolean staticCfg; @@ -69,17 +66,11 @@ public class DynamicCacheDescriptor { private AffinityTopologyVersion startTopVer; /** */ - private boolean rcvdOnDiscovery; - - /** */ private Integer cacheId; /** */ private UUID rcvdFrom; - /** */ - private AffinityTopologyVersion rcvdFromVer; - /** Mutex. */ private final Object mux = new Object(); @@ -147,6 +138,8 @@ public class DynamicCacheDescriptor { * @param startTopVer Start topology version. */ public void startTopologyVersion(AffinityTopologyVersion startTopVer) { + assert startTopVer != null; + this.startTopVer = startTopVer; } @@ -172,27 +165,6 @@ public class DynamicCacheDescriptor { } /** - * @param deploymentId Deployment ID. - */ - public void deploymentId(IgniteUuid deploymentId) { - this.deploymentId = deploymentId; - } - - /** - * @return Locally configured flag. - */ - public boolean locallyConfigured() { - return locCfg; - } - - /** - * @param locCfg Locally configured flag. - */ - public void locallyConfigured(boolean locCfg) { - this.locCfg = locCfg; - } - - /** * @return {@code True} if statically configured. */ public boolean staticallyConfigured() { @@ -260,38 +232,12 @@ public class DynamicCacheDescriptor { } /** - * @return {@code True} if received in discovery data. - */ - public boolean receivedOnDiscovery() { - return rcvdOnDiscovery; - } - - /** - * @param rcvdOnDiscovery {@code True} if received in discovery data. - */ - public void receivedOnDiscovery(boolean rcvdOnDiscovery) { - this.rcvdOnDiscovery = rcvdOnDiscovery; - } - - /** * @param nodeId ID of node provided cache configuration in discovery data. */ public void receivedFrom(UUID nodeId) { - rcvdFrom = nodeId; - } + assert nodeId != null; - /** - * @return Topology version when node provided cache configuration was started. - */ - @Nullable public AffinityTopologyVersion receivedFromStartVersion() { - return rcvdFromVer; - } - - /** - * @param rcvdFromVer Topology version when node provided cache configuration was started. - */ - public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) { - this.rcvdFromVer = rcvdFromVer; + rcvdFrom = nodeId; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a5c04ecf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 2fabcc5..a255e9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -793,10 +793,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration c = desc.cacheConfiguration(); IgnitePredicate filter = c.getNodeFilter(); - if (c.getName().equals(conf.getName()) && - ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) || - CU.isSystemCache(c.getName()))) { - + // TODO IGNITE-5075. + if (c.getName().equals(conf.getName()) && ((CU.affinityNode(locNode, filter)) || CU.isSystemCache(c.getName()))) { tmpCacheCfg.add(c); break; @@ -837,9 +835,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!activeOnStart) return; - for (GridCacheAdapter<?, ?> cache : caches.values()) - onKernalStart(cache); - if (!ctx.config().isDaemon()) ctx.cacheObjects().onUtilityCacheStarted(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5c04ecf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java index 7dce36b..ae9986d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; +import javax.cache.configuration.Factory; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; import javax.cache.integration.CacheLoaderException; @@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -169,18 +171,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst cacheCfg.setRebalanceMode(SYNC); - if (igniteInstanceName.endsWith("1")) - cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_1)); - else if (igniteInstanceName.endsWith("2")) - cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_2)); - else if (igniteInstanceName.endsWith("3")) - cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_3)); - else if (igniteInstanceName.endsWith("4")) - cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_4)); - else if (igniteInstanceName.endsWith("5")) - cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_5)); - else - cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_6)); + cacheCfg.setCacheStoreFactory(new StoreFactory()); cacheCfg.setWriteThrough(true); cacheCfg.setReadThrough(true); @@ -840,4 +831,30 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst map.clear(); } } + + /** + * + */ + static class StoreFactory implements Factory<CacheStore> { + /** */ + @IgniteInstanceResource + private Ignite node; + + @Override public CacheStore create() { + String igniteInstanceName = node.configuration().getIgniteInstanceName(); + + if (igniteInstanceName.endsWith("1")) + return LOCAL_STORE_1; + else if (igniteInstanceName.endsWith("2")) + return LOCAL_STORE_2; + else if (igniteInstanceName.endsWith("3")) + return LOCAL_STORE_3; + else if (igniteInstanceName.endsWith("4")) + return LOCAL_STORE_4; + else if (igniteInstanceName.endsWith("5")) + return LOCAL_STORE_5; + else + return LOCAL_STORE_6; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a5c04ecf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index 4864a67..a7682a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -26,6 +26,8 @@ import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; + +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSession; @@ -35,6 +37,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; @@ -357,9 +360,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { * */ private static class FirstStoreFactory implements Factory<CacheStore> { + @IgniteInstanceResource + private Ignite ignite; + /** {@inheritDoc} */ @Override public CacheStore create() { - String igniteInstanceName = startingIgniteInstanceName.get(); + String igniteInstanceName = ignite.name(); CacheStore store = firstStores.get(igniteInstanceName); @@ -374,9 +380,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { * */ private static class SecondStoreFactory implements Factory<CacheStore> { + @IgniteInstanceResource + private Ignite ignite; + /** {@inheritDoc} */ @Override public CacheStore create() { - String igniteInstanceName = startingIgniteInstanceName.get(); + String igniteInstanceName = ignite.name(); CacheStore store = secondStores.get(igniteInstanceName);