From Janhavi Tripurwar <[email protected]>:

Janhavi Tripurwar has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19868 )


Change subject: track job correctly
......................................................................

track job correctly

Change-Id: I46e7aaf64fa040fbeaddd28403265937e4367c62
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
7 files changed, 308 insertions(+), 160 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/68/19868/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index bd62814..98fc87c 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,45 +18,32 @@
  */
 package org.apache.asterix.translator;

-import static 
org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
-
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;

 import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.om.base.AMutableDateTime;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
-import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.util.LogRedactionUtil;
-import org.apache.hyracks.util.StorageUtil;

+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;

 public class ClientRequest extends BaseClientRequest {

-    protected static final int MAX_STATEMENT_LENGTH =
-            StorageUtil.getIntSizeInBytes(64, 
StorageUtil.StorageUnit.KILOBYTE);
     protected final long creationTime = System.nanoTime();
     protected final Thread executor;
-    protected final String statement;
     protected final String clientContextId;
-    protected final JobState jobState;
-    protected volatile JobId jobId;
-    private volatile String plan; // can be null
+    private final List<StatementTracker> statementTrackersList = new 
ArrayList<>();
+    private long stmtId;

     public ClientRequest(ICommonRequestParameters requestParameters) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
-        String stmt = requestParameters.getStatement();
-        this.statement = stmt.length() > MAX_STATEMENT_LENGTH ? 
stmt.substring(0, MAX_STATEMENT_LENGTH) : stmt;
         this.executor = Thread.currentThread();
-        this.jobState = new JobState();
     }

     @Override
@@ -64,17 +51,6 @@
         return clientContextId;
     }

-    public void setPlan(String plan) {
-        if (plan != null) {
-            this.plan = plan.length() > MAX_STATEMENT_LENGTH ? 
plan.substring(0, MAX_STATEMENT_LENGTH) : plan;
-        }
-    }
-
-    public synchronized void setJobId(JobId jobId) {
-        this.jobId = jobId;
-        setRunning();
-    }
-
     public Thread getExecutor() {
         return executor;
     }
@@ -83,15 +59,19 @@
     protected void doCancel(ICcApplicationContext appCtx) throws 
