From Ali Alsuliman <[email protected]>:

Ali Alsuliman has uploaded this change for review.
( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email )


Change subject: async status result
......................................................................

async status result

Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
8 files changed, 65 insertions(+), 15 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/83/21183/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4e9a366..72c2ebc 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -183,7 +183,11 @@
         putTime(json, state.endTime, "jobEndTime", dateTime);
         long queueTime = 0;
         if (state.createTime > 0) {
-            queueTime = (state.startTime > 0 ? state.startTime : 
System.currentTimeMillis()) - state.createTime;
+            // startTime - createTime, if job has started
+            // endTime - createTime, if job has ended but not started (failed 
while in the queue, cancelled/timeout)
+            // currentTime - createTime, if job is still in the queue
+            queueTime = (state.startTime > 0 ? state.startTime
+                    : (state.endTime > 0 ? state.endTime : 
System.currentTimeMillis())) - state.createTime;
         }
         json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
         json.put("jobStatus", String.valueOf(state.status));
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
index 90978fb..fdb761d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
@@ -21,12 +21,14 @@
 import java.util.concurrent.ConcurrentMap;

 import org.apache.asterix.app.message.DiscardResultRequestMessage;
+import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.utils.AsyncRequestsAPIUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -44,12 +46,19 @@
     }

     @Override
+    protected IResultMetadata getResultMetadata(JobId jobId, ResultSetId 
resultSetId) throws Exception {
+        ResultReader resultReader = new ResultReader(getResultSet(), jobId, 
resultSetId);
+        return resultReader.getMetadata();
+    }
+
+    @Override
     protected void discardResult(String requestId, JobId jobId, ResultSetId 
resultSetId) throws HyracksDataException {
         INCServiceContext serviceCtx = (INCServiceContext) 
appCtx.getServiceContext();
         INCMessageBroker messageBroker = (INCMessageBroker) 
serviceCtx.getMessageBroker();
         DiscardResultRequestMessage request =
                 new DiscardResultRequestMessage(serviceCtx.getNodeId(), jobId, 
resultSetId, requestId);
         try {
+            //TODO: handle receive response
             messageBroker.sendMessageToPrimaryCC(request);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index eb0881d..d9d8374 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -40,9 +40,11 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.api.result.ResultJobRecord;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -71,6 +73,12 @@
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
+        IResultMetadata resultMetadata = getResultMetadata(handle.getJobId(), 
handle.getResultSetId());
+        if (resultMetadata == null) {
+            LOGGER.log(Level.INFO, "No result metadata found for handle: 
\"{}\"", strHandle);
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+            return;
+        }
         try {
             discardResult(handle.getRequestId(), handle.getJobId(), 
handle.getResultSetId());
             response.setStatus(HttpResponseStatus.ACCEPTED);
@@ -88,6 +96,12 @@
         }
     }

+    protected IResultMetadata getResultMetadata(JobId jobId, ResultSetId 
resultSetId) throws Exception {
+        return ((ClusterControllerService) 
appCtx.getServiceContext().getControllerService())
+                .getResultDirectoryService().getResultMetadata(jobId, 
resultSetId);
+
+    }
+
     protected void discardResult(String requestId, JobId jobId, ResultSetId 
resultSetId) throws HyracksDataException {
         AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) 
appCtx, jobId, resultSetId, requestId);
     }
@@ -107,11 +121,16 @@
         try {
             ResultJobRecord.Status status = resultReader.getStatus();
             final HttpResponseStatus httpStatus = 
ResultUtil.getHttpStatusFromResultStatus(status);
-            response.setStatus(httpStatus);
             if (httpStatus != HttpResponseStatus.OK) {
+                response.setStatus(httpStatus);
                 return;
             }
             ResultMetadata metadata = (ResultMetadata) 
resultReader.getMetadata();
+            if (metadata == null) {
+                response.setStatus(HttpResponseStatus.NOT_FOUND);
+                return;
+            }
+            response.setStatus(httpStatus);
             SessionOutput sessionOutput = initResponse(request, response, 
metadata.getFormat());
             processResults(handle, resultReader, sessionOutput, metadata, 
request);
         } catch (HyracksDataException e) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index f94a98b..9467fa0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -90,15 +90,23 @@
         printer.begin();
         printer.addHeaderPrinter(new StatusPrinter(resultStatus));
         printer.printHeaders();
