Repository: incubator-ignite Updated Branches: refs/heads/ignite-500 [created] ed1678f13
ignite-500 Cache not loading correctly Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed1678f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed1678f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed1678f1 Branch: refs/heads/ignite-500 Commit: ed1678f139c50de76d72d512783df308a0f9a83f Parents: b4b28fd Author: agura <[email protected]> Authored: Fri Apr 24 20:58:01 2015 +0300 Committer: agura <[email protected]> Committed: Fri Apr 24 20:58:01 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 61 ++++++++++++-- .../distributed/dht/GridDhtCacheAdapter.java | 9 ++- .../datastreamer/DataStreamerImpl.java | 4 +- ...GridCacheLoadingConcurrentGridStartTest.java | 83 ++++++++++++++------ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 +- 5 files changed, 127 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index c246661..f5a88bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3312,6 +3312,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ttl == CU.TTL_ZERO) return; +/* + if (!topVer.equals(ctx.topology().topologyVersion())) + throw new ClusterTopologyException("Topology changed"); +*/ + loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>) p, topVer, replicate, ttl); } }, args); @@ -3531,17 +3536,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); - ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); CacheOperationContext opCtx = ctx.operationContextPerCall(); ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; - return ctx.kernalContext().closure().callAsync(BROADCAST, - Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), - nodes.nodes()); + final LoadCacheClosure<K, V> loadClos = new LoadCacheClosure<>(ctx.name(), p, args, plc); + + return new GlobalLoadCacheFuture(loadClos, ctx); } /** @@ -5697,4 +5700,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } + + /** + * Global load cache future. + */ + private static class GlobalLoadCacheFuture<K1, V1> extends GridFutureAdapter<Void> { + /** Load clos. */ + private final LoadCacheClosure<K1, V1> loadClos; + + /** Context. */ + private final GridCacheContext<K1, V1> ctx; + + /** + * @param loadClos Load cache closure. + * @param ctx Context. + */ + public GlobalLoadCacheFuture(LoadCacheClosure<K1, V1> loadClos, GridCacheContext<K1, V1> ctx) { + this.loadClos = loadClos; + this.ctx = ctx; + + init(); + } + + /** + * Inits future. + */ + private void init() { + ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); + + ComputeTaskInternalFuture<Collection<Void>> loadFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(loadClos), nodes.nodes()); + + loadFut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Void>>>() { + @Override public void apply(IgniteInternalFuture<Collection<Void>> fut) { + try { + fut.get(); + + onDone(); + } + catch (Exception e) { + if (e.getCause().getCause() instanceof GridDhtInvalidPartitionException) + init(); + else + onDone(null, fut.error()); + } + } + }); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 1c46fd0..10df868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -410,6 +410,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) { assert ver == null; +/* + if (!topVer.equals(topology().topologyVersion())) + throw new ClusterTopologyException("Topology changed"); +*/ + loadEntry(key, val, ver0, p, topVer, replicate, plc); } }, args); @@ -476,12 +481,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } } else if (log.isDebugEnabled()) - log.debug("Will node load entry into cache (partition is invalid): " + part); + log.debug("Will node load entry into cache (partition is invalid): " + part); } catch (GridDhtInvalidPartitionException e) { if (log.isDebugEnabled()) log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val + ", err=" + e + ']'); + + throw e; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a69e033..e113c0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1421,8 +1421,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed cctx.evicts().touch(entry, topVer); } - catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { - // No-op. + catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored){ + ignored.printStackTrace(); } catch (IgniteCheckedException ex) { IgniteLogger log = cache.unwrap(Ignite.class).log(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java index 2f9bb96..064047c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java @@ -18,21 +18,25 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; import javax.cache.integration.*; +import java.io.*; import java.util.concurrent.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; /** * Tests for cache data loading during simultaneous grids start. @@ -42,7 +46,7 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT private static int GRIDS_CNT = 5; /** Keys count */ - private static int KEYS_CNT = 1_000_000; + private static int KEYS_CNT = 100_000; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -53,26 +57,13 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); - CacheStore<Integer, String> store = new CacheStoreAdapter<Integer, String>() { - @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object... args) { - for (int i = 0; i < KEYS_CNT; i++) - f.apply(i, Integer.toString(i)); - } + ccfg.setRebalanceMode(SYNC); - @Nullable @Override public String load(Integer i) throws CacheLoaderException { - return null; - } + ccfg.setBackups(1); - @Override public void write(Cache.Entry<? extends Integer, ? extends String> entry) throws CacheWriterException { - // No-op. - } - - @Override public void delete(Object o) throws CacheWriterException { - // No-op. - } - }; + CacheStore<Integer, String> store = new TestCacheStoreAdapter(); ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); @@ -92,9 +83,22 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT public void testLoadCacheWithDataStreamer() throws Exception { IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() { @Override public void apply(Ignite grid) { + try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) { +// dataStreamer.perNodeParallelOperations(2); + dataStreamer.perNodeBufferSize(64); + for (int i = 0; i < KEYS_CNT; i++) dataStreamer.addData(i, Integer.toString(i)); + +/* + try { + Thread.sleep(5000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } +*/ } } }; @@ -142,13 +146,46 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT private void assertCacheSize() { IgniteCache<Integer, String> cache = grid(0).cache(null); - assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY)); - int total = 0; - for (int i = 0; i < GRIDS_CNT; i++) - total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY); + for (int i = 0; i < GRIDS_CNT; i++) { + int locSize = grid(i).cache(null).localSize(); + + System.out.println("!!! Local size(" + i + "): " + locSize); + + total += locSize; + + } assertEquals(KEYS_CNT, total); + + assertEquals(KEYS_CNT, cache.size()); + } + + /** + * Cache store adapter. + */ + private static class TestCacheStoreAdapter extends CacheStoreAdapter<Integer, String> implements Serializable { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object... args) { + for (int i = 0; i < KEYS_CNT; i++) + f.apply(i, Integer.toString(i)); + } + + /** {@inheritDoc} */ + @Nullable @Override public String load(Integer i) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends String> entry) + throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object o) throws CacheWriterException { + // No-op. + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 6e70052..6f954cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -224,7 +224,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCacheDhtPreloadUnloadSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAffinityFilterSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class)); -// suite.addTest(new TestSuite(GridCacheLoadingConcurrentGridStartTest.class)); TODO-ignite-500 + suite.addTest(new TestSuite(GridCacheLoadingConcurrentGridStartTest.class)); suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class)); suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedLoadCacheSelfTest.class));