HyracksDataException {
         // if the request has a job, we abort the job and do not interrupt the 
thread as it will be notified
         // that the job has been cancelled. Otherwise, we interrupt the thread
-        if (jobId != null) {
-            IHyracksClientConnection hcc = appCtx.getHcc();
-            try {
-                hcc.cancelJob(jobId);
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
+        for (StatementTracker statementTracker : statementTrackersList) {
+            JobId jobId = statementTracker.getJobId();
+            JobStatus status = statementTracker.getJobStatus();
+            if (jobId != null && status == JobStatus.RUNNING) {
+                IHyracksClientConnection hcc = appCtx.getHcc();
+                try {
+                    hcc.cancelJob(jobId);
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                }
+            } else if (executor != null) {
+                executor.interrupt();
             }
-        } else if (executor != null) {
-            executor.interrupt();
         }
     }

@@ -112,81 +92,48 @@
     }

     private ObjectNode asJson(ObjectNode json, boolean redact) {
-        putJobDetails(json, redact);
-        json.put("statement", redact ? LogRedactionUtil.statement(statement) : 
statement);
         json.put("clientContextID", clientContextId);
-        if (plan != null) {
-            json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
-        }
+        json.put("statements", processStatementTrackers(redact));
         return json;
     }

-    @Override
-    public void jobCreated(JobId jobId, IReadOnlyClusterCapacity 
requiredClusterCapacity,
-            IJobCapacityController.JobSubmissionStatus status) {
-        jobState.createTime = System.currentTimeMillis();
-        jobState.status = status == QUEUE ? JobStatus.PENDING : 
JobStatus.RUNNING;
-        jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
-        jobState.requiredMemoryInBytes = 
requiredClusterCapacity.getAggregatedMemoryByteSize();
+    public ArrayNode processStatementTrackers(boolean redact) {
+        ArrayNode statementsArray = JsonNodeFactory.instance.arrayNode();
+        for (StatementTracker statementTracker : statementTrackersList) {
+            ObjectNode trackerJson = JsonNodeFactory.instance.objectNode();
+            statementTracker.asJson(trackerJson, redact);
+            statementsArray.add(trackerJson);
+        }
+        return statementsArray;
+    }
+
+    protected StatementTracker createNewStatementTracker(long stmtId, String 
stmtText, String uuid) {
+        return new StatementTracker(stmtId, stmtText, uuid);
+    }
+
+    public StatementTracker getStatementTrackerForStmt(long stmtId) {
+        return statementTrackersList.get((int) stmtId);
     }

     @Override
-    public void jobStarted(JobId jobId) {
-        jobState.startTime = System.currentTimeMillis();
-        jobState.status = JobStatus.RUNNING;
+    public long getStmtId() {
+        return stmtId;
     }

-    @Override
-    public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> 
exceptions) {
-        jobState.endTime = System.currentTimeMillis();
-        jobState.status = jobStatus;
-        if (exceptions != null && !exceptions.isEmpty()) {
-            jobState.errorMsg = processException(exceptions.get(0));
+    private void setStatementId(long stmtId) {
+        this.stmtId = stmtId;
+    }
+
+    public void createAndTrackStatement(long stmtId, String stmtText, String 
uuid) {
+        setStatementId(stmtId);
+        StatementTracker tracker = createNewStatementTracker(stmtId, stmtText, 
uuid);
+        this.statementTrackersList.add(tracker);
+    }
+
+    public void associateJobWithStatementTracker(JobId jobId, StatementTracker 
tracker) {
+        if (jobId != null && tracker != null) {
+            tracker.setJobId(jobId); // Also record JobId in the tracker itself
+            setRunning();
         }
     }
-
-    protected String processException(Exception e) {
-        return ExceptionUtils.unwrap(e).getMessage();
-    }
-
-    private void putJobDetails(ObjectNode json, boolean redact) {
-        try {
-            json.put("jobId", jobId != null ? jobId.toString() : null);
-            putJobState(json, jobState, redact);
-        } catch (Throwable th) {
-            // ignore
-        }
-    }
-
-    private static void putJobState(ObjectNode json, JobState state, boolean 
redact) {
-        AMutableDateTime dateTime = new AMutableDateTime(0);
-        putTime(json, state.createTime, "jobCreateTime", dateTime);
-        putTime(json, state.startTime, "jobStartTime", dateTime);
-        putTime(json, state.endTime, "jobEndTime", dateTime);
-        long queueTime = (state.startTime > 0 ? state.startTime : 
System.currentTimeMillis()) - state.createTime;
-        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
-        json.put("jobStatus", String.valueOf(state.status));
-        json.put("jobRequiredCPUs", state.requiredCPUs);
-        json.put("jobRequiredMemory", state.requiredMemoryInBytes);
-        if (state.errorMsg != null) {
-            json.put("error", redact ? 
LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
-        }
-    }
-
-    private static void putTime(ObjectNode json, long time, String label, 
AMutableDateTime dateTime) {
-        if (time > 0) {
-            dateTime.setValue(time);
-            json.put(label, dateTime.toSimpleString());
-        }
-    }
-
-    static class JobState {
-        volatile long createTime;
-        volatile long startTime;
-        volatile long endTime;
-        volatile long requiredMemoryInBytes;
-        volatile int requiredCPUs;
-        volatile JobStatus status;
-        volatile String errorMsg;
-    }
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
new file mode 100644
index 0000000..5123508
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/StatementTracker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import static 
org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.IStatementTracker;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.hyracks.util.StorageUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class StatementTracker implements IStatementTracker {
+    protected static final int MAX_STATEMENT_LENGTH =
+            StorageUtil.getIntSizeInBytes(64, 
StorageUtil.StorageUnit.KILOBYTE);
+    protected volatile JobState jobState;
+    protected volatile String statement;
+    protected volatile JobId jobId;
+    private volatile String plan; // can be null
+    private final String fullStatementId;
+
+    public StatementTracker(long statementId, String statement, String uuid) {
+        this.jobState = new JobState();
+        this.statement = statement;
+        this.fullStatementId = uuid + "-" + statementId;
+    }
+
+    public ObjectNode asJson(ObjectNode json, boolean redact) {
+        putJobDetails(json, redact);
+        json.put("fullStatementId", fullStatementId);
+        json.put("statement", redact ? LogRedactionUtil.statement(statement) : 
statement);
+        if (plan != null) {
+            json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
+        }
+        return json;
+    }
+
+    public synchronized void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    public synchronized JobId getJobId() {
+        return jobId;
+    }
+
+    public synchronized JobStatus getJobStatus() {
+        return jobState.status;
+    }
+
+    public void setPlan(String plan) {
+        if (plan != null) {
+            this.plan = plan.length() > StatementTracker.MAX_STATEMENT_LENGTH
+                    ? plan.substring(0, StatementTracker.MAX_STATEMENT_LENGTH) 
: plan;
+        }
+    }
+
+    public void jobCreated(JobId jobId, IReadOnlyClusterCapacity 
requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status) {
+        jobState.createTime = System.currentTimeMillis();
+        jobState.status = status == QUEUE ? JobStatus.PENDING : 
JobStatus.RUNNING;
+        jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+        jobState.requiredMemoryInBytes = 
requiredClusterCapacity.getAggregatedMemoryByteSize();
+    }
+
+    public void jobStarted(JobId jobId) {
+        jobState.startTime = System.currentTimeMillis();
+        jobState.status = JobStatus.RUNNING;
+    }
+
+    public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> 
exceptions) {
+        jobState.endTime = System.currentTimeMillis();
+        jobState.status = jobStatus;
+        if (exceptions != null && !exceptions.isEmpty()) {
+            jobState.errorMsg = processException(exceptions.get(0));
+        }
+    }
+
+    protected String processException(Exception e) {
+        return ExceptionUtils.unwrap(e).getMessage();
+    }
+
+    private void putJobDetails(ObjectNode json, boolean redact) {
+        try {
+            json.put("jobId", jobId != null ? jobId.toString() : null);
+            putJobState(json, jobState, redact);
+        } catch (Throwable th) {
+            // ignore
+        }
+    }
+
+    private static void putJobState(ObjectNode json, JobState state, boolean 
redact) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        putTime(json, state.createTime, "jobCreateTime", dateTime);
+        putTime(json, state.startTime, "jobStartTime", dateTime);
+        putTime(json, state.endTime, "jobEndTime", dateTime);
+        long queueTime = (state.startTime > 0 ? state.startTime : 
System.currentTimeMillis()) - state.createTime;
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+        json.put("jobStatus", String.valueOf(state.status));
+        json.put("jobRequiredCPUs", state.requiredCPUs);
+        json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+        if (state.errorMsg != null) {
+            json.put("error", redact ? 
LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
+        }
+    }
+
+    private static void putTime(ObjectNode json, long time, String label, 
AMutableDateTime dateTime) {
+        if (time > 0) {
+            dateTime.setValue(time);
+            json.put(label, dateTime.toSimpleString());
+        }
+    }
+
+    static class JobState {
+        volatile long createTime;
+        volatile long startTime;
+        volatile long endTime;
+        volatile long requiredMemoryInBytes;
+        volatile int requiredCPUs;
+        volatile JobStatus status;
+        volatile String errorMsg;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index cd7af74..c24764e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -244,6 +244,7 @@
 import org.apache.asterix.translator.SchedulableClientRequest;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
+import org.apache.asterix.translator.StatementTracker;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
@@ -358,7 +359,7 @@
             IRequestParameters requestParameters, RequestExecutionContext 
requestExecutionContext) throws Exception {
         validateStatements(requestParameters);

-        trackRequest(requestParameters);
+        ClientRequest clientRequest = (ClientRequest) 
trackRequest(requestParameters);
         Counter resultSetIdCounter = new Counter(0);
         FileSplit outputFile = null;
         String threadName = Thread.currentThread().getName();
@@ -375,6 +376,9 @@

         try {
             for (Statement stmt : statements) {
+                long stmtId = statements.indexOf(stmt);
+                clientRequest.createAndTrackStatement(stmtId, stmt.toString(),
+                        requestParameters.getRequestReference().getUuid());
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
                     
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
                 }
@@ -387,9 +391,8 @@
                 rewriteStatement(stmt, stmtRewriter, metadataProvider);
                 Statement.Kind kind = stmt.getKind();

-                ExecuteEachStatementResponseMessage 
eachStatementResponseMessage =
-                        createNewStatementResponseMessage(kind, 
requestParameters,
-                                
requestExecutionContext.isRequestReceivedByCc(), statements.indexOf(stmt));
+                ExecuteEachStatementResponseMessage 
eachStatementResponseMessage = createNewStatementResponseMessage(
+                        kind, requestParameters, 
requestExecutionContext.isRequestReceivedByCc(), stmtId);
                 StatementProperties statementProperties = 
eachStatementResponseMessage.getStatementProperties();
                 Stats stats = eachStatementResponseMessage.getStats();
                 IStatementExecutor.ResultMetadata outMetadata = 
eachStatementResponseMessage.getMetadata();
@@ -614,7 +617,6 @@
             this.appCtx.getRequestTracker().incrementFailedRequests();
             throw ex;
         } finally {
-            // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
                 
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
             }
@@ -4241,7 +4243,8 @@
                 String reqId = 
requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = 
appCtx.getRequestTracker();
                 final ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
-                jobId = runTrackJob(hcc, spec, jobFlags, reqId, 
requestParameters.getClientContextId(), clientRequest);
+                jobId = runTrackJob(hcc, spec, jobFlags, reqId, 
requestParameters.getClientContextId(), clientRequest,
+                        responseMessage.getStatementId());
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4444,7 +4447,8 @@
                     
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
                             participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
                 }
-                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
reqParams.getClientContextId(), clientRequest);
+                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
reqParams.getClientContextId(), clientRequest,
+                        responseMessage.getStatementId());
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4517,7 +4521,7 @@
                 final IRequestTracker requestTracker = 
appCtx.getRequestTracker();
                 final ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
                 jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
requestParameters.getClientContextId(),
-                        clientRequest);
+                        clientRequest, responseMessage.getStatementId());
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4546,11 +4550,12 @@
     }

     private static JobId runTrackJob(IHyracksClientConnection hcc, 
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
-            String reqId, String clientCtxId, ClientRequest clientRequest) 
throws Exception {
+            String reqId, String clientCtxId, ClientRequest clientRequest, 
long stmtIdx) throws Exception {
         jobSpec.setRequestId(reqId);
         JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
         LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, 
reqId, clientCtxId);
-        clientRequest.setJobId(jobId);
+        StatementTracker tracker = 
clientRequest.getStatementTrackerForStmt(stmtIdx - 1);
+        clientRequest.associateJobWithStatementTracker(jobId, tracker);
         return jobId;
     }

