Repository: ignite Updated Branches: refs/heads/master 8828c1d19 -> cb0d1de30
IGNITE-9882: Hadoop: fixed OOME on TC. This closes #5003. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb0d1de3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb0d1de3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb0d1de3 Branch: refs/heads/master Commit: cb0d1de30c44fb333f5ea7c4db087015a7cfe8b5 Parents: 8828c1d Author: tledkov-gridgain <[email protected]> Authored: Wed Oct 17 17:11:09 2018 +0300 Committer: devozerov <[email protected]> Committed: Wed Oct 17 17:11:09 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 4 ++ .../processors/hadoop/HadoopHelper.java | 5 ++ .../processors/hadoop/HadoopNoopHelper.java | 5 ++ .../internal/processors/igfs/IgfsImpl.java | 3 ++ .../processors/hadoop/HadoopHelperImpl.java | 8 ++- .../impl/HadoopAbstractMapReduceTest.java | 18 +++++++ .../taskexecutor/HadoopExecutorServiceTest.java | 52 ++------------------ 7 files changed, 45 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 7f4310f..c6ec9be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2289,6 +2289,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + if (ctx.hadoopHelper() != null) + ctx.hadoopHelper().close(); + if (starveTask != null) starveTask.close(); @@ -2377,6 +2380,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { MarshallerExclusions.clearCache(); BinaryEnumCache.clear(); + gw.writeLock(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java index 7936fef..ae9985d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java @@ -59,4 +59,9 @@ public interface HadoopHelper { * @return Work directory. */ public String workDirectory(); + + /** + * Close helper. + */ + public void close(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java index f8f870f..986af1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java @@ -62,6 +62,11 @@ public class HadoopNoopHelper implements HadoopHelper { throw unsupported(); } + /** {@inheritDoc} */ + @Override public void close() { + // No-op. + } + /** * @return Exception. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index ff53e00..bac536d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -276,6 +276,9 @@ public final class IgfsImpl implements IgfsEx { // Restore interrupted flag. if (interrupted) Thread.currentThread().interrupt(); + + if (dualPool != null) + dualPool.shutdownNow(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java index 0e86529..6da79b2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java @@ -35,7 +35,7 @@ import java.io.InputStream; */ public class HadoopHelperImpl implements HadoopHelper { /** Kernal context. */ - private final GridKernalContext ctx; + private GridKernalContext ctx; /** Common class loader. */ private volatile HadoopClassLoader ldr; @@ -130,4 +130,10 @@ public class HadoopHelperImpl implements HadoopHelper { throw new IgniteException("Failed to resolve Ignite work directory.", e); } } + + /** {@inheritDoc} */ + @Override public void close() { + // Force drop KernalContext link, because HadoopHelper leaks in some tests. + ctx = null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java index fc6d7f8..fa224b2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1; import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2; +import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -344,6 +346,22 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest { super.beforeTest(); } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + igniteSecondary = null; + secondaryFs = null; + igfs = null; + + HadoopFileSystemsUtils.clearFileSystemCache(); + FileSystem.clearStatistics(); + + Map stat = GridTestUtils.getFieldValue(FileSystem.class, FileSystem.class, "statisticsTable"); + + stat.clear(); + } + /** * Start grid with IGFS. * http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d1de3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java index 3486a14..cc6cbb3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java @@ -17,14 +17,13 @@ package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.LongAdder; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.LongAdder; + /** * */ @@ -70,49 +69,4 @@ public class HadoopExecutorServiceTest extends GridCommonAbstractTest { assertTrue(exec.shutdown(0)); } - - /** - * @throws Exception If failed. - */ - public void testShutdown() throws Exception { - for (int i = 0; i < 5; i++) { - final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - final LongAdder sum = new LongAdder(); - - final AtomicBoolean finish = new AtomicBoolean(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finish.get()) { - exec.submit(new Callable<Void>() { - @Override public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, 19); - - Thread.sleep(200); - - assertTrue(exec.shutdown(50)); - - long res = sum.sum(); - - assertTrue(res > 0); - - finish.set(true); - - fut.get(); - - assertEquals(res, sum.sum()); // Nothing was executed after shutdown. - - X.println("_ ok"); - } - } } \ No newline at end of file
