This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-11704
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-11704 by this push:
     new 0a574a6  ignite-11704
0a574a6 is described below

commit 0a574a68385cf250b928972654b0486e2efe67a4
Author: sboikov <sboi...@apache.org>
AuthorDate: Tue Jul 23 10:42:28 2019 +0300

    ignite-11704
---
 .../cache/persistence/CacheDataRowAdapter.java     |  13 +-
 .../CacheRemoveWithTombstonesLoadTest.java         | 347 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite9.java   |   2 +
 3 files changed, 360 insertions(+), 2 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 1be2bb5..5216e21 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -260,10 +260,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
                         int itemId = itemId(nextLink);
 
                         incomplete = readIncomplete(incomplete, sharedCtx, 
coctx, pageMem,
-                            grpId, pageAddr, itemId, io, rowData, readCacheId, 
skipVer);
+                                grpId, pageAddr, itemId, io, rowData, 
readCacheId, skipVer);
+
+                        if (incomplete == null) {
+                            if (rowData == TOMBSTONES && val != null && 
!sharedCtx.database().isTombstone(this)) {
+                                // TODO IGNITE-11704.
+                                ver = null;
+                                key = null;
+                                val = null;
+                                verReady = true;
+                            }
 
-                        if (incomplete == null)
                             return;
+                        }
 
                         if (rowData == KEY_ONLY) {
                             if (key != null)
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java
new file mode 100644
index 0000000..2780132
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
+
+/**
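+ * Load test: starts and then stops additional nodes one by one while a transactional cache is concurrently
+ * updated with random puts and removes; after each topology change the cache content is checked and the
+ * test waits until the "Tombstones" cache metric drops to zero.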
+ */
+public class CacheRemoveWithTombstonesLoadTest extends GridCommonAbstractTest {
+    /** */
+    private boolean persistence;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        // Page size is fixed at 1024 bytes; the test generates keys and values of up to three pages
+        // (presumably to make entries span several data pages - confirm with the test author).
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration().setPageSize(1024);
+
+        if (persistence) {
+            DataRegionConfiguration regionCfg = new DataRegionConfiguration();
+
+            regionCfg.setMaxSize(100L * 1024 * 1024);
+            regionCfg.setPersistenceEnabled(true);
+
+            storageCfg.setDefaultDataRegionConfiguration(regionCfg);
+            storageCfg.setWalMode(WALMode.LOG_ONLY);
+        }
+
+        return super.getConfiguration(gridName).setDataStorageConfiguration(storageCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Presumably wipes persistence files left over from earlier runs so that every test starts clean;
+        // the helper is defined outside this file.
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Nodes are stopped before the persistence directory is cleaned (presumably because the files
+        // cannot be removed reliably while nodes still use them).
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /**
+     * Runs the remove-and-rebalance scenario with the {@code persistence} flag left untouched (the flag is
+     * only set to {@code true} by the persistence variant of this test).
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalance() throws Exception {
+        removeAndRebalance();
+    }
+
+    /**
+     * Runs the remove-and-rebalance scenario with the {@code persistence} flag set, so that nodes are
+     * configured with a persistent default data region.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAndRebalanceWithPersistence() throws Exception {
+        persistence = true;
+
+        // Call the shared scenario directly instead of going through another @Test method.
+        removeAndRebalance();
+    }
+
+    /**
+     * Populates the cache, then starts and afterwards stops additional nodes one by one. While each node is
+     * being started or stopped the cache is updated with random puts and removes; after every topology
+     * change the cache content is compared with the expected data before and after tombstone cleanup.
+     *
+     * @throws Exception If failed.
+     */
+    private void removeAndRebalance() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+
+        if (persistence)
+            ignite0.cluster().active(true);
+
+        final int pageSize = ignite0.configuration().getDataStorageConfiguration().getPageSize();
+
+        assert pageSize > 0;
+
+        IgniteCache<TestKey, TestValue> cache0 = ignite0.createCache(cacheConfiguration());
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        List<TestKey> keys = new ArrayList<>();
+
+        Map<TestKey, TestValue> data = new HashMap<>();
+
+        final int KEYS = 10_000;
+        final int ADD_NODES = 3;
+
+        // Keys and values get random sizes of up to three pages.
+        for (int i = 0; i < KEYS; i++) {
+            TestKey key = new TestKey(i, new byte[rnd.nextInt(pageSize * 3)]);
+
+            keys.add(key);
+
+            data.put(key, new TestValue(new byte[rnd.nextInt(pageSize * 3)]));
+        }
+
+        cache0.putAll(data);
+
+        AtomicInteger nodeIdx = new AtomicInteger();
+
+        // Phase 1: add nodes one by one under load.
+        for (int iter = 0; iter < ADD_NODES; iter++) {
+            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = nodeIdx.incrementAndGet();
+
+                    info("Start node: " + idx);
+
+                    return startGrid(idx);
+                }
+            });
+
+            updateAndCheck(fut, cache0, keys, data, pageSize);
+        }
+
+        awaitPartitionMapExchange();
+
+        // Phase 2: stop the added nodes in reverse order under load.
+        for (int iter = 0; iter < ADD_NODES; iter++) {
+            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = nodeIdx.getAndDecrement();
+
+                    info("Stop node: " + idx);
+
+                    stopGrid(idx);
+
+                    awaitPartitionMapExchange();
+
+                    return null;
+                }
+            });
+
+            updateAndCheck(fut, cache0, keys, data, pageSize);
+        }
+    }
+
+    /**
+     * Applies random puts and removes for five seconds while a topology change runs in the background, then
+     * waits for that change and checks the cache content before and after tombstone cleanup.
+     *
+     * @param fut Future of the concurrently running topology change.
+     * @param cache Cache to update.
+     * @param keys Keys to pick from.
+     * @param data Expected cache content, kept in sync with the applied updates.
+     * @param pageSize Page size used to bound the size of generated values.
+     * @throws Exception If failed.
+     */
+    private void updateAndCheck(
+        IgniteInternalFuture<?> fut,
+        IgniteCache<TestKey, TestValue> cache,
+        List<TestKey> keys,
+        Map<TestKey, TestValue> data,
+        int pageSize
+    ) throws Exception {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        long endTime = System.currentTimeMillis() + 5000;
+
+        while (System.currentTimeMillis() < endTime) {
+            // Updates are applied in batches of 100 between clock checks.
+            for (int i = 0; i < 100; i++) {
+                TestKey key = keys.get(rnd.nextInt(keys.size()));
+
+                if (rnd.nextBoolean()) {
+                    cache.remove(key);
+
+                    data.remove(key);
+                }
+                else {
+                    TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]);
+
+                    cache.put(key, val);
+                    data.put(key, val);
+                }
+            }
+        }
+
+        fut.get(30_000);
+
+        checkData(keys, data);
+
+        waitTombstoneCleanup();
+
+        // Cleanup of tombstones must not change the visible cache content.
+        checkData(keys, data);
+    }
+
+    /**
+     * Compares the cache content read through the checked node(s) with the expected data.
+     *
+     * @param keys Keys to read from the cache.
+     * @param data Expected cache content; a key without a mapping is expected to be absent from the cache.
+     */
+    private void checkData(List<TestKey> keys, Map<TestKey, TestValue> data) {
+        for (Ignite node : Ignition.allGrids()) {
+            // NOTE(review): only a node whose name ends with "CacheRemoveWithTombstonesLoadTest1" is checked
+            // (presumably the grid started with index 1). When no such node is running this method verifies
+            // nothing. Looks like a debugging leftover - confirm before removing the filter.
+            if (!node.name().endsWith("CacheRemoveWithTombstonesLoadTest1"))
+                continue;
+
+            info("Check node: " + node.name());
+
+            IgniteCache<TestKey, TestValue> cache = node.cache(DEFAULT_CACHE_NAME);
+
+            for (TestKey key : keys) {
+                TestValue expVal = data.get(key);
+                TestValue val = cache.get(key);
+
+                // Identify the failing entry: with thousands of keys a bare assertion is hard to diagnose.
+                String ctx = " [node=" + node.name() + ", key=" + key + ']';
+
+                if (expVal == null)
+                    assertNull("Unexpected value for removed key" + ctx, val);
+                else {
+                    assertNotNull("Missing value" + ctx, val);
+                    assertTrue("Unexpected value content" + ctx, Arrays.equals(expVal.dummyData, val.dummyData));
+                }
+            }
+        }
+    }
+
+    /**
+     * For every started node waits up to 30 seconds until the "Tombstones" cache metric becomes zero and
+     * fails if it is still non-zero afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    private void waitTombstoneCleanup() throws Exception {
+        String regName = cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false);
+
+        for (Ignite node : Ignition.allGrids()) {
+            IgniteEx nodeEx = (IgniteEx)node;
+
+            final LongMetric tombstones = nodeEx.context().metric().registry(regName).findMetric("Tombstones");
+
+            GridAbsPredicate noTombstones = () -> tombstones.get() == 0;
+
+            // The wait result is not inspected: the assertion below re-reads the metric and names the node.
+            GridTestUtils.waitForCondition(noTombstones, 30_000);
+
+            assertEquals("Failed to wait for tombstone cleanup: " + node.name(), 0, tombstones.get());
+        }
+    }
+
+    /**
+     * Builds the configuration of the tested cache: transactional, partitioned, one backup, asynchronous
+     * rebalancing and {@code FULL_SYNC} write synchronization.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<TestKey, TestValue> cacheConfiguration() {
+        return new CacheConfiguration<TestKey, TestValue>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setCacheMode(PARTITIONED)
+            .setBackups(1)
+            .setRebalanceMode(ASYNC)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+    }
+
+    /**
+     * Cache key made of an integer ID and a byte array payload; both take part in equality and hashing.
+     */
+    static class TestKey {
+        /** Key ID. */
+        private final int id;
+
+        /** Payload used to vary the key size. */
+        private final byte[] dummyData;
+
+        /**
+         * @param id ID.
+         * @param dummyData Dummy byte array (to test with various key sizes).
+         */
+        public TestKey(int id, byte[] dummyData) {
+            this.dummyData = dummyData;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            TestKey other = (TestKey)o;
+
+            return other.id == id && Arrays.equals(other.dummyData, dummyData);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * Objects.hash(id) + Arrays.hashCode(dummyData);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "TestKey [id=" + id + "]";
+        }
+    }
+
+    /**
+     * Cache value that only wraps a byte array payload.
+     */
+    static class TestValue {
+        /** Payload used to vary the value size. */
+        private final byte[] dummyData;
+
+        /**
+         * @param dummyData Dummy byte array (to test with various value sizes).
+         */
+        public TestValue(byte[] dummyData) {
+            this.dummyData = dummyData;
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 3927a34..708d4d0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectio
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesLoadTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
@@ -120,6 +121,7 @@ public class IgniteCacheTestSuite9 {
         GridTestUtils.addTestIfNeeded(suite, 
FailBackupOnAtomicOperationTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
CacheRemoveWithTombstonesTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CacheRemoveWithTombstonesLoadTest.class, ignoredTests);
 
         return suite;
     }

Reply via email to