Repository: ignite Updated Branches: refs/heads/master 32fc6c3c1 -> d1be9b855
IGNITE-6892 OOM should be covered by failure handling Signed-off-by: Andrey Gura <ag...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1be9b85 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1be9b85 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1be9b85 Branch: refs/heads/master Commit: d1be9b85507eb3358327e93b81031f92e660531b Parents: 32fc6c3 Author: Aleksey Plekhanov <plehanov.a...@gmail.com> Authored: Wed Apr 11 18:24:51 2018 +0300 Committer: Andrey Gura <ag...@apache.org> Committed: Wed Apr 11 18:24:51 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 8 + .../org/apache/ignite/internal/IgnitionEx.java | 50 +++- .../discovery/GridDiscoveryManager.java | 3 + .../processors/cache/WalStateManager.java | 8 +- .../continuous/GridContinuousProcessor.java | 3 + .../datastreamer/DataStreamProcessor.java | 3 + .../processors/failure/FailureProcessor.java | 11 + .../internal/processors/job/GridJobWorker.java | 8 +- .../service/GridServiceProcessor.java | 15 +- .../thread/IgniteStripedThreadPoolExecutor.java | 8 +- .../ignite/thread/IgniteThreadFactory.java | 30 ++- .../ignite/thread/IgniteThreadPoolExecutor.java | 12 +- .../ignite/thread/OomExceptionHandler.java | 44 ++++ .../ignite/failure/OomFailureHandlerTest.java | 255 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 15 files changed, 430 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 
662338c..437f49f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -863,6 +863,14 @@ public final class IgniteSystemProperties { public static final String IGNITE_BPLUS_TREE_LOCK_RETRIES = "IGNITE_BPLUS_TREE_LOCK_RETRIES"; /** + * Amount of memory reserved in the heap at node start, which can be dropped to increase the chances of success when + * handling OutOfMemoryError. + * + * Default is {@code 64kb}. + */ + public static final String IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE = "IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE"; + + /** * The threshold of uneven distribution above which partition distribution will be logged. * * The default is '50', that means: warn about nodes with 50+% difference. http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 10a0752..b3c3ee8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; import java.lang.reflect.Constructor; import java.net.MalformedURLException; @@ -88,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.CA; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; 
import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -1764,6 +1766,13 @@ public class IgnitionEx { validateThreadPoolSize(cfg.getPublicThreadPoolSize(), "public"); + UncaughtExceptionHandler oomeHnd = new UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread t, Throwable e) { + if (grid != null && X.hasCause(e, OutOfMemoryError.class)) + grid.context().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } + }; + execSvc = new IgniteThreadPoolExecutor( "pub", cfg.getIgniteInstanceName(), @@ -1771,7 +1780,8 @@ public class IgnitionEx { cfg.getPublicThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.PUBLIC_POOL); + GridIoPolicy.PUBLIC_POOL, + oomeHnd); execSvc.allowCoreThreadTimeOut(true); @@ -1784,7 +1794,8 @@ public class IgnitionEx { cfg.getServiceThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.SERVICE_POOL); + GridIoPolicy.SERVICE_POOL, + oomeHnd); svcExecSvc.allowCoreThreadTimeOut(true); @@ -1797,7 +1808,8 @@ public class IgnitionEx { cfg.getSystemThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.SYSTEM_POOL); + GridIoPolicy.SYSTEM_POOL, + oomeHnd); sysExecSvc.allowCoreThreadTimeOut(true); @@ -1828,7 +1840,8 @@ public class IgnitionEx { cfg.getManagementThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.MANAGEMENT_POOL); + GridIoPolicy.MANAGEMENT_POOL, + oomeHnd); mgmtExecSvc.allowCoreThreadTimeOut(true); @@ -1844,7 +1857,8 @@ public class IgnitionEx { cfg.getPeerClassLoadingThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.P2P_POOL); + GridIoPolicy.P2P_POOL, + oomeHnd); p2pExecSvc.allowCoreThreadTimeOut(true); @@ -1879,7 +1893,8 @@ public class IgnitionEx { callbackExecSvc = new IgniteStripedThreadPoolExecutor( 
cfg.getAsyncCallbackPoolSize(), cfg.getIgniteInstanceName(), - "callback"); + "callback", + oomeHnd); if (myCfg.getConnectorConfiguration() != null) { validateThreadPoolSize(myCfg.getConnectorConfiguration().getThreadPoolSize(), "connector"); @@ -1890,7 +1905,9 @@ public class IgnitionEx { myCfg.getConnectorConfiguration().getThreadPoolSize(), myCfg.getConnectorConfiguration().getThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>() + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.UNDEFINED, + oomeHnd ); restExecSvc.allowCoreThreadTimeOut(true); @@ -1905,7 +1922,8 @@ public class IgnitionEx { myCfg.getUtilityCacheThreadPoolSize(), myCfg.getUtilityCacheKeepAliveTime(), new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.UTILITY_CACHE_POOL); + GridIoPolicy.UTILITY_CACHE_POOL, + oomeHnd); utilityCacheExecSvc.allowCoreThreadTimeOut(true); @@ -1916,7 +1934,8 @@ public class IgnitionEx { 1, DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.AFFINITY_POOL); + GridIoPolicy.AFFINITY_POOL, + oomeHnd); affExecSvc.allowCoreThreadTimeOut(true); @@ -1930,7 +1949,8 @@ public class IgnitionEx { cpus * 2, 3000L, new LinkedBlockingQueue<Runnable>(1000), - GridIoPolicy.IDX_POOL + GridIoPolicy.IDX_POOL, + oomeHnd ); } @@ -1943,7 +1963,8 @@ public class IgnitionEx { cfg.getQueryThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.QUERY_POOL); + GridIoPolicy.QUERY_POOL, + oomeHnd); qryExecSvc.allowCoreThreadTimeOut(true); @@ -1954,7 +1975,8 @@ public class IgnitionEx { 2, DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.SCHEMA_POOL); + GridIoPolicy.SCHEMA_POOL, + oomeHnd); schemaExecSvc.allowCoreThreadTimeOut(true); @@ -1970,7 +1992,9 @@ public class IgnitionEx { execCfg.getSize(), execCfg.getSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.UNDEFINED, + oomeHnd); 
customExecSvcs.put(execCfg.getName(), exec); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 400bb5f..77c9657 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -130,6 +130,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.OomExceptionHandler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -924,6 +925,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { segChkThread = new IgniteThread(segChkWrk); + segChkThread.setUncaughtExceptionHandler(new OomExceptionHandler(ctx)); + segChkThread.start(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 0ac699f..64a6819 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -38,6 +38,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.thread.OomExceptionHandler; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -473,7 +474,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { // not-yet-flushed dirty pages have been logged. WalStateChangeWorker worker = new WalStateChangeWorker(msg, cpFut); - new IgniteThread(worker).start(); + IgniteThread thread = new IgniteThread(worker); + + thread.setUncaughtExceptionHandler(new OomExceptionHandler( + cctx.kernalContext())); + + thread.start(); } else { // Disable: not-yet-flushed operations are not logged, so wait for them http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index cebe4b1..2d48b7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -88,6 +88,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.OomExceptionHandler; import 
org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -1727,6 +1728,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }); + checker.setUncaughtExceptionHandler(new OomExceptionHandler(ctx)); + bufCheckThreads.put(routineId, checker); checker.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 8b984c0..e63d7d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -44,6 +44,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.stream.StreamReceiver; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.OomExceptionHandler; import org.jetbrains.annotations.Nullable; import java.util.Collection; @@ -125,6 +126,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { } }); + flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx)); + flusher.start(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java index 615fb9f..0234e84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java @@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.failure; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -40,6 +42,9 @@ public class FailureProcessor extends GridProcessorAdapter { /** Failure context. */ private volatile FailureContext failureCtx; + /** Reserve buffer, which can be dropped to handle OOME. */ + private volatile byte[] reserveBuf; + /** * @param ctx Context. */ @@ -56,6 +61,9 @@ public class FailureProcessor extends GridProcessorAdapter { if (hnd == null) hnd = getDefaultFailureHandler(); + reserveBuf = new byte[IgniteSystemProperties.getInteger( + IgniteSystemProperties.IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE, 64 * 1024)]; + assert hnd != null; this.hnd = hnd; @@ -102,6 +110,9 @@ public class FailureProcessor extends GridProcessorAdapter { U.error(ignite.log(), "Critical failure. 
Will be handled accordingly to configured handler [hnd=" + hnd.getClass() + ", failureCtx=" + failureCtx + ']', failureCtx.error()); + if (reserveBuf != null && X.hasCause(failureCtx.error(), OutOfMemoryError.class)) + reserveBuf = null; + boolean invalidated = hnd.onFailure(ignite, failureCtx); if (invalidated) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 6d2e621..f7c07f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -36,6 +36,8 @@ import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.compute.ComputeJobMasterLeaveAware; import org.apache.ignite.compute.ComputeUserUndeclaredException; import org.apache.ignite.events.JobEvent; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.igfs.IgfsOutOfSpaceException; import org.apache.ignite.internal.GridInternalException; import org.apache.ignite.internal.GridJobContextImpl; @@ -603,9 +605,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { X.hasCause(e, ClusterTopologyCheckedException.class)) // Should be throttled, because GridServiceProxy continuously retry getting service. 
LT.error(log, e, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']'); - else + else { U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + if (X.hasCause(e, OutOfMemoryError.class)) + ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } + ex = e; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index ff68e72..63f5027 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.service; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -103,6 +104,7 @@ import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDeploymentException; import org.apache.ignite.services.ServiceDescriptor; import org.apache.ignite.thread.IgniteThreadFactory; +import org.apache.ignite.thread.OomExceptionHandler; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; import java.util.concurrent.ConcurrentHashMap; @@ -112,7 +114,6 @@ import static org.apache.ignite.IgniteSystemProperties.getString; import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE; -import 
static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -154,8 +155,12 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** Busy lock. */ private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** Uncaught exception handler for thread pools. */ + private final UncaughtExceptionHandler oomeHnd = new OomExceptionHandler(ctx); + /** Thread factory. */ - private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service"); + private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service", + oomeHnd); /** Thread local for service name. */ private ThreadLocal<String> svcName = new ThreadLocal<>(); @@ -175,7 +180,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite public GridServiceProcessor(GridKernalContext ctx) { super(ctx); - depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(), "srvc-deploy")); + depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(), + "srvc-deploy", oomeHnd)); String servicesCompatibilityMode = getString(IGNITE_SERVICES_COMPATIBILITY_MODE); @@ -373,7 +379,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite busyLock = new GridSpinBusyLock(); - depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(), "srvc-deploy")); + depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(), + "srvc-deploy", oomeHnd)); start(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 3cd7484..418812f 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -17,6 +17,7 @@ package org.apache.ignite.thread; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -45,10 +46,11 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { * @param igniteInstanceName Node name. * @param threadNamePrefix Thread name prefix. 
*/ - public IgniteStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix) { + public IgniteStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix, + UncaughtExceptionHandler eHnd) { execs = new ExecutorService[concurrentLvl]; - ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix); + ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, eHnd); for (int i = 0; i < concurrentLvl; i++) execs[i] = Executors.newSingleThreadExecutor(factory); @@ -173,4 +175,4 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { @Override public String toString() { return S.toString(IgniteStripedThreadPoolExecutor.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java index 062c973..23bf14d 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java @@ -17,9 +17,9 @@ package org.apache.ignite.thread; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.NotNull; @@ -41,6 +41,9 @@ public class IgniteThreadFactory implements ThreadFactory { /** */ private final byte plc; + /** Exception handler. */ + private final UncaughtExceptionHandler eHnd; + /** * Constructs new thread factory for given grid. 
All threads will belong * to the same default thread group. @@ -49,7 +52,19 @@ public class IgniteThreadFactory implements ThreadFactory { * @param threadName Thread name. */ public IgniteThreadFactory(String igniteInstanceName, String threadName) { - this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED); + this(igniteInstanceName, threadName, null); + } + + /** + * Constructs new thread factory for given grid. All threads will belong + * to the same default thread group. + * + * @param igniteInstanceName Ignite instance name. + * @param threadName Thread name. + * @param eHnd Uncaught exception handler. + */ + public IgniteThreadFactory(String igniteInstanceName, String threadName, UncaughtExceptionHandler eHnd) { + this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED, eHnd); } /** @@ -59,16 +74,23 @@ public class IgniteThreadFactory implements ThreadFactory { * @param igniteInstanceName Ignite instance name. * @param threadName Thread name. * @param plc {@link GridIoPolicy} for thread pool. + * @param eHnd Uncaught exception handler. 
*/ - public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc) { + public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc, UncaughtExceptionHandler eHnd) { this.igniteInstanceName = igniteInstanceName; this.threadName = threadName; this.plc = plc; + this.eHnd = eHnd; } /** {@inheritDoc} */ @Override public Thread newThread(@NotNull Runnable r) { - return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc); + Thread thread = new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc); + + if (eHnd != null) + thread.setUncaughtExceptionHandler(eHnd); + + return thread; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java index 83c64c3..fed77ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java @@ -17,6 +17,7 @@ package org.apache.ignite.thread; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -53,7 +54,8 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor { maxPoolSize, keepAliveTime, workQ, - GridIoPolicy.UNDEFINED); + GridIoPolicy.UNDEFINED, + null); } /** @@ -68,6 +70,7 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor { * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only * runnable tasks submitted by the {@link #execute(Runnable)} method. 
* @param plc {@link GridIoPolicy} for thread pool. + * @param eHnd Uncaught exception handler for thread pool. */ public IgniteThreadPoolExecutor( String threadNamePrefix, @@ -76,14 +79,15 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor { int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ, - byte plc) { + byte plc, + UncaughtExceptionHandler eHnd) { super( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQ, - new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc) + new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc, eHnd) ); } @@ -114,4 +118,4 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor { new AbortPolicy() ); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java b/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java new file mode 100644 index 0000000..3a62ad8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.thread; + +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.X; + +/** + * OOM exception handler for system threads. + */ +public class OomExceptionHandler implements Thread.UncaughtExceptionHandler { + /** Context. */ + private final GridKernalContext ctx; + + /** + * @param ctx Context. + */ + public OomExceptionHandler(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void uncaughtException(Thread t, Throwable e) { + if (X.hasCause(e, OutOfMemoryError.class)) + ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java new file mode 100644 index 0000000..2af94b8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.failure; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Out of memory error failure handler test. 
+ */ +public class OomFailureHandlerTest extends AbstractFailureHandlerTest { + /** {@inheritDoc} Adds a PARTITIONED cache named DEFAULT_CACHE_NAME with zero backups to the configuration built by the superclass. */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCacheConfiguration(new CacheConfiguration() + .setName(DEFAULT_CACHE_NAME) + .setCacheMode(CacheMode.PARTITIONED) + .setBackups(0) + ); + + return cfg; + } + + /** {@inheritDoc} Stops all grids after the superclass cleanup has run. */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Test OOME in IgniteCompute: a callable submitted from node 0 to node 1 throws {@link OutOfMemoryError}; node 1 is then expected to report a failure while node 0 is not. + */ + public void testComputeOomError() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + try { + IgniteFuture<Boolean> res = ignite0.compute(ignite0.cluster().forNodeId(ignite1.cluster().localNode().id())) + .callAsync(new IgniteCallable<Boolean>() { + @Override public Boolean call() throws Exception { + throw new OutOfMemoryError(); + } + }); + + res.get(); + } + catch (Throwable ignore) { + // Expected. + } + + assertFailureState(ignite0, ignite1); + } + + /** + * Test OOME in EntryProcessor: node 0 invokes an entry processor that throws {@link OutOfMemoryError} for a key returned by primaryKey(cache1) (presumably a key whose primary copy lives on node 1; confirm against the test framework). + */ + public void testEntryProcessorOomError() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + IgniteCache<Integer, Integer> cache0 = ignite0.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME); + + awaitPartitionMapExchange(); + + Integer key = primaryKey(cache1); + + cache1.put(key, key); + + try { + IgniteFuture fut = cache0.invokeAsync(key, new EntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, + Object... arguments) throws EntryProcessorException { + throw new OutOfMemoryError(); + } + }); + + fut.get(); + } + catch (Throwable ignore) { + // Expected.
+ } + + assertFailureState(ignite0, ignite1); + } + + /** + * Test OOME in service method invocation: a key-affinity singleton is deployed from node 0 for a key returned by primaryKey(cache1), then its fail() method is called through a service proxy obtained on node 0. + */ + public void testServiceInvokeOomError() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME); + + awaitPartitionMapExchange(); + + Integer key = primaryKey(cache1); + + ignite0.services().deployKeyAffinitySingleton("fail-invoke-service", new FailServiceImpl(false), + DEFAULT_CACHE_NAME, key); + + FailService svc = ignite0.services().serviceProxy("fail-invoke-service", FailService.class, false); + + try { + svc.fail(); + } + catch (Throwable ignore) { + // Expected. + } + + assertFailureState(ignite0, ignite1); + } + + /** + * Test OOME in service execute: the deployed service is created with failOnExec set to true, so its execute() method throws {@link OutOfMemoryError}; no service method is invoked explicitly by the test. + */ + public void testServiceExecuteOomError() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME); + + awaitPartitionMapExchange(); + + Integer key = primaryKey(cache1); + + ignite0.services().deployKeyAffinitySingleton("fail-execute-service", new FailServiceImpl(true), + DEFAULT_CACHE_NAME, key); + + assertFailureState(ignite0, ignite1); + } + + /** + * Test OOME in event listener: a local listener for EVT_CACHE_OBJECT_PUT registered on node 1 throws {@link OutOfMemoryError}; node 0 then puts a key returned by primaryKey(cache1). + */ + public void testEventListenerOomError() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + IgniteCache<Integer, Integer> cache0 = ignite0.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME); + + awaitPartitionMapExchange(); + + ignite1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + throw new OutOfMemoryError(); + } + }, EventType.EVT_CACHE_OBJECT_PUT); + + Integer key = primaryKey(cache1); + + try { + cache0.put(key, key); + } + catch (Throwable ignore) { + // Expected.
+ } + + assertFailureState(ignite0, ignite1); + } + + /** + * @param igniteWork Working ignite instance; the handler returned by dummyFailureHandler for it must not report a failure. + * @param igniteFail Failed ignite instance; the handler returned by dummyFailureHandler for it must report a failure within the 5000L timeout passed to waitForCondition. + */ + private static void assertFailureState(Ignite igniteWork, Ignite igniteFail) throws IgniteInterruptedCheckedException { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return dummyFailureHandler(igniteFail).failure(); + } + }, 5000L)); + + assertFalse(dummyFailureHandler(igniteWork).failure()); + } + + /** + * Service interface with a single method that the tests call to trigger an error during service method invocation. + */ + private interface FailService extends Service { + /** + * Fail. The implementation in this class always throws {@link OutOfMemoryError}. + */ + void fail(); + } + + /** + * Service implementation that throws {@link OutOfMemoryError} from fail(), and from execute() when failOnExec is set. + */ + private static class FailServiceImpl implements FailService { + /** Fail on execute: whether execute() throws {@link OutOfMemoryError}. */ + private final boolean failOnExec; + + /** + * @param failOnExec Fail on execute: when true, execute() throws {@link OutOfMemoryError}. + */ + private FailServiceImpl(boolean failOnExec) { + this.failOnExec = failOnExec; + } + + /** {@inheritDoc} Always throws {@link OutOfMemoryError}. */ + @Override public void fail() { + throw new OutOfMemoryError(); + } + + /** {@inheritDoc} No-op. */ + @Override public void cancel(ServiceContext ctx) { + } + + /** {@inheritDoc} No-op. */ + @Override public void init(ServiceContext ctx) throws Exception { + } + + /** {@inheritDoc} Throws {@link OutOfMemoryError} when failOnExec is true; otherwise returns immediately. */ + @Override public void execute(ServiceContext ctx) throws Exception { + if (failOnExec) + throw new OutOfMemoryError(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index c4b7d92..c388f1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -22,6 +22,7 @@ import junit.framework.TestSuite; import
org.apache.ignite.GridSuppressedExceptionSelfTest; import org.apache.ignite.failure.FailureHandlerTriggeredTest; import org.apache.ignite.failure.IoomFailureHandlerTest; +import org.apache.ignite.failure.OomFailureHandlerTest; import org.apache.ignite.failure.StopNodeFailureHandlerTest; import org.apache.ignite.failure.StopNodeOrHaltFailureHandlerTest; import org.apache.ignite.internal.ClassSetTest; @@ -199,6 +200,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(StopNodeFailureHandlerTest.class); suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class); suite.addTestSuite(IoomFailureHandlerTest.class); + suite.addTestSuite(OomFailureHandlerTest.class); return suite; }