@@ -5573,7 +5578,8 @@
                     responsePrinter.addResultPrinter(new 
ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, 
sessionOutput));
                     responsePrinter.printResults();
-                }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt);
+                }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt,
+                        responseMessage.getStatementId());
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, 
resultDelivery, id -> {
@@ -5585,7 +5591,8 @@
                         
outMetadata.setResultSets(org.apache.commons.lang3.tuple.Triple.of(id, 
resultSetId,
                                 metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt);
+                }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt,
+                        responseMessage.getStatementId());
                 break;
             default:
                 break;
@@ -5610,7 +5617,7 @@
             stats.setJobProfile(resultMetadata.getJobProfile());
             
apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
         }
-        
clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
+        //        
clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), 
warningCollector);
     }
@@ -5628,7 +5635,7 @@
                     printed.setTrue();
                     printed.notify();
                 }
-            }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt);
+            }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt, responseMessage.getStatementId());
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -5665,7 +5672,7 @@
     private void createAndRunJob(IHyracksClientConnection hcc, 
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, 
ResultDelivery resultDelivery, IResultPrinter printer,
             IRequestParameters requestParameters, boolean cancellable, 
ICcApplicationContext appCtx,
-            MetadataProvider metadataProvider, Statement atomicStatement) 
throws Exception {
+            MetadataProvider metadataProvider, Statement atomicStatement, long 
stmdId) throws Exception {
         String reqId = requestParameters.getRequestReference().getUuid();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
@@ -5703,7 +5710,8 @@
                 }
             }

