Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1159
Change subject: Add HTTP API to Collect Threaddumps From NCs ...................................................................... Add HTTP API to Collect Threaddumps From NCs Thread dump returned in JSON format. e.g. http://localhost:19002/admin/cluster/node/asterix_nc2/threaddump Change-Id: I536606a1fbc394c6c70bb8ce14791cc411691617 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterNodeDetailsAPIServlet.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeThreadDumpWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ThreadDumpWork.java 18 files changed, 399 insertions(+), 47 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/59/1159/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java index 8cedabc..4012b59 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java @@ -38,6 +38,8 @@ public class ClusterAPIServlet extends HttpServlet { private static final long serialVersionUID = 1L; + public static final String NODE_ID_KEY = "node_id"; + @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { response.setContentType("application/json"); @@ -87,8 +89,9 @@ String nodeURL = requestURL.toString().replaceAll("/[^./]+/\\.\\./", "/"); for (int i = 0; i < ncs.length(); i++) { JSONObject nc = ncs.getJSONObject(i); - nc.put("configUri", nodeURL + nc.getString("node_id") + "/config"); - nc.put("statsUri", nodeURL + nc.getString("node_id") + "/stats"); + nc.put("configUri", nodeURL + nc.getString(NODE_ID_KEY) + "/config"); + 
nc.put("statsUri", nodeURL + nc.getString(NODE_ID_KEY) + "/stats"); + nc.put("threadDumpUri", nodeURL + nc.getString(NODE_ID_KEY) + "/threaddump"); } return json; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterNodeDetailsAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterNodeDetailsAPIServlet.java index 9cccdad..0e04240 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterNodeDetailsAPIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterNodeDetailsAPIServlet.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.api.http.servlet; +import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; + import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; @@ -33,8 +35,6 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; - -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; public class ClusterNodeDetailsAPIServlet extends ClusterAPIServlet { private static final long serialVersionUID = 1L; @@ -68,7 +68,11 @@ private JSONObject processNode(HttpServletRequest request, IHyracksClientConnection hcc) throws Exception { - String[] parts = request.getPathInfo().substring(1).replaceAll("/+", "/").split("/"); + String pathInfo = request.getPathInfo(); + if (pathInfo.endsWith("/")) { + throw new IllegalArgumentException(); + } + String[] parts = pathInfo.substring(1).split("/"); final String node = parts[0]; if (parts.length == 1) { @@ -90,9 +94,14 @@ case "config": json = processNodeConfig(hcc, node); break; + case "stats": json = processNodeStats(hcc, node); break; + + case "threaddump": + return processNodeThreadDump(hcc, node); + default: throw new IllegalArgumentException(); } @@ -144,7 +153,9 @@ gcArray.put(i, gcArray.getJSONArray(i).get(index)); } } else if 
(!"node-id".equals(key) && !"gc-names".equals(key)) { - json.put(key, json.getJSONArray(key).get(index)); + if (json.get(key) instanceof JSONArray) { + json.put(key, json.getJSONArray(key).get(index)); + } } } return json; @@ -160,4 +171,16 @@ } return new JSONObject(config); } + + private JSONObject processNodeThreadDump(IHyracksClientConnection hcc, String node) throws Exception { + if ("cc".equals(node)) { + return new JSONObject(); + } + String dump = hcc.getThreadDump(node); + if (dump == null) { + throw new IllegalArgumentException(); + } + return new JSONObject(dump); + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index ca0783b..88c4edb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -46,7 +46,8 @@ CLI_DEPLOY_BINARY, CLI_UNDEPLOY_BINARY, CLUSTER_SHUTDOWN, - GET_NODE_DETAILS_JSON + GET_NODE_DETAILS_JSON, + THREAD_DUMP } public abstract static class Function implements Serializable { @@ -325,4 +326,20 @@ } } + public static class ThreadDumpFunction extends Function { + private final String node; + + public ThreadDumpFunction(String node) { + this.node = node; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.THREAD_DUMP; + } + + public String getNode() { + return node; + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index 3f453e5..86a8ceb 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -138,4 +138,12 @@ new HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction(nodeId, includeStats, includeConfig); return (String) rpci.call(ipcHandle, gjsf); } + + @Override + public String getThreadDump(String node) throws Exception { + HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = + new HyracksClientInterfaceFunctions.ThreadDumpFunction(node); + return (String)rpci.call(ipcHandle, tdf); + + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 73813f3..4b27caf 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -197,4 +197,9 @@ public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception { return hci.getNodeDetailsJSON(nodeId, includeStats, includeConfig); } + + @Override + public String getThreadDump(String node) throws Exception { + return hci.getThreadDump(node); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 0690c9f..6c15da2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -197,4 +197,9 @@ * @throws 
Exception */ public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception; + + /** + * Gets thread dump from the specified node as a serialized JSON string + */ + public String getThreadDump(String node) throws Exception; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 4ddb81f..c2af2e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -57,4 +57,6 @@ public void stopCluster() throws Exception; public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception; + + public String getThreadDump(String node) throws Exception; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 8dada48..0ed309e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -22,6 +22,7 @@ import java.io.FileReader; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -66,6 +67,8 @@ import org.apache.hyracks.control.cc.work.GetJobInfoWork; import org.apache.hyracks.control.cc.work.GetJobStatusWork; import 
org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; +import org.apache.hyracks.control.cc.work.GetNodeThreadDumpWork; +import org.apache.hyracks.control.cc.work.GetNodeThreadDumpWork.ThreadDumpRun; import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork; import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork; import org.apache.hyracks.control.cc.work.GetResultStatusWork; @@ -75,6 +78,7 @@ import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork; import org.apache.hyracks.control.cc.work.NotifyShutdownWork; import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse; +import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse; import org.apache.hyracks.control.cc.work.RegisterNodeWork; import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork; import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork; @@ -156,6 +160,8 @@ private final Map<String, StateDumpRun> stateDumpRunMap; + private final Map<String, ThreadDumpRun> threadDumpRunMap; + private ShutdownRun shutdownCallback; private ICCApplicationEntryPoint aep; @@ -204,6 +210,7 @@ deploymentRunMap = new HashMap<>(); stateDumpRunMap = new HashMap<>(); + threadDumpRunMap = Collections.synchronizedMap(new HashMap<>()); } private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception { @@ -520,6 +527,13 @@ workQueue.schedule(new GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(), gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid))); return; + + case THREAD_DUMP: + HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = + (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn; + workQueue.schedule(new GetNodeThreadDumpWork(ClusterControllerService.this, tdf.getNode(), + new IPCResponder<String>(handle, mid))); + return; } try { handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId())); @@ -658,10 +672,20 @@ 
dsrf.getStateDumpId(), dsrf.getState())); return; } + case SHUTDOWN_RESPONSE: { CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn; workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId())); return; + } + + case THREAD_DUMP_RESPONSE: { + CCNCFunctions.ThreadDumpResponseFunction tdrf = + (CCNCFunctions.ThreadDumpResponseFunction)fn; + workQueue.schedule(new NotifyThreadDumpResponse(ClusterControllerService.this, tdrf.getNodeId(), + tdrf.getThreadDumpJSON())); + return; + } } LOGGER.warning("Unknown function: " + fn.getFunctionId()); @@ -715,4 +739,16 @@ public synchronized ShutdownRun getShutdownRun() { return shutdownCallback; } + + public void addThreadDumpRun(String nodeId, ThreadDumpRun tdr) { + threadDumpRunMap.put(nodeId, tdr); + } + + public ThreadDumpRun removeThreadDumpRun(String nodeId) { + return threadDumpRunMap.remove(nodeId); + } + + public ThreadDumpRun getThreadDumpRun(String nodeId) { + return threadDumpRunMap.get(nodeId); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java index a848c6e..0551824 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java @@ -20,6 +20,7 @@ import java.io.File; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -288,6 +289,7 @@ JSONObject o = new JSONObject(); o.put("node-id", ncConfig.nodeId); + if (includeConfig) { o.put("os-name", osName); o.put("arch", arch); @@ -304,6 +306,7 @@ o.put("pid", pid); } if (includeStats) { + o.put("date", new 
Date()); o.put("rrd-ptr", rrdPtr); o.put("heartbeat-times", hbTime); o.put("heap-init-sizes", heapInitSize); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeThreadDumpWork.java new file mode 100644 index 0000000..e559be8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeThreadDumpWork.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class GetNodeThreadDumpWork extends SynchronizableWork { + private final ClusterControllerService ccs; + private final String nodeId; + private final IResultCallback<String> callback; + private final ThreadDumpRun run = new ThreadDumpRun(); + + + public GetNodeThreadDumpWork(ClusterControllerService ccs, String nodeId, IResultCallback<String> callback) { + this.ccs = ccs; + this.nodeId = nodeId; + this.callback = callback; + } + + @Override + protected void doRun() throws Exception { + final NodeControllerState ncState = ccs.getNodeMap().get(nodeId); + if (ncState == null) { + // bad node id, reply with null immediately + callback.setValue(null); + } else { + ccs.addThreadDumpRun(nodeId, run); + ncState.getNodeController().takeThreadDump(); + } + } + + public class ThreadDumpRun { + + public synchronized void notifyThreadDumpReceived(String threadDumpJSON) { + callback.setValue(threadDumpJSON); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java new file mode 100644 index 0000000..b0a53a0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.work.AbstractWork; + +public class NotifyThreadDumpResponse extends AbstractWork { + + private final ClusterControllerService ccs; + + private final String nodeId; + + private final String threadDumpJSON; + + public NotifyThreadDumpResponse(ClusterControllerService ccs, String nodeId, String threadDumpJSON) { + this.ccs = ccs; + this.nodeId = nodeId; + this.threadDumpJSON = threadDumpJSON; + } + + @Override + public void run() { + ccs.getThreadDumpRun(nodeId).notifyThreadDumpReceived(threadDumpJSON); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index ec1613d..18566b3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -70,4 +70,6 @@ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws 
Exception; public void getNodeControllerInfos() throws Exception; + + public void notifyThreadDump(String nodeId, String threadDumpJSON) throws Exception; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 75c3127..3116454 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -54,4 +54,6 @@ public void shutdown() throws Exception; public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + + public void takeThreadDump() throws Exception; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 96dca4e..9c8fe69 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -101,6 +101,9 @@ STATE_DUMP_REQUEST, STATE_DUMP_RESPONSE, + THREAD_DUMP_REQUEST, + THREAD_DUMP_RESPONSE, + OTHER } @@ -895,6 +898,39 @@ } } + public static class ThreadDumpRequestFunction extends Function { + private static final long serialVersionUID = 1L; + + @Override + public FunctionId getFunctionId() { + return FunctionId.THREAD_DUMP_REQUEST; + } + } + + public static class 
ThreadDumpResponseFunction extends Function { + private static final long serialVersionUID = 1L; + private final String nodeId; + private final String threadDumpJSON; + + public ThreadDumpResponseFunction(String nodeId, String threadDumpJSON) { + this.nodeId = nodeId; + this.threadDumpJSON = threadDumpJSON; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.THREAD_DUMP_RESPONSE; + } + + public String getNodeId() { + return nodeId; + } + + public String getThreadDumpJSON() { + return threadDumpJSON; + } + } + public static class ReportPartitionAvailabilityFunction extends Function { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 416b064..ed5dc43 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -116,24 +116,27 @@ ipcHandle.send(-1, fn, null); } - public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition, - int nPartitions, NetworkAddress networkAddress) throws Exception { - CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction( - jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress); + @Override + public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, + boolean emptyResult, int partition, int nPartitions, + NetworkAddress networkAddress) throws 
Exception { + CCNCFunctions.RegisterResultPartitionLocationFunction fn = + new CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, orderedResult, emptyResult, + partition, nPartitions, networkAddress); ipcHandle.send(-1, fn, null); } @Override public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception { - CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = new CCNCFunctions.ReportResultPartitionWriteCompletionFunction( - jobId, rsId, partition); + CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = + new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId, partition); ipcHandle.send(-1, fn, null); } @Override public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception { - CCNCFunctions.ReportResultPartitionFailureFunction fn = new CCNCFunctions.ReportResultPartitionFailureFunction( - jobId, rsId, partition); + CCNCFunctions.ReportResultPartitionFailureFunction fn = + new CCNCFunctions.ReportResultPartitionFailureFunction(jobId, rsId, partition); ipcHandle.send(-1, fn, null); } @@ -144,14 +147,20 @@ @Override public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception { - CCNCFunctions.StateDumpResponseFunction fn = new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId, - state); + CCNCFunctions.StateDumpResponseFunction fn = + new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId, state); ipcHandle.send(-1, fn, null); } @Override public void notifyShutdown(String nodeId) throws Exception{ CCNCFunctions.ShutdownResponseFunction sdrf = new CCNCFunctions.ShutdownResponseFunction(nodeId); - ipcHandle.send(-1,sdrf,null); + ipcHandle.send(-1, sdrf, null); } + @Override + public void notifyThreadDump(String nodeId, String threadDumpJSON) throws Exception { + CCNCFunctions.ThreadDumpResponseFunction tdrf = + new CCNCFunctions.ThreadDumpResponseFunction(nodeId, 
threadDumpJSON); + ipcHandle.send(-1, tdrf, null); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 841c889..dac6573 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -101,4 +101,10 @@ deploymentId, nodeId); ipcHandle.send(-1, fn, null); } + + @Override + public void takeThreadDump() throws Exception { + CCNCFunctions.ThreadDumpRequestFunction fn = new CCNCFunctions.ThreadDumpRequestFunction(); + ipcHandle.send(-1, fn, null); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index edadf57..da50828 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -88,6 +88,7 @@ import org.apache.hyracks.control.nc.work.ShutdownWork; import org.apache.hyracks.control.nc.work.StartTasksWork; import org.apache.hyracks.control.nc.work.StateDumpWork; +import org.apache.hyracks.control.nc.work.ThreadDumpWork; import org.apache.hyracks.control.nc.work.UnDeployBinaryWork; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; @@ -416,6 +417,10 @@ return queue; } + 
/** Returns the {@link ThreadMXBean} stored in this service's {@code threadMXBean} field. */ public ThreadMXBean getThreadMXBean() { + return threadMXBean; + } + private class HeartbeatTask extends TimerTask { private IClusterController cc; @@ -509,72 +514,70 @@ Exception exception) { CCNCFunctions.Function fn = (CCNCFunctions.Function) payload; switch (fn.getFunctionId()) { - case SEND_APPLICATION_MESSAGE: { - CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn; + case SEND_APPLICATION_MESSAGE: + CCNCFunctions.SendApplicationMessageFunction amf = + (CCNCFunctions.SendApplicationMessageFunction) fn; queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId())); return; - } - case START_TASKS: { + + case START_TASKS: CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn; queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags())); return; - } - case ABORT_TASKS: { + case ABORT_TASKS: CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks())); return; - } - case CLEANUP_JOBLET: { + case CLEANUP_JOBLET: CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn; queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus())); return; - } - case REPORT_PARTITION_AVAILABILITY: { - CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn; + case REPORT_PARTITION_AVAILABILITY: + CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = + (CCNCFunctions.ReportPartitionAvailabilityFunction) fn; queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf.getPartitionId(), rpaf.getNetworkAddress())); return; - } - case NODE_REGISTRATION_RESULT: { + case 
NODE_REGISTRATION_RESULT: CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn; setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException()); return; - } - case GET_NODE_CONTROLLERS_INFO_RESPONSE: { - CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn; + case GET_NODE_CONTROLLERS_INFO_RESPONSE: + CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = + (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn; setNodeControllersInfo(gncirf.getNodeControllerInfos()); return; - } - case DEPLOY_BINARY: { - CCNCFunctions.DeployBinaryFunction ndbf = (CCNCFunctions.DeployBinaryFunction) fn; - queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(), - ndbf.getBinaryURLs())); + case DEPLOY_BINARY: + CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; + queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(), + dbf.getBinaryURLs())); return; - } - case UNDEPLOY_BINARY: { + case UNDEPLOY_BINARY: CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn; queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId())); return; - } - case STATE_DUMP_REQUEST: { + case STATE_DUMP_REQUEST: final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId())); return; - } - case SHUTDOWN_REQUEST: { + + case SHUTDOWN_REQUEST: queue.schedule(new ShutdownWork(NodeControllerService.this)); return; - } + + case THREAD_DUMP_REQUEST: + queue.schedule(new ThreadDumpWork(NodeControllerService.this)); + return; } throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId()); @@ -594,7 +597,7 @@ } /** - * Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} method. 
+ * Shutdown hook that invokes {@link NodeControllerService#stop() stop} method. */ private static class JVMShutdownHook extends Thread { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ThreadDumpWork.java new file mode 100644 index 0000000..0ff1ee4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ThreadDumpWork.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.work; + +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.json.JSONObject; +/** + * Work item that dumps the threads reported by the node controller's {@link ThreadMXBean}, + * renders the result as a JSON string and passes it, together with this node's id, to the + * cluster controller through {@code notifyThreadDump}. + */ +public class ThreadDumpWork extends SynchronizableWork { + private final NodeControllerService ncs; + /** + * Creates the work item. + * + * @param ncs service that supplies the ThreadMXBean, the cluster controller and the node id + * used by {@link #doRun()} + */ + public ThreadDumpWork(NodeControllerService ncs) { + this.ncs = ncs; + } + /** + * Builds the thread dump JSON and sends it to the cluster controller. + * <p> + * The JSON object has a {@code "date"} entry and a {@code "threads"} list; the entries + * {@code "deadlocked_thread_ids"} and {@code "monitor_deadlocked_thread_ids"} are added only + * when the corresponding lookup returns a non-null, non-empty array. Every thread entry + * carries {@code "name"}, {@code "id"}, {@code "state"} and {@code "stack"}; the lock and + * blocking fields are added only when they hold a value. + * + * @throws Exception declared by the signature; nothing is caught inside this method + */ + @Override + protected void doRun() throws Exception { + ThreadMXBean threadMXBean = ncs.getThreadMXBean(); + ThreadInfo [] threadInfos = threadMXBean.dumpAllThreads(true, true); /* both flags true: also request locked monitors and locked ownable synchronizers */ + List<Map<String, Object>> threads = new ArrayList<>(); + + for (ThreadInfo thread : threadInfos) { + Map<String, Object> threadMap = new HashMap<>(); + threadMap.put("name", thread.getThreadName()); + threadMap.put("id", thread.getThreadId()); + threadMap.put("state", thread.getThreadState().name()); + List<String> stacktrace = new ArrayList<>(); + for (StackTraceElement element : thread.getStackTrace()) { + stacktrace.add(element.toString()); + } + threadMap.put("stack", stacktrace); /* the fields below are optional: skipped when null, -1, not positive, or empty respectively */ + + if (thread.getLockName() != null) { + threadMap.put("lock_name", thread.getLockName()); + } + if (thread.getLockOwnerId() != -1) { + threadMap.put("lock_owner_id", thread.getLockOwnerId()); + } + if (thread.getBlockedTime() > 0) { + threadMap.put("blocked_time", thread.getBlockedTime()); + } + if (thread.getBlockedCount() > 0) { + threadMap.put("blocked_count", thread.getBlockedCount()); + } + if (thread.getLockedMonitors().length > 0) { + threadMap.put("locked_monitors", Arrays.asList(thread.getLockedMonitors())); + } + if (thread.getLockedSynchronizers().length > 0) { + threadMap.put("locked_synchronizers", Arrays.asList(thread.getLockedSynchronizers())); + } + 
threads.add(threadMap); + } + JSONObject json = new JSONObject(); + json.put("date", new Date()); /* NOTE(review): a java.util.Date object is stored as-is; how it is rendered in the output depends on the org.json version in use - confirm */ + json.put("threads", threads); + + long [] deadlockedThreads = threadMXBean.findDeadlockedThreads(); + long [] monitorDeadlockedThreads = threadMXBean.findMonitorDeadlockedThreads(); /* null or empty results from either lookup are left out of the JSON below */ + if (deadlockedThreads != null && deadlockedThreads.length > 0) { + json.put("deadlocked_thread_ids", deadlockedThreads); + } + if (monitorDeadlockedThreads != null && monitorDeadlockedThreads.length > 0) { + json.put("monitor_deadlocked_thread_ids", monitorDeadlockedThreads); + } + + ncs.getClusterController().notifyThreadDump(ncs.getApplicationContext().getNodeId(), json.toString()); + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1159 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I536606a1fbc394c6c70bb8ce14791cc411691617 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org>