>From Janhavi Tripurwar <[email protected]>:
Janhavi Tripurwar has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19868 )
Change subject: track job correctly
......................................................................
track job correctly
Change-Id: I46e7aaf64fa040fbeaddd28403265937e4367c62
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
7 files changed, 308 insertions(+), 160 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/68/19868/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index bd62814..98fc87c 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,45 +18,32 @@
*/
package org.apache.asterix.translator;
-import static
org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
-
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.om.base.AMutableDateTime;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
-import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.util.LogRedactionUtil;
-import org.apache.hyracks.util.StorageUtil;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class ClientRequest extends BaseClientRequest {
- protected static final int MAX_STATEMENT_LENGTH =
- StorageUtil.getIntSizeInBytes(64,
StorageUtil.StorageUnit.KILOBYTE);
protected final long creationTime = System.nanoTime();
protected final Thread executor;
- protected final String statement;
protected final String clientContextId;
- protected final JobState jobState;
- protected volatile JobId jobId;
- private volatile String plan; // can be null
+ private final List<StatementTracker> statementTrackersList = new
ArrayList<>();
+ private long stmtId;
public ClientRequest(ICommonRequestParameters requestParameters) {
super(requestParameters.getRequestReference());
this.clientContextId = requestParameters.getClientContextId();
- String stmt = requestParameters.getStatement();
- this.statement = stmt.length() > MAX_STATEMENT_LENGTH ?
stmt.substring(0, MAX_STATEMENT_LENGTH) : stmt;
this.executor = Thread.currentThread();
- this.jobState = new JobState();
}
@Override
@@ -64,17 +51,6 @@
return clientContextId;
}
- public void setPlan(String plan) {
- if (plan != null) {
- this.plan = plan.length() > MAX_STATEMENT_LENGTH ?
plan.substring(0, MAX_STATEMENT_LENGTH) : plan;
- }
- }
-
- public synchronized void setJobId(JobId jobId) {
- this.jobId = jobId;
- setRunning();
- }
-
public Thread getExecutor() {
return executor;
}
@@ -83,15 +59,19 @@
protected void doCancel(ICcApplicationContext appCtx) throws
HyracksDataException {
// if the request has a job, we abort the job and do not interrupt the
thread as it will be notified
// that the job has been cancelled. Otherwise, we interrupt the thread
- if (jobId != null) {
- IHyracksClientConnection hcc = appCtx.getHcc();
- try {
- hcc.cancelJob(jobId);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ for (StatementTracker statementTracker : statementTrackersList) {
+ JobId jobId = statementTracker.getJobId();
+ JobStatus status = statementTracker.getJobStatus();
+ if (jobId != null && status == JobStatus.RUNNING) {
+ IHyracksClientConnection hcc = appCtx.getHcc();
+ try {
+ hcc.cancelJob(jobId);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } else if (executor != null) {
+ executor.interrupt();
}
- } else if (executor != null) {
- executor.interrupt();
}
}
@@ -112,81 +92,48 @@
}
private ObjectNode asJson(ObjectNode json, boolean redact) {
- putJobDetails(json, redact);
- json.put("statement", redact ? LogRedactionUtil.statement(statement) :
statement);
json.put("clientContextID", clientContextId);
- if (plan != null) {
- json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
- }
+ json.put("statements", processStatementTrackers(redact));
return json;
}
- @Override
- public void jobCreated(JobId jobId, IReadOnlyClusterCapacity
requiredClusterCapacity,
- IJobCapacityController.JobSubmissionStatus status) {
- jobState.createTime = System.currentTimeMillis();
- jobState.status = status == QUEUE ? JobStatus.PENDING :
JobStatus.RUNNING;
- jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
- jobState.requiredMemoryInBytes =
requiredClusterCapacity.getAggregatedMemoryByteSize();
+ public ArrayNode processStatementTrackers(boolean redact) {
+ ArrayNode statementsArray = JsonNodeFactory.instance.arrayNode();
+ for (StatementTracker statementTracker : statementTrackersList) {
+ ObjectNode trackerJson = JsonNodeFactory.instance.objectNode();
+ statementTracker.asJson(trackerJson, redact);
+ statementsArray.add(trackerJson);
+ }
+ return statementsArray;
+ }
+
+ protected StatementTracker createNewStatementTracker(long stmtId, String
stmtText, String uuid) {
+ return new StatementTracker(stmtId, stmtText, uuid);
+ }
+
+ public StatementTracker getStatementTrackerForStmt(long stmtId) {
+ return statementTrackersList.get((int) stmtId);
}
@Override
- public void jobStarted(JobId jobId) {
- jobState.startTime = System.currentTimeMillis();
- jobState.status = JobStatus.RUNNING;
+ public long getStmtId() {
+ return stmtId;
}
- @Override
- public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception>
exceptions) {
- jobState.endTime = System.currentTimeMillis();
- jobState.status = jobStatus;
- if (exceptions != null && !exceptions.isEmpty()) {
- jobState.errorMsg = processException(exceptions.get(0));
+ private void setStatementId(long stmtId) {
+ this.stmtId = stmtId;
+ }
+
+ public void createAndTrackStatement(long stmtId, String stmtText, String
uuid) {
+ setStatementId(stmtId);
+ StatementTracker tracker = createNewStatementTracker(stmtId, stmtText,
uuid);
+ this.statementTrackersList.add(tracker);
+ }
+
+ public void associateJobWithStatementTracker(JobId jobId, StatementTracker
tracker) {
+ if (jobId != null && tracker != null) {
+ tracker.setJobId(jobId); // Also record JobId in the tracker itself
+ setRunning();
}
}
-
- protected String processException(Exception e) {
- return ExceptionUtils.unwrap(e).getMessage();
- }
-
- private void putJobDetails(ObjectNode json, boolean redact) {
- try {
- json.put("jobId", jobId != null ? jobId.toString() : null);
- putJobState(json, jobState, redact);
- } catch (Throwable th) {
- // ignore
- }
- }
-
- private static void putJobState(ObjectNode json, JobState state, boolean
redact) {
- AMutableDateTime dateTime = new AMutableDateTime(0);
- putTime(json, state.createTime, "jobCreateTime", dateTime);
- putTime(json, state.startTime, "jobStartTime", dateTime);
- putTime(json, state.endTime, "jobEndTime", dateTime);
- long queueTime = (state.startTime > 0 ? state.startTime :
System.currentTimeMillis()) - state.createTime;
- json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
- json.put("jobStatus", String.valueOf(state.status));
- json.put("jobRequiredCPUs", state.requiredCPUs);
- json.put("jobRequiredMemory", state.requiredMemoryInBytes);
- if (state.errorMsg != null) {
- json.put("error", redact ?
LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
- }
- }
-
- private static void putTime(ObjectNode json, long time, String label,
AMutableDateTime dateTime) {
- if (time > 0) {
- dateTime.setValue(time);
- json.put(label, dateTime.toSimpleString());
- }
- }
-
- static class JobState {
- volatile long createTime;
- volatile long startTime;
- volatile long endTime;
- volatile long requiredMemoryInBytes;
- volatile int requiredCPUs;
- volatile JobStatus status;
- volatile String errorMsg;
- }
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
new file mode 100644
index 0000000..5123508
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import static
org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.IStatementTracker;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.hyracks.util.StorageUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class StatementTracker implements IStatementTracker {
+ protected static final int MAX_STATEMENT_LENGTH =
+ StorageUtil.getIntSizeInBytes(64,
StorageUtil.StorageUnit.KILOBYTE);
+ protected volatile JobState jobState;
+ protected volatile String statement;
+ protected volatile JobId jobId;
+ private volatile String plan; // can be null
+ private final String fullStatementId;
+
+ public StatementTracker(long statementId, String statement, String uuid) {
+ this.jobState = new JobState();
+ this.statement = statement;
+ this.fullStatementId = uuid + "-" + statementId;
+ }
+
+ public ObjectNode asJson(ObjectNode json, boolean redact) {
+ putJobDetails(json, redact);
+ json.put("fullStatementId", fullStatementId);
+ json.put("statement", redact ? LogRedactionUtil.statement(statement) :
statement);
+ if (plan != null) {
+ json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
+ }
+ return json;
+ }
+
+ public synchronized void setJobId(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ public synchronized JobId getJobId() {
+ return jobId;
+ }
+
+ public synchronized JobStatus getJobStatus() {
+ return jobState.status;
+ }
+
+ public void setPlan(String plan) {
+ if (plan != null) {
+ this.plan = plan.length() > StatementTracker.MAX_STATEMENT_LENGTH
+ ? plan.substring(0, StatementTracker.MAX_STATEMENT_LENGTH)
: plan;
+ }
+ }
+
+ public void jobCreated(JobId jobId, IReadOnlyClusterCapacity
requiredClusterCapacity,
+ IJobCapacityController.JobSubmissionStatus status) {
+ jobState.createTime = System.currentTimeMillis();
+ jobState.status = status == QUEUE ? JobStatus.PENDING :
JobStatus.RUNNING;
+ jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+ jobState.requiredMemoryInBytes =
requiredClusterCapacity.getAggregatedMemoryByteSize();
+ }
+
+ public void jobStarted(JobId jobId) {
+ jobState.startTime = System.currentTimeMillis();
+ jobState.status = JobStatus.RUNNING;
+ }
+
+ public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception>
exceptions) {
+ jobState.endTime = System.currentTimeMillis();
+ jobState.status = jobStatus;
+ if (exceptions != null && !exceptions.isEmpty()) {
+ jobState.errorMsg = processException(exceptions.get(0));
+ }
+ }
+
+ protected String processException(Exception e) {
+ return ExceptionUtils.unwrap(e).getMessage();
+ }
+
+ private void putJobDetails(ObjectNode json, boolean redact) {
+ try {
+ json.put("jobId", jobId != null ? jobId.toString() : null);
+ putJobState(json, jobState, redact);
+ } catch (Throwable th) {
+ // ignore
+ }
+ }
+
+ private static void putJobState(ObjectNode json, JobState state, boolean
redact) {
+ AMutableDateTime dateTime = new AMutableDateTime(0);
+ putTime(json, state.createTime, "jobCreateTime", dateTime);
+ putTime(json, state.startTime, "jobStartTime", dateTime);
+ putTime(json, state.endTime, "jobEndTime", dateTime);
+ long queueTime = (state.startTime > 0 ? state.startTime :
System.currentTimeMillis()) - state.createTime;
+ json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+ json.put("jobStatus", String.valueOf(state.status));
+ json.put("jobRequiredCPUs", state.requiredCPUs);
+ json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+ if (state.errorMsg != null) {
+ json.put("error", redact ?
LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
+ }
+ }
+
+ private static void putTime(ObjectNode json, long time, String label,
AMutableDateTime dateTime) {
+ if (time > 0) {
+ dateTime.setValue(time);
+ json.put(label, dateTime.toSimpleString());
+ }
+ }
+
+ static class JobState {
+ volatile long createTime;
+ volatile long startTime;
+ volatile long endTime;
+ volatile long requiredMemoryInBytes;
+ volatile int requiredCPUs;
+ volatile JobStatus status;
+ volatile String errorMsg;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index cd7af74..c24764e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -244,6 +244,7 @@
import org.apache.asterix.translator.SchedulableClientRequest;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
+import org.apache.asterix.translator.StatementTracker;
import org.apache.asterix.translator.TypeTranslator;
import org.apache.asterix.translator.util.ValidateUtil;
import org.apache.asterix.utils.DataverseUtil;
@@ -358,7 +359,7 @@
IRequestParameters requestParameters, RequestExecutionContext
requestExecutionContext) throws Exception {
validateStatements(requestParameters);
- trackRequest(requestParameters);
+ ClientRequest clientRequest = (ClientRequest)
trackRequest(requestParameters);
Counter resultSetIdCounter = new Counter(0);
FileSplit outputFile = null;
String threadName = Thread.currentThread().getName();
@@ -375,6 +376,9 @@
try {
for (Statement stmt : statements) {
+ long stmtId = statements.indexOf(stmt);
+ clientRequest.createAndTrackStatement(stmtId, stmt.toString(),
+ requestParameters.getRequestReference().getUuid());
if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
}
@@ -387,9 +391,8 @@
rewriteStatement(stmt, stmtRewriter, metadataProvider);
Statement.Kind kind = stmt.getKind();
- ExecuteEachStatementResponseMessage
eachStatementResponseMessage =
- createNewStatementResponseMessage(kind,
requestParameters,
-
requestExecutionContext.isRequestReceivedByCc(), statements.indexOf(stmt));
+ ExecuteEachStatementResponseMessage
eachStatementResponseMessage = createNewStatementResponseMessage(
+ kind, requestParameters,
requestExecutionContext.isRequestReceivedByCc(), stmtId);
StatementProperties statementProperties =
eachStatementResponseMessage.getStatementProperties();
Stats stats = eachStatementResponseMessage.getStats();
IStatementExecutor.ResultMetadata outMetadata =
eachStatementResponseMessage.getMetadata();
@@ -614,7 +617,6 @@
this.appCtx.getRequestTracker().incrementFailedRequests();
throw ex;
} finally {
- // async queries are completed after their job completes
if (ResultDelivery.ASYNC != resultDelivery) {
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
}
@@ -4241,7 +4243,8 @@
String reqId =
requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker =
appCtx.getRequestTracker();
final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
- jobId = runTrackJob(hcc, spec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest);
+ jobId = runTrackJob(hcc, spec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest,
+ responseMessage.getStatementId());
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4444,7 +4447,8 @@
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
reqParams.getClientContextId(), clientRequest);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
reqParams.getClientContextId(), clientRequest,
+ responseMessage.getStatementId());
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4517,7 +4521,7 @@
final IRequestTracker requestTracker =
appCtx.getRequestTracker();
final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(),
- clientRequest);
+ clientRequest, responseMessage.getStatementId());
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4546,11 +4550,12 @@
}
private static JobId runTrackJob(IHyracksClientConnection hcc,
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
- String reqId, String clientCtxId, ClientRequest clientRequest)
throws Exception {
+ String reqId, String clientCtxId, ClientRequest clientRequest,
long stmtIdx) throws Exception {
jobSpec.setRequestId(reqId);
JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId,
reqId, clientCtxId);
- clientRequest.setJobId(jobId);
+ StatementTracker tracker =
clientRequest.getStatementTrackerForStmt(stmtIdx - 1);
+ clientRequest.associateJobWithStatementTracker(jobId, tracker);
return jobId;
}
@@ -5573,7 +5578,8 @@
responsePrinter.addResultPrinter(new
ResultsPrinter(appCtx, resultReader,
metadataProvider.findOutputRecordType(), stats,
sessionOutput));
responsePrinter.printResults();
- }, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt);
+ }, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt,
+ responseMessage.getStatementId());
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
@@ -5585,7 +5591,8 @@
outMetadata.setResultSets(org.apache.commons.lang3.tuple.Triple.of(id,
resultSetId,
metadataProvider.findOutputRecordType()));
}
- }, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt);
+ }, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt,
+ responseMessage.getStatementId());
break;
default:
break;
@@ -5610,7 +5617,7 @@
stats.setJobProfile(resultMetadata.getJobProfile());
apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
}
-
clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
+ //
clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
WarningUtil.mergeWarnings(resultMetadata.getWarnings(),
warningCollector);
}
@@ -5628,7 +5635,7 @@
printed.setTrue();
printed.notify();
}
- }, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt);
+ }, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt, responseMessage.getStatementId());
} catch (Exception e) {
if (Objects.equals(JobId.INVALID, jobId.getValue())) {
// compilation failed
@@ -5665,7 +5672,7 @@
private void createAndRunJob(IHyracksClientConnection hcc,
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IResultPrinter printer,
IRequestParameters requestParameters, boolean cancellable,
ICcApplicationContext appCtx,
- MetadataProvider metadataProvider, Statement atomicStatement)
throws Exception {
+ MetadataProvider metadataProvider, Statement atomicStatement, long
stmdId) throws Exception {
String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
@@ -5703,7 +5710,8 @@
}
}
- jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest,
+ stmdId);
if (jId != null) {
jId.setValue(jobId);
}
@@ -5842,9 +5850,10 @@
}
}
- protected void trackRequest(IRequestParameters requestParameters) throws
HyracksDataException {
+ protected IClientRequest trackRequest(IRequestParameters
requestParameters) throws HyracksDataException {
final IClientRequest clientRequest =
appCtx.getReceptionist().requestReceived(requestParameters);
this.appCtx.getRequestTracker().track(clientRequest);
+ return clientRequest;
}
protected void validateStatements(IRequestParameters requestParameters)
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 673611d..7662cb5 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -37,7 +37,6 @@
import org.apache.asterix.translator.ClientRequest;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -80,7 +79,7 @@
RequestParameters requestParameters = new
RequestParameters(requestReference, "select 1", null, null, null,
null, null, "1", null, null, null, true);
ClientRequest request = new ClientRequest(requestParameters);
- request.setJobId(new JobId(1));
+ // request.setJobId(new JobId(1));
request.markCancellable();
tracker.track(request);
// Tests the case that query is in the map.
@@ -97,7 +96,7 @@
requestParameters = new RequestParameters(requestReference2, "select
1", null, null, null, null, null, "2",
null, null, null, true);
ClientRequest request2 = new ClientRequest(requestParameters);
- request2.setJobId(new JobId(2));
+ // request2.setJobId(new JobId(2));
request2.markCancellable();
tracker.track(request2);
Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index ee518dd..17d6a4a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,14 +18,8 @@
*/
package org.apache.asterix.common.api;
-import java.util.List;
-
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -109,29 +103,12 @@
ObjectNode asRedactedJson();
/**
- * Called when the job is created.
- *
- * @param jobId the job id
- * @param requiredClusterCapacity the required resources by the job
- * @param status the status of the job; whether it will be executed or
queued
+ * @return the current statementTracker of the request
*/
- void jobCreated(JobId jobId, IReadOnlyClusterCapacity
requiredClusterCapacity,
- IJobCapacityController.JobSubmissionStatus status);
+ IStatementTracker getStatementTrackerForStmt(long stmdId);
/**
- * Called when the job starts running.
- *
- * @param jobId the job id
+ * @return the current statementId of the request
*/
- void jobStarted(JobId jobId);
-
- /**
- * Called when the job finishes.
- *
- * @param jobId the job id
- * @param jobStatus the final job status
- * @param exceptions exceptions encountered if any
- */
- void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception>
exceptions);
-
+ long getStmtId();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
new file mode 100644
index 0000000..fcd3667
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+
+public interface IStatementTracker {
+
+ /**
+ * Called when the job is created.
+ *
+ * @param jobId the job id
+ * @param requiredClusterCapacity the required resources by the job
+ * @param status the status of the job; whether it will be executed or
queued
+ */
+ void jobCreated(JobId jobId, IReadOnlyClusterCapacity
requiredClusterCapacity,
+ IJobCapacityController.JobSubmissionStatus status);
+
+ /**
+ * Called when the job starts running.
+ *
+ * @param jobId the job id
+ */
+ void jobStarted(JobId jobId);
+
+ /**
+ * Called when the job finishes.
+ *
+ * @param jobId the job id
+ * @param jobStatus the final job status
+ * @param exceptions exceptions encountered if any
+ */
+ void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception>
exceptions);
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index ef9154d..15b9865 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestTracker;
+import org.apache.asterix.common.api.IStatementTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -148,13 +149,14 @@
}
@Override
- public void notifyJobCreation(JobId jobId, JobSpecification spec,
IJobCapacityController.JobSubmissionStatus status)
- throws HyracksException {
+ public void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) {
String requestId = spec.getRequestId();
if (requestId != null) {
IClientRequest request = getRequest(requestId);
- if (request != null) {
- request.jobCreated(jobId, spec.getRequiredClusterCapacity(),
status);
+ IStatementTracker tracker =
request.getStatementTrackerForStmt(request.getStmtId());
+ if (tracker != null) {
+ tracker.jobCreated(jobId, spec.getRequiredClusterCapacity(),
status);
}
}
}
@@ -164,20 +166,21 @@
String requestId = spec.getRequestId();
if (requestId != null) {
IClientRequest request = getRequest(requestId);
- if (request != null) {
- request.jobStarted(jobId);
+ IStatementTracker tracker =
request.getStatementTrackerForStmt(request.getStmtId());
+ if (tracker != null) {
+ tracker.jobStarted(jobId);
}
}
}
@Override
- public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus
jobStatus, List<Exception> exceptions)
- throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus
jobStatus, List<Exception> exceptions) {
String requestId = spec.getRequestId();
if (requestId != null) {
IClientRequest request = getRequest(requestId);
- if (request != null) {
- request.jobFinished(jobId, jobStatus, exceptions);
+ IStatementTracker tracker =
request.getStatementTrackerForStmt(request.getStmtId());
+ if (tracker != null) {
+ tracker.jobFinished(jobId, jobStatus, exceptions);
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19868
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I46e7aaf64fa040fbeaddd28403265937e4367c62
Gerrit-Change-Number: 19868
Gerrit-PatchSet: 1
Gerrit-Owner: Janhavi Tripurwar <[email protected]>
Gerrit-MessageType: newchange