-            jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
requestParameters.getClientContextId(), clientRequest);
+            jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
requestParameters.getClientContextId(), clientRequest,
+                    stmdId);
             if (jId != null) {
                 jId.setValue(jobId);
             }
@@ -5842,9 +5850,10 @@
         }
     }

-    protected void trackRequest(IRequestParameters requestParameters) throws 
HyracksDataException {
+    protected IClientRequest trackRequest(IRequestParameters 
requestParameters) throws HyracksDataException {
         final IClientRequest clientRequest = 
appCtx.getReceptionist().requestReceived(requestParameters);
         this.appCtx.getRequestTracker().track(clientRequest);
+        return clientRequest;
     }

     protected void validateStatements(IRequestParameters requestParameters)
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 673611d..7662cb5 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -37,7 +37,6 @@
 import org.apache.asterix.translator.ClientRequest;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -80,7 +79,7 @@
         RequestParameters requestParameters = new 
RequestParameters(requestReference, "select 1", null, null, null,
                 null, null, "1", null, null, null, true);
         ClientRequest request = new ClientRequest(requestParameters);
-        request.setJobId(new JobId(1));
+        //        request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
         // Tests the case that query is in the map.
@@ -97,7 +96,7 @@
         requestParameters = new RequestParameters(requestReference2, "select 
1", null, null, null, null, null, "2",
                 null, null, null, true);
         ClientRequest request2 = new ClientRequest(requestParameters);
-        request2.setJobId(new JobId(2));
+        //        request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
         Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index ee518dd..17d6a4a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,14 +18,8 @@
  */
 package org.apache.asterix.common.api;

-import java.util.List;
-
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;

 import com.fasterxml.jackson.databind.node.ObjectNode;

@@ -109,29 +103,12 @@
     ObjectNode asRedactedJson();

     /**
-     * Called when the job is created.
-     *
-     * @param jobId the job id
-     * @param requiredClusterCapacity the required resources by the job
-     * @param status the status of the job; whether it will be executed or 
queued
+     * @return the current statementTracker of the request
      */
-    void jobCreated(JobId jobId, IReadOnlyClusterCapacity 
requiredClusterCapacity,
-            IJobCapacityController.JobSubmissionStatus status);
+    IStatementTracker getStatementTrackerForStmt(long stmdId);

     /**
-     * Called when the job starts running.
-     *
-     * @param jobId the job id
+     * @return the current statementId of the request
      */
-    void jobStarted(JobId jobId);
-
-    /**
-     * Called when the job finishes.
-     *
-     * @param jobId the job id
-     * @param jobStatus the final job status
-     * @param exceptions exceptions encountered if any
-     */
-    void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> 
exceptions);
-
+    long getStmtId();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
new file mode 100644
index 0000000..fcd3667
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IStatementTracker.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+
+/**
+ * Receives the job lifecycle notifications (created, started, finished) on behalf of a statement.
+ */
+public interface IStatementTracker {
+
+    /**
+     * Called when the job is created.
+     *
+     * @param jobId the job id
+     * @param requiredClusterCapacity the required resources by the job
+     * @param status the status of the job; whether it will be executed or queued
+     */
+    void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status);
+
+    /**
+     * Called when the job starts running.
+     *
+     * @param jobId the job id
+     */
+    void jobStarted(JobId jobId);
+
+    /**
+     * Called when the job finishes.
+     *
+     * @param jobId the job id
+     * @param jobStatus the final job status
+     * @param exceptions exceptions encountered if any
+     */
+    void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions);
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index ef9154d..15b9865 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -30,6 +30,7 @@

 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestTracker;
