This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-11704
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 5a05291a9414ef8f8b37ec3d1e8a2e0a9ce4c70f
Author: sboikov <sboi...@apache.org>
AuthorDate: Thu Jul 11 23:45:40 2019 +0300

    ignite-11704
---
 .../processors/cache/CacheMetricsImpl.java         |  14 ++
 .../processors/cache/GridCacheContext.java         |  14 ++
 .../cache/IgniteCacheOffheapManager.java           |   6 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |  69 ++++--
 .../dht/topology/GridDhtLocalPartition.java        |   2 +-
 .../cache/persistence/GridCacheOffheapManager.java |   8 +-
 .../verify/CollectConflictPartitionKeysTask.java   |   2 +-
 .../cache/verify/VerifyBackupPartitionsTask.java   |   2 +-
 .../cache/verify/VerifyBackupPartitionsTaskV2.java |   2 +-
 ...orFindAndDeleteGarbageInPersistenceClosure.java |   2 +-
 .../processors/cache/IgniteCacheGroupsTest.java    |  10 +-
 .../distributed/CacheRemoveWithTombstonesTest.java | 261 +++++++++++++++++++++
 .../cache/persistence/db/IgnitePdsWithTtlTest.java |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite9.java   |   3 +
 14 files changed, 355 insertions(+), 42 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index ab1c4a1..5808db6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import 
org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetricImpl;
 import org.apache.ignite.internal.processors.metric.impl.LongMetricImpl;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -161,6 +162,9 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** Number of currently clearing partitions for rebalancing. */
     private final LongMetricImpl rebalanceClearingPartitions;
 
+    /** */
+    private LongAdderMetricImpl tombstones;
+
     /** Cache metrics. */
     @GridToStringExclude
     private transient CacheMetricsImpl delegate;