+        ResultMetadata metadata = null;
+        if (uriMode || resultStatus == ResultStatus.SUCCESS) {
+            metadata = (ResultMetadata) resultReader.getMetadata();
+        }
         switch (resultStatus) {
-            case SUCCESS -> handleSuccessfulResult(request, strHandle, 
uriMode, printer, resultReader);
+            case SUCCESS -> {
+                if (metadata != null) {
+                    handleSuccessfulResult(request, strHandle, uriMode, 
printer, resultReader, metadata);
+                }
+            }
             case TIMEOUT -> handleTimeout(handle, executionState, printer, 
response);
             case FATAL, FAILED -> handleFailure(handle, executionState, 
printer, response, resultReaderStatus);
             case QUEUED, RUNNING -> {}
         }
         printer.printResults();
         if (uriMode) {
-            printMetricsAndFooters(printer, resultReader, request, 
handle.getRequestId(), handle.getJobId(), resultStatus);
+            printMetricsAndFooters(printer, metadata, request, 
handle.getRequestId(), handle.getJobId(), resultStatus);
         }
         printer.end();
         if (response.writer().checkError()) {
@@ -107,7 +115,7 @@
     }

     private void handleSuccessfulResult(IServletRequest request, String 
strHandle, boolean uriMode,
-            ResponsePrinter printer, ResultReader resultReader) throws 
HyracksDataException {
+            ResponsePrinter printer, ResultReader resultReader, ResultMetadata 
metadata) throws HyracksDataException {
         String servletPath = servletPath(request).replace("status", "result");
         String resHandle;
         if (uriMode) {
@@ -117,9 +125,7 @@
         }
         printer.addResultPrinter(new ResultHandlePrinter(resHandle));
         if (uriMode) {
-            ResultMetadata metadata = (ResultMetadata) 
resultReader.getMetadata();
-            printer.addResultPrinter(new ResultCountPrinter(
-                    ((ResultMetadata) 
(resultReader.getResultSetReader().getResultMetadata())).getResultCount()));
+            printer.addResultPrinter(new 
ResultCountPrinter(metadata.getResultCount()));
             printer.addResultPrinter(new 
PartitionInfoPrinter(resultReader.getResultSetReader().getResultRecords(),
                     resHandle, metadata.isResultSetOrdered()));
         }
@@ -172,9 +178,8 @@
         }
     }

-    private void printMetricsAndFooters(ResponsePrinter printer, ResultReader 
resultReader, IServletRequest request,
+    private void printMetricsAndFooters(ResponsePrinter printer, 
ResultMetadata metadata, IServletRequest request,
             String requestId, JobId jobId, ResultStatus status) throws 
HyracksDataException {
-        ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
         if (metadata != null && status != ResultStatus.QUEUED && status != 
ResultStatus.RUNNING) {
             printMetricsWithResultMetadata(printer, request, metadata);
         } else {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
index 7ca897a..79ddbca 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -45,6 +46,11 @@

     @Override
     public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException {
-        AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) 
appCtx, jobId, resultSetId, requestId);
+        try {
+            
AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx, 
jobId, resultSetId, requestId);
+        } catch (Throwable th) {
+            // catching Throwable to prevent any unexpected exception from 
crashing the CC
+            LOGGER.log(Level.WARN, "unexpected exception while processing 
discard result request message", th);
+        }
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
index 88184fa..6395da9 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
@@ -55,7 +55,7 @@
 public class AsyncRequestsAPIUtil {

     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS = 
TimeUnit.SECONDS.toMillis(1);
+    private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS = 
TimeUnit.SECONDS.toMillis(10);
     public static final long NC_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);

     /**
@@ -71,7 +71,7 @@
             String requestId) throws HyracksDataException {
         IResultDirectoryService resultDirectoryService =
                 ((ClusterControllerService) 
appCtx.getServiceContext().getControllerService())
-                        .getResultDirectoryService();;
+                        .getResultDirectoryService();
         // Check if result is in a valid state for discarding
         ResultJobRecord.Status status = 
resultDirectoryService.getResultStatus(jobId, resultSetId);
         if (status.getState() != ResultJobRecord.State.SUCCESS) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 73440b1..4308d64 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -25,6 +25,7 @@

 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -110,8 +111,14 @@
         updateState(State.RUNNING);
     }

-    public void finish() {
+    public void finish(JobStatus jobStatus) {
         jobEndTime = System.nanoTime();
+        if (jobStatus != null && (status.state == State.RUNNING || 
status.state == State.IDLE)) {
+            switch (jobStatus) {
+                case TERMINATED -> updateState(State.SUCCESS);
+                case FAILURE, FAILURE_BEFORE_EXECUTION ->  
updateState(State.FAILED);
+            }
+        }
     }

     public long getJobDuration() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 049f1c3..93ee281 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -108,7 +108,7 @@
             if (resultJobRecord == null) {
                 return;
             }
-            resultJobRecord.finish();
+            resultJobRecord.finish(jobStatus);
             jobResultCallback.completed(jobId, resultJobRecord);
         } else {
             reportJobFailure(jobId, exceptions);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534
Gerrit-Change-Number: 21183
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>

Reply via email to