This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-11704 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 5a05291a9414ef8f8b37ec3d1e8a2e0a9ce4c70f Author: sboikov <sboi...@apache.org> AuthorDate: Thu Jul 11 23:45:40 2019 +0300 ignite-11704 --- .../processors/cache/CacheMetricsImpl.java | 14 ++ .../processors/cache/GridCacheContext.java | 14 ++ .../cache/IgniteCacheOffheapManager.java | 6 +- .../cache/IgniteCacheOffheapManagerImpl.java | 69 ++++-- .../dht/topology/GridDhtLocalPartition.java | 2 +- .../cache/persistence/GridCacheOffheapManager.java | 8 +- .../verify/CollectConflictPartitionKeysTask.java | 2 +- .../cache/verify/VerifyBackupPartitionsTask.java | 2 +- .../cache/verify/VerifyBackupPartitionsTaskV2.java | 2 +- ...orFindAndDeleteGarbageInPersistenceClosure.java | 2 +- .../processors/cache/IgniteCacheGroupsTest.java | 10 +- .../distributed/CacheRemoveWithTombstonesTest.java | 261 +++++++++++++++++++++ .../cache/persistence/db/IgnitePdsWithTtlTest.java | 2 +- .../ignite/testsuites/IgniteCacheTestSuite9.java | 3 + 14 files changed, 355 insertions(+), 42 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index ab1c4a1..5808db6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetricImpl; import org.apache.ignite.internal.processors.metric.impl.LongMetricImpl; import org.apache.ignite.internal.processors.metric.impl.MetricUtils; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -161,6 +162,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Number of currently clearing partitions for rebalancing. */ private final LongMetricImpl rebalanceClearingPartitions; + /** */ + private LongAdderMetricImpl tombstones; + /** Cache metrics. */ @GridToStringExclude private transient CacheMetricsImpl delegate; @@ -312,6 +316,8 @@ public class CacheMetricsImpl implements CacheMetrics { rebalanceClearingPartitions = mreg.metric("RebalanceClearingPartitionsLeft", "Number of partitions need to be cleared before actual rebalance start."); + + tombstones = mreg.longAdderMetric("Tombstones", "Number of tombstone entries"); } /** @@ -1009,6 +1015,14 @@ public class CacheMetricsImpl implements CacheMetrics { delegate.addPutAndGetTimeNanos(duration); } + public void tombstoneCreated() { + tombstones.increment(); + } + + public void tombstoneRemoved() { + tombstones.decrement(); + } + /** {@inheritDoc} */ @Override public String getKeyType() { CacheConfiguration ccfg = cctx.config(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 87c6253..5ac56db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -2372,6 +2372,20 @@ public class GridCacheContext<K, V> implements Externalizable { } } + public void tombstoneCreated() { + GridCacheAdapter cache = this.cache; + + if (cache != null) + cache.metrics0().tombstoneCreated(); + } + + public void tombstoneRemoved() { + GridCacheAdapter cache = this.cache; + + if (cache != null) + cache.metrics0().tombstoneRemoved(); + } + /** {@inheritDoc} */ @Override public String toString() { return "GridCacheContext: " + name(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 4221a2d..c11e909 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -452,7 +452,7 @@ public interface IgniteCacheOffheapManager { * @return Iterator for given partition. * @throws IgniteCheckedException If failed. */ - public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException; + public GridIterator<CacheDataRow> partitionIterator(final int part, boolean withTombstones) throws IgniteCheckedException; /** * @param part Partition number. @@ -963,7 +963,7 @@ public interface IgniteCacheOffheapManager { * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException; + public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException; /** * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. @@ -984,7 +984,7 @@ public interface IgniteCacheOffheapManager { * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException; + public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException; /** * @param cacheId Cache ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index e628345..c45e3b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -747,8 +747,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheVersion obsoleteVer = null; try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ? - iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null) : - evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) { + iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null, true) : + evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator(), true)) { while (it.hasNext()) { cctx.shared().database().checkpointReadLock(); @@ -891,7 +891,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Nullable MvccSnapshot mvccSnapshot, Boolean dataPageScanEnabled ) { - return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled); + return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled, false); } /** {@inheritDoc} */ @@ -902,17 +902,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (data == null) return new GridEmptyCloseableIterator<>(); - return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled); + return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled, false); } /** {@inheritDoc} */ - @Override public GridIterator<CacheDataRow> partitionIterator(int part) { + @Override public GridIterator<CacheDataRow> partitionIterator(int part, boolean withTombstones) { CacheDataStore data = partitionData(part); if (data == null) return new GridEmptyCloseableIterator<>(); - return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null); + return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null, withTombstones); } /** @@ -926,7 +926,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager private GridCloseableIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt, final MvccSnapshot mvccSnapshot, - Boolean dataPageScanEnabled + Boolean dataPageScanEnabled, + boolean withTombstones ) { return new GridCloseableIteratorAdapter<CacheDataRow>() { /** */ @@ -962,7 +963,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { if (mvccSnapshot == null) - cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); + cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones); else { cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot); @@ -996,7 +997,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param dataIt Data store iterator. * @return Rows iterator */ - private GridCloseableIterator<CacheDataRow> evictionSafeIterator(final int cacheId, final Iterator<CacheDataStore> dataIt) { + private GridCloseableIterator<CacheDataRow> evictionSafeIterator( + final int cacheId, + final Iterator<CacheDataStore> dataIt, + boolean withTombstones) { return new GridCloseableIteratorAdapter<CacheDataRow>() { /** */ private GridCursor<? extends CacheDataRow> cur; @@ -1027,7 +1031,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (!reservePartition(ds.partId())) continue; - cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); + cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones); } else break; @@ -1700,17 +1704,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager case REMOVE: { CacheDataRow oldRow = c.oldRow(); - finishRemove(cctx, row.key(), oldRow, true); + finishRemove(cctx, row.key(), oldRow, null); break; } case IN_PLACE: - if (isTombstone(c.newRow())) - decrementSize(cctx.cacheId()); + assert !isTombstone(c.newRow()); + + if (isTombstone(c.oldRow())) { + cctx.tombstoneRemoved(); - if (isTombstone(c.oldRow())) incrementSize(cctx.cacheId()); + } break; @@ -2592,7 +2598,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager */ private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { - boolean oldNull = oldRow == null || isTombstone(oldRow); + assert !isTombstone(newRow); + + boolean oldTombstone = isTombstone(oldRow); + boolean oldNull = oldRow == null || oldTombstone; if (oldNull) incrementSize(cctx.cacheId()); @@ -2614,6 +2623,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), newRow.value()); + + if (oldTombstone) + cctx.tombstoneRemoved(); } /** @@ -2656,7 +2668,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key)); - finishRemove(cctx, key, oldRow, true); + finishRemove(cctx, key, oldRow, null); } finally { busyLock.leaveBusy(); @@ -2733,7 +2745,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert c.operationType() == PUT || c.operationType() == IN_PLACE : c.operationType(); - finishRemove(cctx, key, c.oldRow, c.operationType() == PUT); + if (!isTombstone(c.oldRow)) + cctx.tombstoneCreated(); + + finishRemove(cctx, key, c.oldRow, c.newRow); } finally { busyLock.leaveBusy(); @@ -2749,8 +2764,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow, - boolean removeFromStore) throws IgniteCheckedException { - boolean oldNull = oldRow == null || isTombstone(oldRow); + @Nullable CacheDataRow tombstoneRow) throws IgniteCheckedException { + boolean oldTombstone = isTombstone(oldRow); + boolean oldNull = oldRow == null || oldTombstone; if (!oldNull) { clearPendingEntries(cctx, oldRow); @@ -2763,10 +2779,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (qryMgr.enabled()) qryMgr.remove(key, oldNull ? null : oldRow); - if (oldRow != null && removeFromStore) + if (oldRow != null && (tombstoneRow == null || tombstoneRow.link() != oldRow.link())) rowStore.removeRow(oldRow.link(), grp.statisticsHolderData()); updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), null); + + if (oldTombstone && tombstoneRow == null) + cctx.tombstoneRemoved(); } /** @@ -2911,8 +2930,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException { - return cursorSkipTombstone(dataTree.find(null, null)); + @Override public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException { + GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null); + + return withTombstones ? cur : cursorSkipTombstone(cur); } private GridCursor<? extends CacheDataRow> cursorSkipTombstone(final GridCursor<? extends CacheDataRow> cur) { @@ -2964,8 +2985,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { - return cursor(cacheId, null, null); + @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException { + return cursor(cacheId, null, null, null, null, withTombstones); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 5e637e9..e3e6435 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -1143,7 +1143,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; try { - GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); + GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id, true); while (it0.hasNext()) { ctx.database().checkpointReadLock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 6f5a0ea..d4bcbd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -2476,11 +2476,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ - @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException { + @Override public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(); + return delegate.cursor(withTombstones); return EMPTY_CURSOR; } @@ -2555,11 +2555,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { + @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(cacheId); + return delegate.cursor(cacheId, withTombstones); return EMPTY_CURSOR; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java index 8e8128c..bd9db7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/CollectConflictPartitionKeysTask.java @@ -178,7 +178,7 @@ public class CollectConflictPartitionKeysTask extends ComputeTaskAdapter<Partiti partSize = part.dataStore().fullSize(); - GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); + GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id(), false); partEntryHashRecords = new ArrayList<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java index f700567..8b18834 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java @@ -317,7 +317,7 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>, partSize = part.dataStore().fullSize(); - GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); + GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id(), false); while (it.hasNextX()) { CacheDataRow row = it.nextX(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java index 92b6e36..9d8335f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java @@ -579,7 +579,7 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe if (arg.checkCrc()) checkPartitionCrc(grpCtx, part, cpFlag); - GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); + GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id(), false); while (it.hasNextX()) { CacheDataRow row = it.nextX(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java index 722686e..552a1a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java @@ -266,7 +266,7 @@ public class VisorFindAndDeleteGarbageInPersistenceClosure implements IgniteCall if (part.state() != GridDhtPartitionState.OWNING) return Collections.emptyMap(); - GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); + GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id(), false); while (it.hasNextX()) { CacheDataRow row = it.nextX(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 0ab22fd..478cf80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -1666,17 +1666,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { for (int p = 0; p < aff.partitions(); p++) { if (srv1Parts.contains(p)) { - GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p); + GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p, false); assertFalse(it.hasNext()); - it = grpSrv1.offheap().partitionIterator(p); + it = grpSrv1.offheap().partitionIterator(p, false); assertTrue(it.hasNext()); } else { - GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p); + GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p, false); assertTrue(it.hasNext()); - it = grpSrv1.offheap().partitionIterator(p); + it = grpSrv1.offheap().partitionIterator(p, false); assertFalse(it.hasNext()); } } @@ -3919,7 +3919,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Integer cacheId = null; - GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0); + GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0, false); int c = 0; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java new file mode 100644 index 0000000..331fb64 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.spi.metric.LongMetric; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; + +/** + * + */ +public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { + /** */ + private boolean persistence; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + if (persistence) { + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024).setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY); + + cfg.setDataStorageConfiguration(dsCfg); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceRaceTx() throws Exception { + testRemoveAndRebalanceRace(TRANSACTIONAL, true); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceRaceTxMvcc() throws Exception { + testRemoveAndRebalanceRace(TRANSACTIONAL_SNAPSHOT, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceRaceAtomic() throws Exception { + testRemoveAndRebalanceRace(ATOMIC, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception { + persistence = true; + + testRemoveAndRebalanceRace(TRANSACTIONAL, true); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceRaceTxMvccWithPersistence() throws Exception { + persistence = true; + + testRemoveAndRebalanceRace(TRANSACTIONAL_SNAPSHOT, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceRaceAtomicWithPersistence() throws Exception { + persistence = true; + + testRemoveAndRebalanceRace(ATOMIC, false); + } + + /** + * @throws Exception If failed. + * @param expTombstone {@code True} if tombstones should be created. + */ + private void testRemoveAndRebalanceRace(CacheAtomicityMode atomicityMode, boolean expTombstone) throws Exception { + IgniteEx ignite0 = startGrid(0); + + if (persistence) + ignite0.cluster().active(true); + + IgniteCache<Integer, Integer> cache0 = ignite0.createCache(cacheConfiguration(atomicityMode)); + + LongMetric tombstoneMetric0 = (LongMetric)ignite0.context().metric().registry( + cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)).findMetric("Tombstones"); + + Map<Integer, Integer> map = new HashMap<>(); + + final int KEYS = 1024; + + for (int i = 0; i < KEYS; i++) + map.put(i, i); + + cache0.putAll(map); + + TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class, + getTestIgniteInstanceName(1)); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(1); + } + }); + + IgniteEx ignite1 = (IgniteEx)fut.get(30_000); + + Set<Integer> removed = new HashSet<>(); + + // Do removes while rebalance is in progress. + for (int i = 0; i < KEYS; i++) { + if (i % 2 == 0) { + removed.add(i); + + cache0.remove(i); + } + } + + final LongMetric tombstoneMetric1 = (LongMetric)ignite1.context().metric().registry( + cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)).findMetric("Tombstones"); + + // On first node there should not be tombstones. + //assertEquals(0, tombstoneMetric0.get()); + + if (expTombstone) + assertEquals(removed.size(), tombstoneMetric1.get()); + else + assertEquals(0, tombstoneMetric1.get()); + + // Update some of removed keys, this should remove tombstones. + for (int i = 0; i < KEYS; i++) { + if (i % 4 == 0) { + removed.remove(i); + + cache0.put(i, i); + } + } + + assert !removed.isEmpty(); + + //assertEquals(0, tombstoneMetric0.get()); + + if (expTombstone) + assertEquals(removed.size(), tombstoneMetric1.get()); + else + assertEquals(0, tombstoneMetric1.get()); + + TestRecordingCommunicationSpi.spi(ignite0).stopBlock(); + + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache1 = ignite(1).cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < KEYS; i++) { + if (removed.contains(i)) + assertNull(cache1.get(i)); + else + assertEquals((Object)i, cache1.get(i)); + } + + // Tombstones should be removed after once rebalance is completed. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return tombstoneMetric1.get() == 0; + } + }, 30_000); + + assertEquals(0, tombstoneMetric1.get()); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(2); + ccfg.setRebalanceMode(SYNC); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + return ccfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java index afe2b34..4461d94 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java @@ -316,7 +316,7 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest { IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart); - GridCursor cur = dataStore.cursor(); + GridCursor cur = dataStore.cursor(false); assertFalse(cur.next()); assertEquals(0, locPart.fullSize()); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java index 4263f9d..3927a34 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectio import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest; import org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest; @@ -118,6 +119,8 @@ public class IgniteCacheTestSuite9 { GridTestUtils.addTestIfNeeded(suite, FailBackupOnAtomicOperationTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesTest.class, ignoredTests); + return suite; } }