@@ -312,6 +316,8 @@ public class CacheMetricsImpl implements CacheMetrics {
 
         rebalanceClearingPartitions = 
mreg.metric("RebalanceClearingPartitionsLeft",
             "Number of partitions need to be cleared before actual rebalance 
start.");
+
+        tombstones = mreg.longAdderMetric("Tombstones", "Number of tombstone 
entries");
     }
 
     /**
@@ -1009,6 +1015,14 @@ public class CacheMetricsImpl implements CacheMetrics {
             delegate.addPutAndGetTimeNanos(duration);
     }
 
+    public void tombstoneCreated() {
+        tombstones.increment();
+    }
+
+    public void tombstoneRemoved() {
+        tombstones.decrement();
+    }
+
     /** {@inheritDoc} */
     @Override public String getKeyType() {
         CacheConfiguration ccfg = cctx.config();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 87c6253..5ac56db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2372,6 +2372,20 @@ public class GridCacheContext<K, V> implements 
Externalizable {
         }
     }
 
+    public void tombstoneCreated() {
+        GridCacheAdapter cache = this.cache;
+
+        if (cache != null)
+            cache.metrics0().tombstoneCreated();
+    }
+
+    public void tombstoneRemoved() {
+        GridCacheAdapter cache = this.cache;
+
+        if (cache != null)
+            cache.metrics0().tombstoneRemoved();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return "GridCacheContext: " + name();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4221a2d..c11e909 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -452,7 +452,7 @@ public interface IgniteCacheOffheapManager {
      * @return Iterator for given partition.
      * @throws IgniteCheckedException If failed.
      */
-    public GridIterator<CacheDataRow> partitionIterator(final int part) throws 
IgniteCheckedException;
+    public GridIterator<CacheDataRow> partitionIterator(final int part, 
boolean withTombstones) throws IgniteCheckedException;
 
     /**
      * @param part Partition number.
@@ -963,7 +963,7 @@ public interface IgniteCacheOffheapManager {
          * @return Data cursor.
          * @throws IgniteCheckedException If failed.
          */
-        public GridCursor<? extends CacheDataRow> cursor() throws 
IgniteCheckedException;
+        public GridCursor<? extends CacheDataRow> cursor(boolean 
withTombstones) throws IgniteCheckedException;
 
         /**
          * @param x Implementation specific argument, {@code null} always 
means that we need to return full detached data row.
@@ -984,7 +984,7 @@ public interface IgniteCacheOffheapManager {
          * @return Data cursor.
          * @throws IgniteCheckedException If failed.
          */
-        public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws 
IgniteCheckedException;
+        public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean 
withTombstones) throws IgniteCheckedException;
 
         /**
          * @param cacheId Cache ID.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e628345..c45e3b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -747,8 +747,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridCacheVersion obsoleteVer = null;
 
         try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ?
-            iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null) 
:
-            evictionSafeIterator(cctx.cacheId(), 
cacheDataStores().iterator())) {
+            iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null, 
true) :
+            evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator(), 
true)) {
             while (it.hasNext()) {
                 cctx.shared().database().checkpointReadLock();
 
@@ -891,7 +891,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         @Nullable MvccSnapshot mvccSnapshot,
         Boolean dataPageScanEnabled
     ) {
-        return iterator(cacheId, cacheData(primary, backups, topVer), 
mvccSnapshot, dataPageScanEnabled);
+        return iterator(cacheId, cacheData(primary, backups, topVer), 
mvccSnapshot, dataPageScanEnabled, false);
     }
 
     /** {@inheritDoc} */
@@ -902,17 +902,17 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         if (data == null)
             return new GridEmptyCloseableIterator<>();
 
-        return iterator(cacheId, singletonIterator(data), mvccSnapshot, 
dataPageScanEnabled);
+        return iterator(cacheId, singletonIterator(data), mvccSnapshot, 
dataPageScanEnabled, false);
     }
 
     /** {@inheritDoc} */
-    @Override public GridIterator<CacheDataRow> partitionIterator(int part) {
+    @Override public GridIterator<CacheDataRow> partitionIterator(int part, 
boolean withTombstones) {
         CacheDataStore data = partitionData(part);
 
         if (data == null)
             return new GridEmptyCloseableIterator<>();
 
-        return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, 
null);
+        return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, 
null, withTombstones);
     }
 
     /**
@@ -926,7 +926,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     private GridCloseableIterator<CacheDataRow> iterator(final int cacheId,
         final Iterator<CacheDataStore> dataIt,
         final MvccSnapshot mvccSnapshot,
-        Boolean dataPageScanEnabled
+        Boolean dataPageScanEnabled,
+        boolean withTombstones
     ) {
         return new GridCloseableIteratorAdapter<CacheDataRow>() {
             /** */
@@ -962,7 +963,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                             try {
                                 if (mvccSnapshot == null)
-                                    cur = cacheId == CU.UNDEFINED_CACHE_ID ? 
ds.cursor() : ds.cursor(cacheId);
+                                    cur = cacheId == CU.UNDEFINED_CACHE_ID ? 
ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones);
                                 else {
                                     cur = cacheId == CU.UNDEFINED_CACHE_ID ?
                                         ds.cursor(mvccSnapshot) : 
ds.cursor(cacheId, mvccSnapshot);
@@ -996,7 +997,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
      * @param dataIt Data store iterator.
      * @return Rows iterator
      */
-    private GridCloseableIterator<CacheDataRow> evictionSafeIterator(final int 
cacheId, final Iterator<CacheDataStore> dataIt) {
+    private GridCloseableIterator<CacheDataRow> evictionSafeIterator(
+            final int cacheId,
+            final Iterator<CacheDataStore> dataIt,
+            boolean withTombstones) {
         return new GridCloseableIteratorAdapter<CacheDataRow>() {
             /** */
             private GridCursor<? extends CacheDataRow> cur;
@@ -1027,7 +1031,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                             if (!reservePartition(ds.partId()))
                                 continue;
 
-                            cur = cacheId == CU.UNDEFINED_CACHE_ID ? 
ds.cursor() : ds.cursor(cacheId);
+                            cur = cacheId == CU.UNDEFINED_CACHE_ID ? 
ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones);
                         }
                         else
                             break;
@@ -1700,17 +1704,19 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 case REMOVE: {
                     CacheDataRow oldRow = c.oldRow();
 
-                    finishRemove(cctx, row.key(), oldRow, true);
+                    finishRemove(cctx, row.key(), oldRow, null);
 
                     break;
                 }
 
                 case IN_PLACE:
-                    if (isTombstone(c.newRow()))
-                        decrementSize(cctx.cacheId());
+                    assert !isTombstone(c.newRow());
+
+                    if (isTombstone(c.oldRow())) {
+                        cctx.tombstoneRemoved();
 
-                    if (isTombstone(c.oldRow()))
                         incrementSize(cctx.cacheId());
+                    }
 
                     break;
 
@@ -2592,7 +2598,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
          */
         private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, 
@Nullable CacheDataRow oldRow)
             throws IgniteCheckedException {
-            boolean oldNull = oldRow == null || isTombstone(oldRow);
+            assert !isTombstone(newRow);
+
+            boolean oldTombstone = isTombstone(oldRow);
+            boolean oldNull = oldRow == null || oldTombstone;
 
             if (oldNull)
                 incrementSize(cctx.cacheId());
@@ -2614,6 +2623,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             }
 
             updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), 
newRow.value());
+
+            if (oldTombstone)
+                cctx.tombstoneRemoved();
         }
 
         /**
@@ -2656,7 +2668,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, 
key));
 
-                finishRemove(cctx, key, oldRow, true);
+                finishRemove(cctx, key, oldRow, null);
             }
             finally {
                 busyLock.leaveBusy();
@@ -2733,7 +2745,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 assert c.operationType() == PUT || c.operationType() == 
IN_PLACE : c.operationType();
 
-                finishRemove(cctx, key, c.oldRow, c.operationType() == PUT);
+                if (!isTombstone(c.oldRow))
+                    cctx.tombstoneCreated();
+
+                finishRemove(cctx, key, c.oldRow, c.newRow);
             }
             finally {
                 busyLock.leaveBusy();
@@ -2749,8 +2764,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         private void finishRemove(GridCacheContext cctx,
             KeyCacheObject key,
             @Nullable CacheDataRow oldRow,
-            boolean removeFromStore) throws IgniteCheckedException {
-            boolean oldNull = oldRow == null || isTombstone(oldRow);
+            @Nullable CacheDataRow tombstoneRow) throws IgniteCheckedException 
{
+            boolean oldTombstone = isTombstone(oldRow);
+            boolean oldNull = oldRow == null || oldTombstone;
 
             if (!oldNull) {
                 clearPendingEntries(cctx, oldRow);
@@ -2763,10 +2779,13 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             if (qryMgr.enabled())
                 qryMgr.remove(key, oldNull ? null : oldRow);
 
-            if (oldRow != null && removeFromStore)
+            if (oldRow != null && (tombstoneRow == null || tombstoneRow.link() 
!= oldRow.link()))
                 rowStore.removeRow(oldRow.link(), grp.statisticsHolderData());
 
             updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), 
null);
+
+            if (oldTombstone && tombstoneRow == null)
+                cctx.tombstoneRemoved();
         }
 
         /**
@@ -2911,8 +2930,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor() throws 
IgniteCheckedException {
-            return cursorSkipTombstone(dataTree.find(null, null));
+        @Override public GridCursor<? extends CacheDataRow> cursor(boolean 
withTombstones) throws IgniteCheckedException {
+            GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
+
+            return withTombstones ? cur : cursorSkipTombstone(cur);
         }
 
         private GridCursor<? extends CacheDataRow> cursorSkipTombstone(final 
GridCursor<? extends CacheDataRow> cur) {
@@ -2964,8 +2985,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(int 
cacheId) throws IgniteCheckedException {
-            return cursor(cacheId, null, null);
+        @Override public GridCursor<? extends CacheDataRow> cursor(int 
cacheId, boolean withTombstones) throws IgniteCheckedException {
+            return cursor(cacheId, null, null, null, null, withTombstones);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 5e637e9..e3e6435 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -1143,7 +1143,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
 
         try {
-            GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id);
+            GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id, true);
 
             while (it0.hasNext()) {
                 ctx.database().checkpointReadLock();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 6f5a0ea..d4bcbd8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2476,11 +2476,11 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
 
         /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor() throws 
IgniteCheckedException {
+        @Override public GridCursor<? extends CacheDataRow> cursor(boolean 
withTombstones) throws IgniteCheckedException {
             CacheDataStore delegate = init0(true);
 
             if (delegate != null)
-                return delegate.cursor();
+                return delegate.cursor(withTombstones);
 
             return EMPTY_CURSOR;
         }
@@ -2555,11 +2555,11 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(int 
cacheId) throws IgniteCheckedException {
+        @Override public GridCursor<? extends CacheDataRow> cursor(int 
cacheId, boolean withTombstones) throws IgniteCheckedException {
             CacheDataStore delegate = init0(true);
 
             if (delegate != null)
-                return delegate.cursor(cacheId);
+                return delegate.cursor(cacheId, withTombstones);
 
             return EMPTY_CURSOR;
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java
index 8e8128c..bd9db7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java
@@ -178,7 +178,7 @@ public class CollectConflictPartitionKeysTask extends 
ComputeTaskAdapter<Partiti
 
                 partSize = part.dataStore().fullSize();
 
-                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id());
+                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id(), false);
 
                 partEntryHashRecords = new ArrayList<>();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
index f700567..8b18834 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
@@ -317,7 +317,7 @@ public class VerifyBackupPartitionsTask extends 
ComputeTaskAdapter<Set<String>,
 
                 partSize = part.dataStore().fullSize();
 
-                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id());
+                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id(), false);
 
                 while (it.hasNextX()) {
                     CacheDataRow row = it.nextX();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
index 92b6e36..9d8335f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
@@ -579,7 +579,7 @@ public class VerifyBackupPartitionsTaskV2 extends 
ComputeTaskAdapter<VisorIdleVe
                 if (arg.checkCrc())
                     checkPartitionCrc(grpCtx, part, cpFlag);
 
-                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id());
+                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id(), false);
 
                 while (it.hasNextX()) {
                     CacheDataRow row = it.nextX();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
index 722686e..552a1a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
@@ -266,7 +266,7 @@ public class VisorFindAndDeleteGarbageInPersistenceClosure 
implements IgniteCall
             if (part.state() != GridDhtPartitionState.OWNING)
                 return Collections.emptyMap();
 
-            GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id());
+            GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id(), false);
 
             while (it.hasNextX()) {
                 CacheDataRow row = it.nextX();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 0ab22fd..478cf80 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -1666,17 +1666,17 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
 
         for (int p = 0; p < aff.partitions(); p++) {
             if (srv1Parts.contains(p)) {
-                GridIterator<CacheDataRow> it = 
grpSrv0.offheap().partitionIterator(p);
+                GridIterator<CacheDataRow> it = 
grpSrv0.offheap().partitionIterator(p, false);
                 assertFalse(it.hasNext());
 
-                it = grpSrv1.offheap().partitionIterator(p);
+                it = grpSrv1.offheap().partitionIterator(p, false);
                 assertTrue(it.hasNext());
             }
             else {
-                GridIterator<CacheDataRow> it = 
grpSrv0.offheap().partitionIterator(p);
+                GridIterator<CacheDataRow> it = 
grpSrv0.offheap().partitionIterator(p, false);
                 assertTrue(it.hasNext());
 
-                it = grpSrv1.offheap().partitionIterator(p);
+                it = grpSrv1.offheap().partitionIterator(p, false);
                 assertFalse(it.hasNext());
             }
         }
@@ -3919,7 +3919,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
 
         Integer cacheId = null;
 
-        GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0);
+        GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0, 
false);
 
         int c = 0;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
new file mode 100644
index 0000000..331fb64
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
+
+/**
+ *
+ */
+public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest {
+    /** */
+    private boolean persistence;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        if (persistence) {
+            DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(
+                            new DataRegionConfiguration().setMaxSize(100L * 
1024 * 1024).setPersistenceEnabled(true))
+                    .setWalMode(WALMode.LOG_ONLY);
+
+            cfg.setDataStorageConfiguration(dsCfg);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceRaceTx() throws Exception {
+        testRemoveAndRebalanceRace(TRANSACTIONAL, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceRaceTxMvcc() throws Exception {
+        testRemoveAndRebalanceRace(TRANSACTIONAL_SNAPSHOT, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceRaceAtomic() throws Exception {
+        testRemoveAndRebalanceRace(ATOMIC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception 
{
+        persistence = true;
+
+        testRemoveAndRebalanceRace(TRANSACTIONAL, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceRaceTxMvccWithPersistence() throws 
Exception {
+        persistence = true;
+
+        testRemoveAndRebalanceRace(TRANSACTIONAL_SNAPSHOT, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceRaceAtomicWithPersistence() throws 
Exception {
+        persistence = true;
+
+        testRemoveAndRebalanceRace(ATOMIC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     * @param expTombstone {@code True} if tombstones should be created.
+     */
+    private void testRemoveAndRebalanceRace(CacheAtomicityMode atomicityMode, 
boolean expTombstone) throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+
+        if (persistence)
+            ignite0.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache0 = 
ignite0.createCache(cacheConfiguration(atomicityMode));
+
+        LongMetric tombstoneMetric0 =  
(LongMetric)ignite0.context().metric().registry(
+                cacheMetricsRegistryName(DEFAULT_CACHE_NAME, 
false)).findMetric("Tombstones");
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        final int KEYS = 1024;
+
+        for (int i = 0; i < KEYS; i++)
+            map.put(i, i);
+
+        cache0.putAll(map);
+
+        
TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class,
+                getTestIgniteInstanceName(1));
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return startGrid(1);
+            }
+        });
+
+        IgniteEx ignite1 = (IgniteEx)fut.get(30_000);
+
+        Set<Integer> removed = new HashSet<>();
+
+        // Do removes while rebalance is in progress.
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                removed.add(i);
+
+                cache0.remove(i);
+            }
+        }
+
+        final LongMetric tombstoneMetric1 =  
(LongMetric)ignite1.context().metric().registry(
+                cacheMetricsRegistryName(DEFAULT_CACHE_NAME, 
false)).findMetric("Tombstones");
+
+        // On first node there should not be tombstones.
+        //assertEquals(0, tombstoneMetric0.get());
+
+        if (expTombstone)
+            assertEquals(removed.size(), tombstoneMetric1.get());
+        else
+            assertEquals(0, tombstoneMetric1.get());
+
+        // Update some of removed keys, this should remove tombstones.
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 4 == 0) {
+                removed.remove(i);
+
+                cache0.put(i, i);
+            }
+        }
+
+        assert !removed.isEmpty();
+
+        //assertEquals(0, tombstoneMetric0.get());
+
+        if (expTombstone)
+            assertEquals(removed.size(), tombstoneMetric1.get());
+        else
+            assertEquals(0, tombstoneMetric1.get());
+
+        TestRecordingCommunicationSpi.spi(ignite0).stopBlock();
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Integer, Integer> cache1 = 
ignite(1).cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < KEYS; i++) {
+            if (removed.contains(i))
+                assertNull(cache1.get(i));
+            else
+                assertEquals((Object)i, cache1.get(i));
+        }
+
+        // Tombstones should be removed after once rebalance is completed.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return tombstoneMetric1.get() == 0;
+            }
+        }, 30_000);
+
+        assertEquals(0, tombstoneMetric1.get());
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> 
cacheConfiguration(CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Integer, Integer> ccfg = new 
CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(2);
+        ccfg.setRebalanceMode(SYNC);
+        
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        return ccfg;
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
index afe2b34..4461d94 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -316,7 +316,7 @@ public class IgnitePdsWithTtlTest extends 
GridCommonAbstractTest {
             IgniteCacheOffheapManager.CacheDataStore dataStore =
                 
ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart);
 
-            GridCursor cur = dataStore.cursor();
+            GridCursor cur = dataStore.cursor(false);
 
             assertFalse(cur.next());
             assertEquals(0, locPart.fullSize());
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 4263f9d..3927a34 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectio
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
@@ -118,6 +119,8 @@ public class IgniteCacheTestSuite9 {
 
         GridTestUtils.addTestIfNeeded(suite, 
FailBackupOnAtomicOperationTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, 
CacheRemoveWithTombstonesTest.class, ignoredTests);
+
         return suite;
     }
 }

Reply via email to