Repository: tajo Updated Branches: refs/heads/master 618faa242 -> 98f142cc7
TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/98f142cc Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/98f142cc Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/98f142cc Branch: refs/heads/master Commit: 98f142cc73db1449ec04353c90897721806ecd42 Parents: 618faa2 Author: blrunner <[email protected]> Authored: Wed May 21 16:50:04 2014 +0900 Committer: blrunner <[email protected]> Committed: Wed May 21 16:50:04 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/client/TajoAdmin.java | 9 +- .../java/org/apache/tajo/client/TajoClient.java | 10 +- .../apache/tajo/master/querymaster/Query.java | 6 +- .../tajo/master/querymaster/QueryUnit.java | 11 +- .../master/querymaster/QueryUnitAttempt.java | 13 ++- .../tajo/master/querymaster/SubQuery.java | 14 ++- .../apache/tajo/master/rm/TajoRMContext.java | 7 ++ .../master/rm/TajoWorkerResourceManager.java | 103 ++++++++++--------- .../java/org/apache/tajo/master/rm/Worker.java | 6 +- .../tajo/webapp/QueryExecutorServlet.java | 18 ++++ .../main/java/org/apache/tajo/worker/Task.java | 2 +- .../src/main/resources/webapps/admin/query.jsp | 31 +++++- .../tajo/master/rm/TestTajoResourceManager.java | 79 +++++++++++++- 14 files changed, 242 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b94f59c..8281614 100644 --- a/CHANGES +++ b/CHANGES @@ -41,6 +41,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa) + TAJO-808: Fix pre-commit build failure. (jinho) TAJO-827: SUM() overflow in the case of INT4. (Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java index 25b91a4..ad42675 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java @@ -22,6 +22,7 @@ import com.google.protobuf.ServiceException; import org.apache.commons.cli.*; import org.apache.commons.lang.StringUtils; import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; @@ -395,11 +396,13 @@ public class TajoAdmin { public void processKill(Writer writer, String queryIdStr) throws IOException, ServiceException { - boolean killedSuccessfully = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr)); - if (killedSuccessfully) { + QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr)); + if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) { writer.write(queryIdStr + " is killed successfully.\n"); + } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) { + writer.write(queryIdStr + " will be finished after a while.\n"); } else { - writer.write("killing query is failed."); + writer.write("ERROR:" + status.getErrorMessage()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index 7d84592..2f9e138 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -854,7 +854,7 @@ public class TajoClient implements Closeable { }.withRetries(); } - public boolean killQuery(final QueryId queryId) + public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException { QueryStatus status = getQueryStatus(queryId); @@ -874,7 +874,9 @@ public class TajoClient implements Closeable { long currentTimeMillis = System.currentTimeMillis(); long timeKillIssued = currentTimeMillis; - while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) { + while ((currentTimeMillis < timeKillIssued + 10000L) + && ((status.getState() != QueryState.QUERY_KILLED) + || (status.getState() == QueryState.QUERY_KILL_WAIT))) { try { Thread.sleep(100L); } catch(InterruptedException ie) { @@ -883,13 +885,13 @@ public class TajoClient implements Closeable { currentTimeMillis = System.currentTimeMillis(); status = getQueryStatus(queryId); } - return status.getState() == QueryState.QUERY_KILLED; + } catch(Exception e) { LOG.debug("Error when checking for application status", e); - return false; } finally { connPool.releaseConnection(tmClient); } + return status; } public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 2848095..04e82ca 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -686,7 +686,11 @@ public class Query implements EventHandler<QueryEvent> { try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", type:" + event + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 27625b4..33cf19b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -155,6 +155,11 @@ public class QueryUnit implements EventHandler<TaskEvent> { .addTransition(TaskState.FAILED, TaskState.FAILED, EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED)) + // Transitions from KILLED state + .addTransition(TaskState.KILLED, TaskState.KILLED, + TaskEventType.T_ATTEMPT_KILLED, + new KillTaskTransition()) + .installTopology(); private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine; @@ -589,7 +594,11 @@ public class QueryUnit implements EventHandler<TaskEvent> { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()), QueryEventType.INTERNAL_ERROR)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java index c3aae67..361f88f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java @@ -161,9 +161,12 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { EnumSet.of( TaskAttemptEventType.TA_UPDATE)) .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_LOCAL_KILLED, + EnumSet.of( + TaskAttemptEventType.TA_LOCAL_KILLED, + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_ASSIGNED, + TaskAttemptEventType.TA_DONE), new TaskKilledCompleteTransition()) - .installTopology(); private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> @@ -427,7 +430,11 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")", e); + LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle( new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(), "Can't handle this event at current state of " + event.getTaskAttemptId() + ")")); http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 08517ef..e8a4d07 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -35,7 +35,10 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitId; -import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; @@ -236,7 +239,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { SubQueryEventType.SQ_START, SubQueryEventType.SQ_KILL, SubQueryEventType.SQ_FAILED, - SubQueryEventType.SQ_INTERNAL_ERROR)) + SubQueryEventType.SQ_INTERNAL_ERROR, + SubQueryEventType.SQ_SUBQUERY_COMPLETED)) .installTopology(); @@ -594,7 +598,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle(new SubQueryEvent(getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java index a995058..2229f04 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java @@ -48,6 +48,9 @@ public class TajoRMContext { private final Set<String> liveQueryMasterWorkerResources = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + private final Set<QueryId> stoppedQueryIds = + Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>()); + public TajoRMContext(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } @@ -81,4 +84,8 @@ public class TajoRMContext { public Set<String> getQueryMasterWorker() { return liveQueryMasterWorkerResources; } + + public Set<QueryId> getStoppedQueryIds() { + return stoppedQueryIds; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 15ac6b6..bb9f07d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -292,57 +292,63 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke ", liveWorkers=" + rmContext.getWorkers().size()); } - List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest); - - if(allocatedWorkerResources.size() > 0) { - List<WorkerAllocatedResource> allocatedResources = - new ArrayList<WorkerAllocatedResource>(); - - for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) { - NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(), - allocatedResource.worker.getPeerRpcPort()); - - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); - - containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); - containerId.setId(containerIdSeq.incrementAndGet()); - - ContainerIdProto containerIdProto = containerId.getProto(); - allocatedResources.add(WorkerAllocatedResource.newBuilder() - .setContainerId(containerIdProto) - .setNodeId(nodeId.toString()) - .setWorkerHost(allocatedResource.worker.getHostName()) - .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort()) - .setClientPort(allocatedResource.worker.getClientPort()) - .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort()) - .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort()) - .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) - .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) - .build()); - + // TajoWorkerResourceManager can't return allocated disk slots occasionally. + // Because the rest resource request can remains after QueryMaster stops. + // Thus we need to find whether QueryId stopped or not. + if (!rmContext.getStoppedQueryIds().contains(resourceRequest.queryId)) { + List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest); + + if(allocatedWorkerResources.size() > 0) { + List<WorkerAllocatedResource> allocatedResources = + new ArrayList<WorkerAllocatedResource>(); + + for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) { + NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(), + allocatedResource.worker.getPeerRpcPort()); + + TajoWorkerContainerId containerId = new TajoWorkerContainerId(); + + containerId.setApplicationAttemptId( + ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); + containerId.setId(containerIdSeq.incrementAndGet()); + + ContainerIdProto containerIdProto = containerId.getProto(); + allocatedResources.add(WorkerAllocatedResource.newBuilder() + .setContainerId(containerIdProto) + .setNodeId(nodeId.toString()) + .setWorkerHost(allocatedResource.worker.getHostName()) + .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort()) + .setClientPort(allocatedResource.worker.getClientPort()) + .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort()) + .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort()) + .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) + .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) + .build()); + + + allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource); + } - allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource); - } + resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder() + .setQueryId(resourceRequest.request.getQueryId()) + .addAllWorkerAllocatedResource(allocatedResources) + .build() + ); - resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder() - .setQueryId(resourceRequest.request.getQueryId()) - .addAllWorkerAllocatedResource(allocatedResources) - .build() - ); - - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("========================================="); - LOG.debug("Available Workers"); - for(String liveWorker: rmContext.getWorkers().keySet()) { - LOG.debug(rmContext.getWorkers().get(liveWorker).toString()); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("========================================="); + LOG.debug("Available Workers"); + for(String liveWorker: rmContext.getWorkers().keySet()) { + LOG.debug(rmContext.getWorkers().get(liveWorker).toString()); + } + LOG.debug("========================================="); } - LOG.debug("========================================="); + requestQueue.put(resourceRequest); + Thread.sleep(100); } - requestQueue.put(resourceRequest); - Thread.sleep(100); } + } catch(InterruptedException ie) { LOG.error(ie); } @@ -531,7 +537,12 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } else { ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId); releaseWorkerResource(containerId); + rmContext.getStoppedQueryIds().add(queryId); LOG.info(String.format("Released QueryMaster (%s) resource:" + resource, queryId.toString())); } } + + public TajoRMContext getRMContext() { + return rmContext; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java index 0d6b5ee..de6ee9e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java @@ -281,7 +281,11 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); LOG.error("Invalid event " + event.getType() + " on Worker " + getWorkerId()); } if (oldState != getState()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index faeadaf..3cb7d25 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -13,6 +13,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.util.JSPUtil; +import org.apache.tajo.util.TajoIdUtils; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; @@ -170,7 +171,24 @@ public class QueryExecutorServlet extends HttpServlet { } queryRunners.clear(); } + } else if("killQuery".equals(action)) { + String queryId = request.getParameter("queryId"); + if(queryId == null || queryId.trim().isEmpty()) { + errorResponse(response, "No queryId parameter"); + return; + } + QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryId)); + + if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) { + returnValue.put("successMessage", queryId + " is killed successfully."); + } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) { + returnValue.put("successMessage", queryId + " will be finished after a while."); + } else { + errorResponse(response, "ERROR:" + status.getErrorMessage()); + return; + } } + returnValue.put("success", "true"); writeHttpResponse(response, returnValue); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 4010faf..4e4f5fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -563,7 +563,7 @@ public class Task { int retryWaitTime = 1000; try { // for releasing fetch latch - while(retryNum < maxRetryNum) { + while(!killed && retryNum < maxRetryNum) { if (retryNum > 0) { try { Thread.sleep(retryWaitTime); http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 4e8d7b0..fecc806 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -60,9 +60,31 @@ <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> - <link rel="stylesheet" type = "text/css" href = "/static/style.css" /> - <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> - <title>Tajo</title> + <link rel="stylesheet" type = "text/css" href = "/static/style.css" /> + <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> + <title>Tajo</title> + <script src="/static/js/jquery.js" type="text/javascript"></script> + <script type="text/javascript"> + + function killQuery(queryId) { + $.ajax({ + type: "POST", + url: "query_exec", + data: { action: "killQuery", queryId: queryId } + }) + .done(function(msg) { + var resultJson = $.parseJSON(msg); + if(resultJson.success == "false") { + alert(resultJson.errorMessage); + } else { + alert(resultJson.successMessage); + location.reload(); + } + }) + } + + + </script> </head> <body> <%@ include file="header.jsp"%> @@ -76,7 +98,7 @@ } else { %> <table width="100%" border="1" class='border_table'> - <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr> + <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th><th>Kill Query</th></tr> <% for(QueryInProgress eachQuery: runningQueries) { long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime(); @@ -91,6 +113,7 @@ <td><%=StringUtils.formatTime(time)%></td> <td><%=eachQuery.getQueryInfo().getQueryState()%></td> <td><%=eachQuery.getQueryInfo().getSql()%></td> + <td><input id="btnSubmit" type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td> </tr> <% } http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index 34deb29..09d674a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -34,8 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestTajoResourceManager { private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); @@ -375,4 +374,80 @@ public class TestTajoResourceManager { } } } + + @Test + public void testDiskResourceWithStoppedQuery() throws Exception { + TajoWorkerResourceManager tajoWorkerResourceManager = null; + + try { + tajoWorkerResourceManager = initResourceManager(false); + + final float minDiskSlots = 1.0f; + final float maxDiskSlots = 2.0f; + int memoryMB = 256; + + QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3); + + WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() + .setResourceRequestPriority(ResourceRequestPriority.DISK) + .setNumContainers(60) + .setQueryId(queryId.getProto()) + .setMaxDiskSlotPerContainer(maxDiskSlots) + .setMinDiskSlotPerContainer(minDiskSlots) + .setMinMemoryMBPerContainer(memoryMB) + .setMaxMemoryMBPerContainer(memoryMB) + .build(); + + final CountDownLatch barrier = new CountDownLatch(1); + final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>(); + + + RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { + + @Override + public void run(WorkerResourceAllocationResponse response) { + TestTajoResourceManager.this.response = response; + barrier.countDown(); + } + }; + + tajoWorkerResourceManager.getRMContext().getStoppedQueryIds().add(queryId); + tajoWorkerResourceManager.allocateWorkerResources(request, callBack); + assertFalse(barrier.await(3, TimeUnit.SECONDS)); + + assertNull(response); + + // assert after callback + int totalUsedDisks = 0; + for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { + WorkerResource resource = worker.getResource(); + //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1) + assertEquals(5.0f, resource.getAvailableDiskSlots(), 0); + assertEquals(0, resource.getUsedDiskSlots(), 0); + assertEquals(0, resource.getUsedMemoryMB()); + + totalUsedDisks += resource.getUsedDiskSlots(); + } + + assertEquals(0, totalUsedDisks, 0); + + for(YarnProtos.ContainerIdProto eachContainerId: containerIds) { + tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); + } + + for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { + WorkerResource resource = worker.getResource(); + assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); + assertEquals(0, resource.getUsedMemoryMB()); + + assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); + assertEquals(0.0f, resource.getUsedDiskSlots(), 0); + } + } finally { + if (tajoWorkerResourceManager != null) { + tajoWorkerResourceManager.stop(); + } + } + } + }
