ignite-4492 Add MBean for StripedExecutor This closes #1491.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e12513e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e12513e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e12513e Branch: refs/heads/ignite-comm-balance-master Commit: 8e12513efb24cc6df1da0968560ac932544ee68d Parents: c52cb9f Author: voipp <[email protected]> Authored: Tue Feb 14 15:08:59 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Feb 14 15:08:59 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 48 +++++- .../internal/StripedExecutorMXBeanAdapter.java | 90 ++++++++++ .../ignite/internal/util/StripedExecutor.java | 55 +++++- .../ignite/mxbean/StripedExecutorMXBean.java | 90 ++++++++++ .../internal/util/StripedExecutorTest.java | 168 +++++++++++++++++++ .../testsuites/IgniteComputeGridTestSuite.java | 2 + 6 files changed, 447 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index a3d8c7b..cdbe2e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -168,6 +168,7 @@ import org.apache.ignite.marshaller.MarshallerExclusions; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.mxbean.ClusterLocalNodeMetricsMXBean; import org.apache.ignite.mxbean.IgniteMXBean; +import org.apache.ignite.mxbean.StripedExecutorMXBean; import org.apache.ignite.mxbean.ThreadPoolMXBean; import org.apache.ignite.plugin.IgnitePlugin; import org.apache.ignite.plugin.PluginNotFoundException; @@ -296,6 +297,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private ObjectName restExecSvcMBean; + /** */ + @GridToStringExclude + private ObjectName stripedExecSvcMBean; + /** Kernal start timestamp. */ private long startTime = U.currentTimeMillis(); @@ -963,6 +968,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { registerKernalMBean(); registerLocalNodeMBean(); registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc); + registerStripedExecutorMBean(stripedExecSvc); // Lifecycle bean notifications. notifyLifecycleBeans(AFTER_NODE_START); @@ -1541,7 +1547,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } - /** @throws IgniteCheckedException If registration failed. */ + /** + * @param execSvc + * @param sysExecSvc + * @param p2pExecSvc + * @param mgmtExecSvc + * @param restExecSvc + * @throws IgniteCheckedException If failed. + */ private void registerExecutorMBeans(ExecutorService execSvc, ExecutorService sysExecSvc, ExecutorService p2pExecSvc, @@ -1582,8 +1595,34 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { return res; } catch (JMException e) { - throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name + ", exec=" + exec + ']', - e); + throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name + + ", exec=" + exec + ']', e); + } + } + + /** + * @param stripedExecSvc Executor service. + * @throws IgniteCheckedException If registration failed. + */ + private void registerStripedExecutorMBean(StripedExecutor stripedExecSvc) throws IgniteCheckedException { + if (stripedExecSvc != null) { + String name = "StripedExecutor"; + + try { + stripedExecSvcMBean = U.registerMBean( + cfg.getMBeanServer(), + cfg.getGridName(), + "Thread Pools", + name, + new StripedExecutorMXBeanAdapter(stripedExecSvc), + StripedExecutorMXBean.class); + + if (log.isDebugEnabled()) + log.debug("Registered executor service MBean: " + stripedExecSvcMBean); + } catch (JMException e) { + throw new IgniteCheckedException("Failed to register executor service MBean [name=" + + name + ", exec=" + stripedExecSvc + ']', e); + } } } @@ -2046,7 +2085,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { unregisterMBean(p2PExecSvcMBean) & unregisterMBean(kernalMBean) & unregisterMBean(locNodeMBean) & - unregisterMBean(restExecSvcMBean) + unregisterMBean(restExecSvcMBean) & + unregisterMBean(stripedExecSvcMBean) )) errOnStop = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java new file mode 100644 index 0000000..e6811b7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.concurrent.ExecutorService; +import org.apache.ignite.internal.util.StripedExecutor; +import org.apache.ignite.mxbean.StripedExecutorMXBean; + +/** + * Adapter for {@link StripedExecutorMXBean} which delegates all method calls to the underlying + * {@link ExecutorService} instance. + */ +public class StripedExecutorMXBeanAdapter implements StripedExecutorMXBean { + /** */ + private final StripedExecutor exec; + + /** + * @param exec Executor service + */ + StripedExecutorMXBeanAdapter(StripedExecutor exec) { + assert exec != null; + + this.exec = exec; + } + + /** {@inheritDoc} */ + @Override public void checkStarvation() { + exec.checkStarvation(); + } + + /** {@inheritDoc} */ + @Override public int getStripesCount() { + return exec.stripes(); + } + + /** {@inheritDoc} */ + @Override public boolean isShutdown() { + return exec.isShutdown(); + } + + /** {@inheritDoc} */ + @Override public boolean isTerminated() { + return exec.isTerminated(); + } + + /** {@inheritDoc} */ + @Override public int getTotalQueueSize() { + return exec.queueSize(); + } + + /** {@inheritDoc} */ + @Override public long getTotalCompletedTasksCount() { + return exec.completedTasks(); + } + + /** {@inheritDoc} */ + @Override public long[] getStripesCompletedTasksCounts() { + return exec.stripesCompletedTasks(); + } + + /** {@inheritDoc} */ + @Override public int getActiveCount() { + return exec.activeStripesCount(); + } + + /** {@inheritDoc} */ + @Override public boolean[] getStripesActiveStatuses() { + return exec.stripesActiveStatuses(); + } + + /** {@inheritDoc} */ + @Override public int[] getStripesQueueSizes() { + return exec.stripesQueueSizes(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 201cb34..e70f0ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -55,9 +55,10 @@ public class StripedExecutor implements ExecutorService { private final IgniteLogger log; /** - * Constructor. - * * @param cnt Count. + * @param gridName Node name. + * @param poolName Pool name. + * @param log Logger. */ public StripedExecutor(int cnt, String gridName, String poolName, final IgniteLogger log) { A.ensure(cnt > 0, "cnt > 0"); @@ -268,6 +269,56 @@ public class StripedExecutor implements ExecutorService { } /** + * @return Completed tasks per stripe count. + */ + public long[] stripesCompletedTasks() { + long[] res = new long[stripes()]; + + for (int i = 0; i < res.length; i++) + res[i] = stripes[i].completedCnt; + + return res; + } + + /** + * @return Number of active tasks per stripe. + */ + public boolean[] stripesActiveStatuses() { + boolean[] res = new boolean[stripes()]; + + for (int i = 0; i < res.length; i++) + res[i] = stripes[i].active; + + return res; + } + + /** + * @return Number of active tasks. + */ + public int activeStripesCount() { + int res = 0; + + for (boolean status : stripesActiveStatuses()) { + if (status) + res++; + } + + return res; + } + + /** + * @return Size of queue per stripe. + */ + public int[] stripesQueueSizes() { + int[] res = new int[stripes()]; + + for (int i = 0; i < res.length; i++) + res[i] = stripes[i].queueSize(); + + return res; + } + + /** * Operation not supported. */ @NotNull @Override public <T> Future<T> submit( http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java new file mode 100644 index 0000000..7428b19 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/StripedExecutorMXBean.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mxbean; + +/** + * MBean that provides access to information about striped executor service. + */ +@MXBeanDescription("MBean that provides access to information about striped executor service.") +public interface StripedExecutorMXBean { + /** + * Checks for starvation in striped pool, dumps in log information if potential starvation + * was found. + */ + @MXBeanDescription("Checks for starvation in striped pool.") + public void checkStarvation(); + + /** + * @return Stripes count. + */ + @MXBeanDescription("Stripes count.") + public int getStripesCount(); + + /** + * + * @return {@code True} if this executor has been shut down. + */ + @MXBeanDescription("True if this executor has been shut down.") + public boolean isShutdown(); + + /** + * Note that + * {@code isTerminated()} is never {@code true} unless either {@code shutdown()} or + * {@code shutdownNow()} was called first. + * + * @return {@code True} if all tasks have completed following shut down. + */ + @MXBeanDescription("True if all tasks have completed following shut down.") + public boolean isTerminated(); + + /** + * @return Return total queue size of all stripes. + */ + @MXBeanDescription("Total queue size of all stripes.") + public int getTotalQueueSize(); + + /** + * @return Completed tasks count. + */ + @MXBeanDescription("Completed tasks count of all stripes.") + public long getTotalCompletedTasksCount(); + + /** + * @return Number of completed tasks per stripe. + */ + @MXBeanDescription("Number of completed tasks per stripe.") + public long[] getStripesCompletedTasksCounts(); + + /** + * @return Number of active tasks. + */ + @MXBeanDescription("Number of active tasks of all stripes.") + public int getActiveCount(); + + /** + * @return Number of active tasks per stripe. + */ + @MXBeanDescription("Number of active tasks per stripe.") + public boolean[] getStripesActiveStatuses(); + + /** + * @return Size of queue per stripe. + */ + @MXBeanDescription("Size of queue per stripe.") + public int[] getStripesQueueSizes(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java new file mode 100644 index 0000000..543907f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.logger.java.JavaLogger; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class StripedExecutorTest extends GridCommonAbstractTest { + /** */ + private StripedExecutor stripedExecSvc; + + /** {@inheritDoc} */ + @Override public void beforeTest() { + stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger()); + } + + /** {@inheritDoc} */ + @Override public void afterTest() { + stripedExecSvc.shutdown(); + } + + /** + * @throws Exception If failed. + */ + public void testCompletedTasks() throws Exception { + stripedExecSvc.execute(0, new TestRunnable()); + stripedExecSvc.execute(1, new TestRunnable()); + + sleepASec(); + + assertEquals(2, stripedExecSvc.completedTasks()); + } + + /** + * @throws Exception If failed. + */ + public void testStripesCompletedTasks() throws Exception { + stripedExecSvc.execute(0, new TestRunnable()); + stripedExecSvc.execute(1, new TestRunnable()); + + sleepASec(); + + long[] completedTaks = stripedExecSvc.stripesCompletedTasks(); + + assertEquals(1, completedTaks[0]); + assertEquals(1, completedTaks[1]); + assertEquals(0, completedTaks[2]); + } + + /** + * @throws Exception If failed. + */ + public void testStripesActiveStatuses() throws Exception { + stripedExecSvc.execute(0, new TestRunnable()); + stripedExecSvc.execute(1, new TestRunnable(true)); + + sleepASec(); + + boolean[] statuses = stripedExecSvc.stripesActiveStatuses(); + + assertFalse(statuses[0]); + assertTrue(statuses[1]); + assertFalse(statuses[0]); + } + + /** + * @throws Exception If failed. + */ + public void testActiveStripesCount() throws Exception { + stripedExecSvc.execute(0, new TestRunnable()); + stripedExecSvc.execute(1, new TestRunnable(true)); + + sleepASec(); + + assertEquals(1, stripedExecSvc.activeStripesCount()); + } + + /** + * @throws Exception If failed. + */ + public void testStripesQueueSizes() throws Exception { + stripedExecSvc.execute(0, new TestRunnable()); + stripedExecSvc.execute(0, new TestRunnable(true)); + stripedExecSvc.execute(0, new TestRunnable(true)); + stripedExecSvc.execute(1, new TestRunnable(true)); + stripedExecSvc.execute(1, new TestRunnable(true)); + stripedExecSvc.execute(1, new TestRunnable(true)); + + sleepASec(); + + int[] queueSizes = stripedExecSvc.stripesQueueSizes(); + + assertEquals(1, queueSizes[0]); + assertEquals(2, queueSizes[1]); + assertEquals(0, queueSizes[2]); + } + + /** + * @throws Exception If failed. + */ + public void testQueueSize() throws Exception { + stripedExecSvc.execute(1, new TestRunnable()); + stripedExecSvc.execute(1, new TestRunnable(true)); + stripedExecSvc.execute(1, new TestRunnable(true)); + + sleepASec(); + + assertEquals(1, stripedExecSvc.queueSize()); + } + + /** + * + */ + private final class TestRunnable implements Runnable { + /** */ + private final boolean infinitely; + + /** + * + */ + public TestRunnable() { + this(false); + } + + /** + * @param infinitely {@code True} if should sleep infinitely. + */ + public TestRunnable(boolean infinitely) { + this.infinitely = infinitely; + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + while (infinitely) + sleepASec(); + } + catch (InterruptedException e) { + info("Got interrupted exception while sleeping: " + e); + } + } + } + + /** + * @throws InterruptedException If interrupted. + */ + private void sleepASec() throws InterruptedException { + Thread.sleep(1000); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8e12513e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index 8a501fd..9a80b10 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfT import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest; import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest; import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest; +import org.apache.ignite.internal.util.StripedExecutorTest; import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest; import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest; import org.apache.ignite.p2p.GridMultinodeRedeployPrivateModeSelfTest; @@ -152,6 +153,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(TaskNodeRestartTest.class); suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class); suite.addTestSuite(PublicThreadpoolStarvationTest.class); + suite.addTestSuite(StripedExecutorTest.class); return suite; }