+import org.apache.asterix.common.api.IStatementTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -148,13 +149,17 @@
     }
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
-            throws HyracksException {
+    public void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) {
         String requestId = spec.getRequestId();
         if (requestId != null) {
             IClientRequest request = getRequest(requestId);
             if (request != null) {
-                request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+                // skip jobs whose request is unknown to this tracker; otherwise notify the statement's tracker
+                IStatementTracker tracker = request.getStatementTrackerForStmt(request.getStmtId());
+                if (tracker != null) {
+                    tracker.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+                }
             }
         }
     }
@@ -164,20 +169,27 @@
         String requestId = spec.getRequestId();
         if (requestId != null) {
             IClientRequest request = getRequest(requestId);
             if (request != null) {
-                request.jobStarted(jobId);
+                // skip jobs whose request is unknown to this tracker; otherwise notify the statement's tracker
+                IStatementTracker tracker = request.getStatementTrackerForStmt(request.getStmtId());
+                if (tracker != null) {
+                    tracker.jobStarted(jobId);
+                }
             }
         }
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) {
         String requestId = spec.getRequestId();
         if (requestId != null) {
             IClientRequest request = getRequest(requestId);
             if (request != null) {
-                request.jobFinished(jobId, jobStatus, exceptions);
+                // skip jobs whose request is unknown to this tracker; otherwise notify the statement's tracker
+                IStatementTracker tracker = request.getStatementTrackerForStmt(request.getStmtId());
+                if (tracker != null) {
+                    tracker.jobFinished(jobId, jobStatus, exceptions);
+                }
             }
         }
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19868
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I46e7aaf64fa040fbeaddd28403265937e4367c62
Gerrit-Change-Number: 19868
Gerrit-PatchSet: 1
Gerrit-Owner: Janhavi Tripurwar <[email protected]>
Gerrit-MessageType: newchange

Reply via email to