This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-11704 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-11704 by this push: new e6a793b ignite-11704 e6a793b is described below commit e6a793bf4caf897453f40ad4df977ea7268dea4c Author: sboikov <sboi...@apache.org> AuthorDate: Fri Jul 19 21:20:51 2019 +0300 ignite-11704 --- .../processors/cache/CacheGroupContext.java | 3 +- .../processors/cache/GridCacheMapEntry.java | 88 ++++++++++++++++++- .../cache/IgniteCacheOffheapManager.java | 5 +- .../cache/IgniteCacheOffheapManagerImpl.java | 26 +++++- .../dht/topology/GridDhtLocalPartition.java | 99 +++++++++++++++++++++- .../cache/persistence/GridCacheOffheapManager.java | 8 +- .../distributed/CacheRemoveWithTombstonesTest.java | 39 +++++++-- 7 files changed, 250 insertions(+), 18 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 4af5de5..7963893 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; @@ -1307,7 +1308,7 @@ public class CacheGroupContext { } public boolean createTombstone(@Nullable GridDhtLocalPartition part) { - return part != null && supportsTombstone(); + return part != null && supportsTombstone() && part.state() == GridDhtPartitionState.MOVING; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index adc8699..08986a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1717,8 +1717,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (cctx.group().createTombstone(localPartition())) - cctx.offheap().removeWithTombstone(cctx, key, newVer, partition(), localPartition()); + if (cctx.group().createTombstone(localPartition())) { + cctx.offheap().removeWithTombstone(cctx, key, newVer, localPartition()); + + if (!cctx.group().createTombstone(localPartition())) + removeTombstone0(newVer); + } else removeValue(); @@ -2818,6 +2822,34 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @param tombstoneVer Tombstone version. + * @throws GridCacheEntryRemovedException If entry was removed. + * @throws IgniteCheckedException If failed. + */ + public void removeTombstone(GridCacheVersion tombstoneVer) throws GridCacheEntryRemovedException, IgniteCheckedException { + lockEntry(); + + try { + checkObsolete(); + + removeTombstone0(tombstoneVer); + } + finally { + unlockEntry(); + } + } + + /** + * @param tombstoneVer Tombstone version. + * @throws IgniteCheckedException If failed. + */ + private void removeTombstone0(GridCacheVersion tombstoneVer) throws IgniteCheckedException { + RemoveClosure closure = new RemoveClosure(this, tombstoneVer); + + cctx.offheap().invoke(cctx, key, localPartition(), closure); + } + + /** * @return {@code True} if this entry should not be evicted from cache. */ protected boolean evictionDisabled() { @@ -5720,6 +5752,58 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * */ + private static class RemoveClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { + /** */ + private final GridCacheMapEntry entry; + + /** */ + private final GridCacheVersion ver; + + /** */ + private IgniteTree.OperationType op; + + /** */ + private CacheDataRow oldRow; + + public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) { + this.entry = entry; + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override public @Nullable CacheDataRow oldRow() { + return oldRow; + } + + /** {@inheritDoc} */ + @Override public void call(@Nullable CacheDataRow row) throws IgniteCheckedException { + if (row == null || !ver.equals(row.version())) { + op = IgniteTree.OperationType.NOOP; + + return; + } + + row.key(entry.key); + + oldRow = row; + + op = IgniteTree.OperationType.REMOVE; + } + + /** {@inheritDoc} */ + @Override public CacheDataRow newRow() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + return op; + } + } + + /** + * + */ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { /** */ private final GridCacheMapEntry entry; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index c11e909..c883343 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -406,7 +406,6 @@ public interface IgniteCacheOffheapManager { GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, - int partId, GridDhtLocalPartition part ) throws IgniteCheckedException; @@ -454,6 +453,8 @@ public interface IgniteCacheOffheapManager { */ public GridIterator<CacheDataRow> partitionIterator(final int part, boolean withTombstones) throws IgniteCheckedException; + public GridIterator<CacheDataRow> tombstonesIterator(final int part) throws IgniteCheckedException; + /** * @param part Partition number. * @param topVer Topology version. @@ -917,7 +918,7 @@ public interface IgniteCacheOffheapManager { * @param partId Partition number. * @throws IgniteCheckedException If failed. */ - public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, int partId) throws IgniteCheckedException; + public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, GridDhtLocalPartition part) throws IgniteCheckedException; /** * @param cctx Cache context. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index c45e3b1..3597de7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -632,9 +632,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, - int partId, GridDhtLocalPartition part) throws IgniteCheckedException { - dataStore(part).removeWithTombstone(cctx, key, ver, partId); + assert part != null; + + dataStore(part).removeWithTombstone(cctx, key, ver, part); } @Override public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException { @@ -915,6 +916,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null, withTombstones); } + /** {@inheritDoc} */ + @Override public GridIterator<CacheDataRow> tombstonesIterator(int part) { + assert locCacheDataStore == null; + + CacheDataStore data = partitionData(part); + + if (data == null) + return new GridEmptyCloseableIterator<>(); + + // TODO IGNITE-11704. + return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null, true); + } + /** * * @param cacheId Cache ID. @@ -2730,7 +2744,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, int partId) throws IgniteCheckedException { + @Override public void removeWithTombstone( + GridCacheContext cctx, + KeyCacheObject key, + GridCacheVersion ver, + GridDhtLocalPartition part) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); @@ -2745,6 +2763,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert c.operationType() == PUT || c.operationType() == IN_PLACE : c.operationType(); + part.tombstoneCreated(); + if (!isTombstone(c.oldRow)) cctx.tombstoneCreated(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index e3e6435..13b1761 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -173,6 +174,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** Set if topology update sequence should be updated on partition destroy. */ private boolean updateSeqOnDestroy; + /** */ + private volatile boolean tombstoneCreated; + /** * @param ctx Context. * @param grp Cache group. @@ -619,8 +623,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements assert partState == MOVING || partState == LOST; - if (casState(state, OWNING)) + if (casState(state, OWNING)) { + if (grp.supportsTombstone()) + clearTombstones(); + return true; + } } } @@ -1117,6 +1125,95 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * + */ + public void tombstoneCreated() { + tombstoneCreated = true; + } + + /** + * + */ + private void submitClearTombstones() { + if (tombstoneCreated) + grp.shared().kernalContext().closure().runLocalSafe(this::clearTombstones, true); + } + + /** + * + */ + private void clearTombstones() { + final int stopCheckingFreq = 1000; + + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; + + try { + GridIterator<CacheDataRow> it0 = grp.offheap().tombstonesIterator(id); + + int cntr = 0; + + while (it0.hasNext()) { + CacheDataRow row = it0.next(); + + if (!grp.offheap().isTombstone(row)) + continue; + + assert row.key() != null; + assert row.version() != null; + + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) + hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); + + assert hld != null; + + ctx.database().checkpointReadLock(); + + try { + while (true) { + GridCacheMapEntry cached = null; + + try { + cached = putEntryIfObsoleteOrAbsent( + hld, + hld.cctx, + grp.affinity().lastVersion(), + row.key(), + true, + false); + + cached.removeTombstone(row.version()); + + cached.touch(); + + break; + } + catch (GridCacheEntryRemovedException e) { + cached = null; + } + finally { + if (cached != null) + cached.touch(); + } + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + + cntr++; + + if (cntr % stopCheckingFreq == 0) { + if (ctx.kernalContext().isStopping() || state() != OWNING) + break; + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed clear tombstone entries for partition: " + id, e); + } + } + + /** * Removes all entries and rows from this partition. * * @return Number of rows cleared from page memory. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index d4bcbd8..427c0b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -2423,12 +2423,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, int partId) throws IgniteCheckedException { + @Override public void removeWithTombstone( + GridCacheContext cctx, + KeyCacheObject key, + GridCacheVersion ver, + GridDhtLocalPartition part) throws IgniteCheckedException { assert ctx.database().checkpointLockIsHeldByThread(); CacheDataStore delegate = init0(false); - delegate.removeWithTombstone(cctx, key, ver, partId); + delegate.removeWithTombstone(cctx, key, ver, part); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java index 331fb64..05962c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -28,8 +30,11 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -45,7 +50,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; /** @@ -122,6 +127,8 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception { persistence = true; + cleanPersistenceDir(); + testRemoveAndRebalanceRace(TRANSACTIONAL, true); } @@ -169,8 +176,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { cache0.putAll(map); - TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class, - getTestIgniteInstanceName(1)); + blockRebalance(ignite0); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -180,6 +186,12 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { IgniteEx ignite1 = (IgniteEx)fut.get(30_000); + if (persistence) { + ignite0.cluster().baselineAutoAdjustEnabled(false); + + ignite0.cluster().setBaselineTopology(2); + } + Set<Integer> removed = new HashSet<>(); // Do removes while rebalance is in progress. @@ -195,7 +207,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)).findMetric("Tombstones"); // On first node there should not be tombstones. - //assertEquals(0, tombstoneMetric0.get()); + assertEquals(0, tombstoneMetric0.get()); if (expTombstone) assertEquals(removed.size(), tombstoneMetric1.get()); @@ -213,7 +225,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { assert !removed.isEmpty(); - //assertEquals(0, tombstoneMetric0.get()); + assertEquals(0, tombstoneMetric0.get()); if (expTombstone) assertEquals(removed.size(), tombstoneMetric1.get()); @@ -242,6 +254,19 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { assertEquals(0, tombstoneMetric1.get()); } + /** + * + */ + private void blockRebalance(Ignite node) { + final int grpId = groupIdForCache(ignite(0), DEFAULT_CACHE_NAME); + + TestRecordingCommunicationSpi.spi(node).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return (msg instanceof GridDhtPartitionSupplyMessage) + && ((GridCacheGroupIdMessage)msg).groupId() == grpId; + } + }); + } /** * @param atomicityMode Cache atomicity mode. @@ -253,7 +278,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(PARTITIONED); ccfg.setBackups(2); - ccfg.setRebalanceMode(SYNC); + ccfg.setRebalanceMode(ASYNC); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); return ccfg;