This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-11704
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-11704 by this push:
     new e6a793b  ignite-11704
e6a793b is described below

commit e6a793bf4caf897453f40ad4df977ea7268dea4c
Author: sboikov <sboi...@apache.org>
AuthorDate: Fri Jul 19 21:20:51 2019 +0300

    ignite-11704
---
 .../processors/cache/CacheGroupContext.java        |  3 +-
 .../processors/cache/GridCacheMapEntry.java        | 88 ++++++++++++++++++-
 .../cache/IgniteCacheOffheapManager.java           |  5 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       | 26 +++++-
 .../dht/topology/GridDhtLocalPartition.java        | 99 +++++++++++++++++++++-
 .../cache/persistence/GridCacheOffheapManager.java |  8 +-
 .../distributed/CacheRemoveWithTombstonesTest.java | 39 +++++++--
 7 files changed, 250 insertions(+), 18 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 4af5de5..7963893 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
@@ -1307,7 +1308,7 @@ public class CacheGroupContext {
     }
 
     public boolean createTombstone(@Nullable GridDhtLocalPartition part) {
-        return part != null && supportsTombstone();
+        return part != null && supportsTombstone() && part.state() == 
GridDhtPartitionState.MOVING;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index adc8699..08986a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1717,8 +1717,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 }
             }
 
-            if (cctx.group().createTombstone(localPartition()))
-                cctx.offheap().removeWithTombstone(cctx, key, newVer, 
partition(), localPartition());
+            if (cctx.group().createTombstone(localPartition())) {
+                cctx.offheap().removeWithTombstone(cctx, key, newVer, 
localPartition());
+
+                if (!cctx.group().createTombstone(localPartition()))
+                    removeTombstone0(newVer);
+            }
             else
                 removeValue();
 
@@ -2818,6 +2822,34 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     }
 
     /**
+     * Removes the row stored for this entry's key if its version equals the given tombstone version.
+     * The entry lock is held for the duration of the call and {@code checkObsolete()} is invoked
+     * before the removal is attempted.
+     *
+     * @param tombstoneVer Version the stored row must have for it to be removed.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeTombstone(GridCacheVersion tombstoneVer) throws 
GridCacheEntryRemovedException, IgniteCheckedException {
+        lockEntry();
+
+        try {
+            checkObsolete();
+
+            removeTombstone0(tombstoneVer);
+        }
+        finally {
+            unlockEntry();
+        }
+    }
+
+    /**
+     * Invokes a {@code RemoveClosure} on the offheap storage for this entry's key and local partition.
+     * The closure removes the row only when a row exists and its version equals {@code tombstoneVer};
+     * otherwise it is a no-op. This method does not acquire the entry lock itself
+     * ({@code removeTombstone(GridCacheVersion)} takes it before delegating here).
+     *
+     * @param tombstoneVer Version the stored row must have for it to be removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void removeTombstone0(GridCacheVersion tombstoneVer) throws 
IgniteCheckedException {
+        RemoveClosure closure = new RemoveClosure(this, tombstoneVer);
+
+        cctx.offheap().invoke(cctx, key, localPartition(), closure);
+    }
+
+    /**
      * @return {@code True} if this entry should not be evicted from cache.
      */
     protected boolean evictionDisabled() {
@@ -5720,6 +5752,58 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     /**
      * Offheap invoke closure that removes a row only if its version equals the expected one.
      * When the row is absent or has a different version the operation type is {@code NOOP};
      * otherwise the row is remembered as the old row and the operation type is {@code REMOVE}.
      */
+    private static class RemoveClosure implements 
IgniteCacheOffheapManager.OffheapInvokeClosure {
+        /** Entry whose key is set on the found row before removal. */
+        private final GridCacheMapEntry entry;
+
+        /** Version the found row must have for the removal to take place. */
+        private final GridCacheVersion ver;
+
+        /** Operation type chosen by {@code call}: {@code NOOP} or {@code REMOVE}. */
+        private IgniteTree.OperationType op;
+
+        /** Row selected for removal; stays {@code null} when nothing is removed. */
+        private CacheDataRow oldRow;
+
+        public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) {
+            this.entry = entry;
+            this.ver = ver;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable CacheDataRow oldRow() {
+            return oldRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable CacheDataRow row) throws 
IgniteCheckedException {
+            if (row == null || !ver.equals(row.version())) {
+                op = IgniteTree.OperationType.NOOP;
+
+                return;
+            }
+
+            row.key(entry.key);
+
+            oldRow = row;
+
+           op = IgniteTree.OperationType.REMOVE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow newRow() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return op;
+        }
+    }
+
+    /**
+     *
+     */
     private static class UpdateClosure implements 
IgniteCacheOffheapManager.OffheapInvokeClosure {
         /** */
         private final GridCacheMapEntry entry;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index c11e909..c883343 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -406,7 +406,6 @@ public interface IgniteCacheOffheapManager {
         GridCacheContext cctx,
         KeyCacheObject key,
         GridCacheVersion ver,
-        int partId,
         GridDhtLocalPartition part
     ) throws IgniteCheckedException;
 
@@ -454,6 +453,8 @@ public interface IgniteCacheOffheapManager {
      */
     public GridIterator<CacheDataRow> partitionIterator(final int part, 
boolean withTombstones) throws IgniteCheckedException;
 
+    public GridIterator<CacheDataRow> tombstonesIterator(final int part) 
throws IgniteCheckedException;
+
     /**
      * @param part Partition number.
      * @param topVer Topology version.
@@ -917,7 +918,7 @@ public interface IgniteCacheOffheapManager {
          * @param partId Partition number.
          * @throws IgniteCheckedException If failed.
          */
-        public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject 
key, GridCacheVersion ver, int partId) throws IgniteCheckedException;
+        public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject 
key, GridCacheVersion ver, GridDhtLocalPartition part) throws 
IgniteCheckedException;
 
         /**
          * @param cctx Cache context.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c45e3b1..3597de7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -632,9 +632,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridCacheContext cctx,
         KeyCacheObject key,
         GridCacheVersion ver,
-        int partId,
         GridDhtLocalPartition part) throws IgniteCheckedException {
-        dataStore(part).removeWithTombstone(cctx, key, ver, partId);
+        assert part != null;
+
+        dataStore(part).removeWithTombstone(cctx, key, ver, part);
     }
 
     @Override public boolean isTombstone(CacheDataRow row) throws 
IgniteCheckedException {
@@ -915,6 +916,19 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, 
null, withTombstones);
     }
 
+    /** {@inheritDoc} */
+    @Override public GridIterator<CacheDataRow> tombstonesIterator(int part) {
+        assert locCacheDataStore == null;
+
+        CacheDataStore data = partitionData(part);
+
+        if (data == null)
+            return new GridEmptyCloseableIterator<>();
+
+        // TODO IGNITE-11704: same iterator(...) call as partitionIterator with the last argument (presumably withTombstones) fixed to true, so non-tombstone rows appear to be returned too - callers must filter with isTombstone().
+        return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, 
null, true);
+    }
+
     /**
      *
      * @param cacheId Cache ID.
@@ -2730,7 +2744,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public void removeWithTombstone(GridCacheContext cctx, 
KeyCacheObject key, GridCacheVersion ver, int partId) throws 
IgniteCheckedException {
+        @Override public void removeWithTombstone(
+                GridCacheContext cctx,
+                KeyCacheObject key,
+                GridCacheVersion ver,
+                GridDhtLocalPartition part) throws IgniteCheckedException {
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
 
@@ -2745,6 +2763,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 assert c.operationType() == PUT || c.operationType() == 
IN_PLACE : c.operationType();
 
+                part.tombstoneCreated();
+
                 if (!isTombstone(c.oldRow))
                     cctx.tombstoneCreated();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index e3e6435..13b1761 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -173,6 +174,9 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     /** Set if topology update sequence should be updated on partition 
destroy. */
     private boolean updateSeqOnDestroy;
 
