IGNITE-6654 Ignite client can hang in case of IgniteOOM on server. This closes #2908.
Signed-off-by: nikolay_tikhonov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/918febaa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/918febaa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/918febaa Branch: refs/heads/ignite-6748 Commit: 918febaa17efa9e109fc68d268afbc7109a800e9 Parents: 6ed872b Author: nikolay_tikhonov <[email protected]> Authored: Wed Oct 25 18:46:59 2017 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 25 18:46:59 2017 +0300 ---------------------------------------------------------------------- .../pagemem/impl/PageMemoryNoStoreImpl.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 7 +- .../datastreamer/DataStreamerImpl.java | 23 +- .../cache/IgniteOutOfMemoryPropagationTest.java | 251 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite6.java | 5 + 5 files changed, 285 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java index 6ba68c2..e219d6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java @@ -290,9 +290,11 @@ public class PageMemoryNoStoreImpl implements PageMemory { if (relPtr == INVALID_REL_PTR) throw new IgniteOutOfMemoryException("Not enough memory allocated " + - "(consider increasing data region size or enabling evictions) " + "[policyName=" + dataRegionCfg.getName() + - 
", size=" + U.readableSize(dataRegionCfg.getMaxSize(), true) + "]" + ", size=" + U.readableSize(dataRegionCfg.getMaxSize(), true) + "]" + U.nl() + + "Consider increasing memory policy size, enabling evictions, adding more nodes to the cluster, " + + "reducing number of backups or reducing model size." + ); assert (relPtr & ~PageIdUtils.PAGE_IDX_MASK) == 0 : U.hexLong(relPtr & ~PageIdUtils.PAGE_IDX_MASK); http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 5095f45..a7dd615 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -96,6 +97,7 @@ import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; 
import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -3213,7 +3215,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { catch (GridDhtInvalidPartitionException ignored) { // Ignore. } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException|RuntimeException e) { + if(e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class)) + throw (RuntimeException)e; + IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e); if (nearRes != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 6ed552a..d38132f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -188,6 +188,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@code True} if data loader has been cancelled. */ private volatile boolean cancelled; + /** Cancellation reason. */ + private volatile Throwable cancellationReason = null; + /** Fail counter. 
*/ private final LongAdder8 failCntr = new LongAdder8(); @@ -210,7 +213,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed failCntr.increment(); - cancelled = true; + synchronized (DataStreamerImpl.this) { + if(cancellationReason == null) + cancellationReason = err; + + cancelled = true; + } } } }; @@ -399,12 +407,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (disconnectErr != null) throw disconnectErr; - throw new IllegalStateException("Data streamer has been closed."); + closedException(); } else if (cancelled) { busyLock.leaveBusy(); - throw new IllegalStateException("Data streamer has been closed."); + closedException(); } } @@ -886,7 +894,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed @Override public void run() { try { if (cancelled) - throw new IllegalStateException("DataStreamer closed."); + closedException(); load0(entriesForNode, resFut, activeKeys, remaps + 1); } @@ -990,6 +998,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * Throws stream closed exception. + */ + private void closedException() { + throw new IllegalStateException("Data streamer has been closed.", cancellationReason); + } + + /** * @param key Key to map. * @param topVer Topology version. * @param cctx Context. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java new file mode 100644 index 0000000..a13cbd4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * + */ +public class IgniteOutOfMemoryPropagationTest extends GridCommonAbstractTest { + + /** */ + public static final int NODES = 3; + + /** */ + private CacheAtomicityMode atomicityMode; + + /** */ + private CacheMode mode; + + /** */ + private int backupsCount; + + /** */ + private CacheWriteSynchronizationMode writeSyncMode; + + /** */ + private IgniteEx client; + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + assert G.allGrids().isEmpty(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 20 * 60 * 1000; + } + + /** */ + public void testPutOOMPropagation() throws Exception { + testOOMPropagation(false); + } + + /** */ + public void testStreamerOOMPropagation() throws Exception { + 
testOOMPropagation(true); + } + + /** */ + private void testOOMPropagation(boolean useStreamer) throws Exception { + for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { + for (CacheMode cacheMode : CacheMode.values()) { + for (CacheWriteSynchronizationMode writeSyncMode : CacheWriteSynchronizationMode.values()) { + for (int backupsCount = 0; backupsCount < 1; backupsCount++) { + if (writeSyncMode == CacheWriteSynchronizationMode.FULL_ASYNC + || cacheMode == CacheMode.REPLICATED) + continue; + + if (atomicityMode == CacheAtomicityMode.TRANSACTIONAL && !useStreamer) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + checkOOMPropagation( + false, + CacheAtomicityMode.TRANSACTIONAL, + cacheMode, + writeSyncMode, + backupsCount, + concurrency, + isolation); + } + } + } + else + checkOOMPropagation(useStreamer, atomicityMode, cacheMode, writeSyncMode, backupsCount); + } + } + } + } + } + + /** */ + private void checkOOMPropagation(boolean useStreamer, CacheAtomicityMode atomicityMode, CacheMode cacheMode, + CacheWriteSynchronizationMode writeSyncMode, int backupsCount) throws Exception { + checkOOMPropagation(useStreamer, atomicityMode, cacheMode, writeSyncMode, backupsCount, null, null); + } + + /** */ + private void checkOOMPropagation(boolean useStreamer, CacheAtomicityMode atomicityMode, CacheMode cacheMode, + CacheWriteSynchronizationMode writeSyncMode, int backupsCount, + TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { + Throwable t = null; + + System.out.println("Checking conf: CacheAtomicityMode." + atomicityMode + + " CacheMode." + mode + " CacheWriteSynchronizationMode." + writeSyncMode + " backupsCount = " + backupsCount + + " TransactionConcurrency." + concurrency + " TransactionIsolation." 
+ isolation); + + initGrid(atomicityMode, cacheMode, writeSyncMode, backupsCount); + try { + forceOOM(useStreamer, concurrency, isolation); + } + catch (Throwable t0) { + t = t0; + + t.printStackTrace(System.out); + + assertTrue(X.hasCause(t, IgniteOutOfMemoryException.class, ClusterTopologyException.class)); + } + finally { + assertNotNull(t); + + stopAllGrids(); + } + } + + /** + * Ignite grid of 3 server nodes with passed parameters. + * + * @param atomicityMode atomicity mode + * @param mode cache mode + * @param writeSyncMode cache write synchronization mode + * @param backupsCount backups count + * @throws Exception + */ + private void initGrid(CacheAtomicityMode atomicityMode, CacheMode mode, + CacheWriteSynchronizationMode writeSyncMode, int backupsCount) throws Exception { + + this.atomicityMode = atomicityMode; + this.mode = mode; + this.backupsCount = backupsCount; + this.writeSyncMode = writeSyncMode; + + Ignition.setClientMode(false); + + for (int i = 0; i < NODES; i++) + startGrid(i); + + Ignition.setClientMode(true); + + client = startGrid(NODES + 1); + + // it is required to start first node in test jvm, but we can not start client node, + // because client will fail to connect and test will fail too. + // as workaround start first server node in test jvm and then stop it. 
+ stopGrid(0); + } + + + /** */ + public void forceOOM(boolean useStreamer, TransactionConcurrency concurrency, + TransactionIsolation isolation) throws Exception { + final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME); + + IgniteDataStreamer<String, String> streamer = client.dataStreamer(DEFAULT_CACHE_NAME); + + Map<String, String> map = new HashMap<>(); + + Transaction tx = null; + + for (int i = 0; i < Integer.MAX_VALUE; i++) { + map.put("k" + i, "v" + i); + + if (map.size() > 1_000) { + if (concurrency != null && isolation != null) + tx = client.transactions().txStart(concurrency, isolation); + + if (useStreamer) + streamer.addData(map); + else + cache.putAll(map); + + map.clear(); + + if (tx != null) { + tx.commit(); + tx.close(); + } + } + } + } + + /** {@inheritDoc} */ + @Override protected boolean isMultiJvm() { + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean isRemoteJvm(String igniteInstanceName) { + return !(Ignition.isClientMode() || igniteInstanceName.endsWith("0")); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataStorageConfiguration memCfg = new DataStorageConfiguration(); + + memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(10 * 1024 * 1024 + 1)); + + cfg.setDataStorageConfiguration(memCfg); + + CacheConfiguration<Object, Object> baseCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + baseCfg.setAtomicityMode(this.atomicityMode); + baseCfg.setCacheMode(this.mode); + baseCfg.setBackups(this.backupsCount); + baseCfg.setWriteSynchronizationMode(this.writeSyncMode); + + cfg.setCacheConfiguration(baseCfg); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 7c71381..8a2d6a0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.IgniteOutOfMemoryPropagationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; @@ -54,6 +55,10 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class); suite.addTestSuite(IgniteCacheThreadLocalTxTest.class); + +// TODO enable this test after IGNITE-6753, now it takes too long +// suite.addTestSuite(IgniteOutOfMemoryPropagationTest.class); + return suite; } }
