ignite-5454 Prevent partition tree destroy while clearing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0c62ac0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0c62ac0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0c62ac0 Branch: refs/heads/ignite-2.1.2-exchange Commit: f0c62ac0fc77b4dbc438dbd23ce756dbebc0659c Parents: 285644a Author: Igor Seliverstov <[email protected]> Authored: Mon Jun 26 15:43:17 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Jun 26 15:43:17 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 129 ++++++++++++++++--- .../IgniteCacheClearDuringRebalanceTest.java | 126 ++++++++++++++++++ .../testsuites/IgniteCacheTestSuite2.java | 3 + 3 files changed, 242 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0c62ac0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index a4e4c24..b95828c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -427,26 +427,30 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Override public void clearCache(GridCacheContext cctx, boolean readers) { GridCacheVersion obsoleteVer = null; - GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator()); + try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ? iterator(cctx.cacheId(), cacheDataStores().iterator()) : + evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) { + while (it.hasNext()) { + KeyCacheObject key = it.next().key(); - while (it.hasNext()) { - KeyCacheObject key = it.next().key(); - - try { - if (obsoleteVer == null) - obsoleteVer = ctx.versions().next(); + try { + if (obsoleteVer == null) + obsoleteVer = ctx.versions().next(); - GridCacheEntryEx entry = cctx.cache().entryEx(key); + GridCacheEntryEx entry = cctx.cache().entryEx(key); - entry.clear(obsoleteVer, readers); - } - catch (GridDhtInvalidPartitionException ignore) { - // Ignore. - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to clear cache entry: " + key, e); + entry.clear(obsoleteVer, readers); + } + catch (GridDhtInvalidPartitionException ignore) { + // Ignore. + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry: " + key, e); + } } } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close iterator", e); + } } /** {@inheritDoc} */ @@ -587,7 +591,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param dataIt Data store iterator. * @return Rows iterator */ - private GridIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) { + private GridCloseableIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) { return new GridCloseableIteratorAdapter<CacheDataRow>() { /** */ private GridCursor<? extends CacheDataRow> cur; @@ -638,6 +642,99 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** + * @param cacheId Cache ID. + * @param dataIt Data store iterator. + * @return Rows iterator + */ + private GridCloseableIterator<CacheDataRow> evictionSafeIterator(final int cacheId, final Iterator<CacheDataStore> dataIt) { + return new GridCloseableIteratorAdapter<CacheDataRow>() { + /** */ + private GridCursor<? extends CacheDataRow> cur; + + /** */ + private GridDhtLocalPartition curPart; + + /** */ + private CacheDataRow next; + + @Override protected CacheDataRow onNext() { + CacheDataRow res = next; + + next = null; + + return res; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (next != null) + return true; + + while (true) { + if (cur == null) { + if (dataIt.hasNext()) { + CacheDataStore ds = dataIt.next(); + + if (!reservePartition(ds.partId())) + continue; + + cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); + } + else + break; + } + + if (cur.next()) { + next = cur.get(); + next.key().partition(curPart.id()); + + break; + } + else { + cur = null; + + releaseCurrentPartition(); + } + } + + return next != null; + } + + /** */ + private void releaseCurrentPartition() { + GridDhtLocalPartition p = curPart; + + assert p != null; + + curPart = null; + + p.release(); + } + + /** + * @param partId Partition number. + * @return {@code True} if partition was reserved. + */ + private boolean reservePartition(int partId) { + GridDhtLocalPartition p = grp.topology().localPartition(partId); + + if (p != null && p.reserve()) { + curPart = p; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + if (curPart != null) + releaseCurrentPartition(); + } + }; + } + + /** * @param item Item. * @return Single item iterator. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f0c62ac0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java new file mode 100644 index 0000000..8561c5c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class IgniteCacheClearDuringRebalanceTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME) + .setCacheMode(PARTITIONED)); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testClearAll() throws Exception { + final IgniteEx node = startGrid(0); + + for (int i = 0; i < 5; i++) { + populate(node); + + try { + startGrid(1).cache(CACHE_NAME).clear(); + } + finally { + stopGrid(1); + } + } + + populate(node); + + startGrid(1).cache(CACHE_NAME).clear(); + + startGrid(2); + } + + /** + * @param node Ignite node; + * @throws Exception If failed. + */ + private void populate(final Ignite node) throws Exception { + final AtomicInteger id = new AtomicInteger(); + + final int tCnt = Runtime.getRuntime().availableProcessors(); + + final byte[] data = new byte[1024]; + + ThreadLocalRandom.current().nextBytes(data); + + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + try (IgniteDataStreamer<Object, Object> str = node.dataStreamer(CACHE_NAME)) { + int idx = id.getAndIncrement(); + + str.autoFlushFrequency(0); + + for (int i = idx; i < 500_000; i += tCnt) { + str.addData(i, data); + + if (i % (100 * tCnt) == idx) + str.flush(); + } + + str.flush(); + } + } + }, tCnt, "ldr"); + + assertEquals(500_000, node.cache(CACHE_NAME).size()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0c62ac0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 158b118..eec0273 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedPreloadEventsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedTopologyChangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedUnloadEventsSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheClearDuringRebalanceTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedBackupNodeFailureRecoveryTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearEvictionEventSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearMultiNodeSelfTest; @@ -282,6 +283,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(CacheOptimisticTransactionsWithFilterSingleServerTest.class)); suite.addTest(new TestSuite(CacheOptimisticTransactionsWithFilterTest.class)); + suite.addTest(new TestSuite(IgniteCacheClearDuringRebalanceTest.class)); + return suite; } }