+    /** Set to {@code true} by {@code tombstoneCreated()}; checked before submitting asynchronous tombstone cleanup. */
+    private volatile boolean tombstoneCreated;
+
     /**
      * @param ctx Context.
      * @param grp Cache group.
@@ -619,8 +623,12 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
             assert partState == MOVING || partState == LOST;
 
-            if (casState(state, OWNING))
+            if (casState(state, OWNING)) {
+                if (grp.supportsTombstone())
+                    clearTombstones();
+
                 return true;
+            }
         }
     }
 
@@ -1117,6 +1125,95 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * Records that a tombstone has been created in this partition by setting the
+     * {@code tombstoneCreated} flag to {@code true}.
+     */
+    public void tombstoneCreated() {
+        tombstoneCreated = true;
+    }
+
+    /**
+     * Submits {@code clearTombstones()} through the kernal closure processor's {@code runLocalSafe},
+     * but only if the {@code tombstoneCreated} flag is set.
+     * NOTE(review): no caller of this method is visible in this change - confirm it is still needed.
+     */
+    private void submitClearTombstones() {
+        if (tombstoneCreated)
+            
grp.shared().kernalContext().closure().runLocalSafe(this::clearTombstones, 
true);
+    }
+
+    /**
+     * Removes tombstone rows of this partition.
+     * <p>
+     * Iterates the rows returned by {@code tombstonesIterator} for this partition id and skips rows
+     * that are not tombstones. For every tombstone the matching map entry is obtained through
+     * {@code putEntryIfObsoleteOrAbsent} (for shared groups the cache map holder is resolved from the
+     * row's cache id) and {@code removeTombstone} is called with the row version while the checkpoint
+     * read lock is held. If the entry turns out to be removed ({@code GridCacheEntryRemovedException})
+     * the attempt is repeated with a fresh entry.
+     * <p>
+     * After every 1000 processed tombstones the loop stops early if the node is stopping or the
+     * partition state is no longer {@code OWNING}. An {@code IgniteCheckedException} is logged and
+     * not rethrown.
+     * <p>
+     * NOTE(review): on the successful path {@code cached.touch()} runs twice - once inside the
+     * {@code try} and once more in the {@code finally} block; confirm whether that is intended.
+     */
+    private void clearTombstones() {
+        final int stopCheckingFreq = 1000;
+
+        CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+
+        try {
+            GridIterator<CacheDataRow> it0 = 
grp.offheap().tombstonesIterator(id);
+
+            int cntr = 0;
+
+            while (it0.hasNext()) {
+                CacheDataRow row = it0.next();
+
+                if (!grp.offheap().isTombstone(row))
+                    continue;
+
+                assert row.key() != null;
+                assert row.version() != null;
+
+                if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != 
row.cacheId()))
+                    hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+                assert hld != null;
+
+                ctx.database().checkpointReadLock();
+
+                try {
+                    while (true) {
+                        GridCacheMapEntry cached = null;
+
+                        try {
+                            cached = putEntryIfObsoleteOrAbsent(
+                                hld,
+                                hld.cctx,
+                                grp.affinity().lastVersion(),
+                                row.key(),
+                                true,
+                                false);
+
+                            cached.removeTombstone(row.version());
+
+                            cached.touch();
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException e) {
+                            cached = null;
+                        }
+                        finally {
+                            if (cached != null)
+                                cached.touch();
+                        }
+                    }
+                }
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
+
+                cntr++;
+
+                if (cntr % stopCheckingFreq == 0) {
+                    if (ctx.kernalContext().isStopping() || state() != OWNING)
+                        break;
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed clear tombstone entries for partition: " + 
id, e);
+        }
+    }
+
+    /**
      * Removes all entries and rows from this partition.
      *
      * @return Number of rows cleared from page memory.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index d4bcbd8..427c0b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2423,12 +2423,16 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public void removeWithTombstone(GridCacheContext cctx, 
KeyCacheObject key, GridCacheVersion ver, int partId) throws 
IgniteCheckedException {
+        @Override public void removeWithTombstone(
+                GridCacheContext cctx,
+                KeyCacheObject key,
+                GridCacheVersion ver,
+                GridDhtLocalPartition part) throws IgniteCheckedException {
             assert ctx.database().checkpointLockIsHeldByThread();
 
             CacheDataStore delegate = init0(false);
 
-            delegate.removeWithTombstone(cctx, key, ver, partId);
+            delegate.removeWithTombstone(cctx, key, ver, part);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
index 331fb64..05962c4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -28,8 +30,11 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -45,7 +50,7 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
 
 /**
@@ -122,6 +127,8 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
     public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception 
{
         persistence = true;
 
+        cleanPersistenceDir();
+
         testRemoveAndRebalanceRace(TRANSACTIONAL, true);
     }
 
@@ -169,8 +176,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         cache0.putAll(map);
 
-        
TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class,
-                getTestIgniteInstanceName(1));
+        blockRebalance(ignite0);
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -180,6 +186,12 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         IgniteEx ignite1 = (IgniteEx)fut.get(30_000);
 
+        if (persistence) {
+            ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+            ignite0.cluster().setBaselineTopology(2);
+        }
+
         Set<Integer> removed = new HashSet<>();
 
         // Do removes while rebalance is in progress.
@@ -195,7 +207,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
                 cacheMetricsRegistryName(DEFAULT_CACHE_NAME, 
false)).findMetric("Tombstones");
 
         // On first node there should not be tombstones.
-        //assertEquals(0, tombstoneMetric0.get());
+        assertEquals(0, tombstoneMetric0.get());
 
         if (expTombstone)
             assertEquals(removed.size(), tombstoneMetric1.get());
@@ -213,7 +225,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         assert !removed.isEmpty();
 
-        //assertEquals(0, tombstoneMetric0.get());
+        assertEquals(0, tombstoneMetric0.get());
 
         if (expTombstone)
             assertEquals(removed.size(), tombstoneMetric1.get());
@@ -242,6 +254,19 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         assertEquals(0, tombstoneMetric1.get());
     }
+    /**
+     * Registers a blocking predicate on the given node's {@code TestRecordingCommunicationSpi}.
+     * The predicate matches {@code GridDhtPartitionSupplyMessage} instances whose group id equals the
+     * group id of {@code DEFAULT_CACHE_NAME}, resolved through {@code ignite(0)}.
+     *
+     * @param node Node whose communication SPI gets the predicate.
+     */
+    private void blockRebalance(Ignite node) {
+        final int grpId = groupIdForCache(ignite(0), DEFAULT_CACHE_NAME);
+
+        TestRecordingCommunicationSpi.spi(node).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return (msg instanceof GridDhtPartitionSupplyMessage)
+                        && ((GridCacheGroupIdMessage)msg).groupId() == grpId;
+            }
+        });
+    }
 
     /**
      * @param atomicityMode Cache atomicity mode.
@@ -253,7 +278,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setCacheMode(PARTITIONED);
         ccfg.setBackups(2);
-        ccfg.setRebalanceMode(SYNC);
+        ccfg.setRebalanceMode(ASYNC);
         
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
         return ccfg;

Reply via email to