This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-11704 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-11704 by this push: new 0a574a6 ignite-11704 0a574a6 is described below commit 0a574a68385cf250b928972654b0486e2efe67a4 Author: sboikov <sboi...@apache.org> AuthorDate: Tue Jul 23 10:42:28 2019 +0300 ignite-11704 --- .../cache/persistence/CacheDataRowAdapter.java | 13 +- .../CacheRemoveWithTombstonesLoadTest.java | 347 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite9.java | 2 + 3 files changed, 360 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index 1be2bb5..5216e21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -260,10 +260,19 @@ public class CacheDataRowAdapter implements CacheDataRow { int itemId = itemId(nextLink); incomplete = readIncomplete(incomplete, sharedCtx, coctx, pageMem, - grpId, pageAddr, itemId, io, rowData, readCacheId, skipVer); + grpId, pageAddr, itemId, io, rowData, readCacheId, skipVer); + + if (incomplete == null) { + if (rowData == TOMBSTONES && val != null && !sharedCtx.database().isTombstone(this)) { + // TODO IGNITE-11704. + ver = null; + key = null; + val = null; + verReady = true; + } - if (incomplete == null) return; + } if (rowData == KEY_ONLY) { if (key != null) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java new file mode 100644 index 0000000..2780132 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.spi.metric.LongMetric; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; + +/** + * + */ +public class CacheRemoveWithTombstonesLoadTest extends GridCommonAbstractTest { + /** */ + private boolean persistence; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration(); + + if (persistence) { + dsCfg.setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024).setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY); + } + + dsCfg.setPageSize(1024); + + cfg.setDataStorageConfiguration(dsCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalance() throws Exception { + removeAndRebalance(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testRemoveAndRebalanceWithPersistence() throws Exception { + persistence = true; + + testRemoveAndRebalance(); + } + + /** + * @throws Exception If failed. + */ + private void removeAndRebalance() throws Exception { + IgniteEx ignite0 = startGrid(0); + + if (persistence) + ignite0.cluster().active(true); + + final int pageSize = ignite0.configuration().getDataStorageConfiguration().getPageSize(); + + assert pageSize > 0; + + IgniteCache<TestKey, TestValue> cache0 = ignite0.createCache(cacheConfiguration()); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + List<TestKey> keys = new ArrayList<>(); + + Map<TestKey, TestValue> data = new HashMap<>(); + + final int KEYS = 10_000; + final int ADD_NODES = 3; + + for (int i = 0; i < KEYS; i++) { + TestKey key = new TestKey(i, new byte[rnd.nextInt(pageSize * 3)]); + + keys.add(key); + + data.put(key, new TestValue(new byte[rnd.nextInt(pageSize * 3)])); + } + + cache0.putAll(data); + + AtomicInteger nodeIdx = new AtomicInteger(); + + for (int iter = 0; iter < ADD_NODES; iter++) { + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = nodeIdx.incrementAndGet(); + + info("Start node: " + idx); + + return startGrid(idx); + } + }); + + long endTime = System.currentTimeMillis() + 5000; + + while (System.currentTimeMillis() < endTime) { + for (int i = 0; i < 100; i++) { + TestKey key = keys.get(rnd.nextInt(keys.size())); + + if (rnd.nextBoolean()) { + cache0.remove(key); + + data.remove(key); + } else { + TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]); + + cache0.put(key, val); + data.put(key, val); + } + } + } + + fut.get(30_000); + + checkData(keys, data); + + waitTombstoneCleanup(); + + checkData(keys, data); + } + + awaitPartitionMapExchange(); + + for (int iter = 0; iter < ADD_NODES; iter++) { + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = nodeIdx.getAndDecrement(); + + info("Stop node: " + idx); + + stopGrid(idx); + + awaitPartitionMapExchange(); + + return null; + } + }); + + long endTime = System.currentTimeMillis() + 5000; + + while (System.currentTimeMillis() < endTime) { + for (int i = 0; i < 100; i++) { + TestKey key = keys.get(rnd.nextInt(keys.size())); + + if (rnd.nextBoolean()) { + cache0.remove(key); + + data.remove(key); + } else { + TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]); + + cache0.put(key, val); + data.put(key, val); + } + } + } + + fut.get(30_000); + + checkData(keys, data); + + waitTombstoneCleanup(); + + checkData(keys, data); + } + } + + private void checkData(List<TestKey> keys, Map<TestKey, TestValue> data) { + for (Ignite node : Ignition.allGrids()) { + if (!node.name().endsWith("CacheRemoveWithTombstonesLoadTest1")) + continue; + + info("Check node: " + node.name()); + + IgniteCache<TestKey, TestValue> cache = node.cache(DEFAULT_CACHE_NAME); + + for (TestKey key : keys) { + TestValue expVal = data.get(key); + TestValue val = cache.get(key); + + if (expVal == null) + assertNull(val); + else { + assertNotNull(val); + assertTrue(Arrays.equals(expVal.dummyData, val.dummyData)); + } + } + } + } + + /** + * @throws Exception If failed. + */ + private void waitTombstoneCleanup() throws Exception { + for (Ignite node : Ignition.allGrids()) { + final LongMetric tombstones = ((IgniteEx)node).context().metric().registry( + cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)).findMetric("Tombstones"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return tombstones.get() == 0; + } + }, 30_000); + + assertEquals("Failed to wait for tombstone cleanup: " + node.name(), 0, tombstones.get()); + } + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration<TestKey, TestValue> cacheConfiguration() { + CacheConfiguration<TestKey, TestValue> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setRebalanceMode(ASYNC); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + return ccfg; + } + + /** + * + */ + static class TestKey { + /** */ + private final int id; + + /** */ + private final byte[] dummyData; + + /** + * @param id ID. + * @param dummyData Dummy byte array (to test with various key sizes). + */ + public TestKey(int id, byte[] dummyData) { + this.id = id; + this.dummyData = dummyData; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TestKey testKey = (TestKey) o; + + return id == testKey.id && Arrays.equals(dummyData, testKey.dummyData); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = Objects.hash(id); + result = 31 * result + Arrays.hashCode(dummyData); + return result; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "TestKey [id=" + id + "]"; + } + } + + /** + * + */ + static class TestValue { + /** */ + private final byte[] dummyData; + + /** + * @param dummyData Dummy byte array (to test with various value sizes). + */ + public TestValue(byte[] dummyData) { + this.dummyData = dummyData; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java index 3927a34..708d4d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectio import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesLoadTest; import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest; import org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; @@ -120,6 +121,7 @@ public class IgniteCacheTestSuite9 { GridTestUtils.addTestIfNeeded(suite, FailBackupOnAtomicOperationTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesLoadTest.class, ignoredTests); return suite; }