From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email )
Change subject: async status result
......................................................................
async status result
Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
8 files changed, 65 insertions(+), 15 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/83/21183/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4e9a366..72c2ebc 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -183,7 +183,11 @@
putTime(json, state.endTime, "jobEndTime", dateTime);
long queueTime = 0;
if (state.createTime > 0) {
- queueTime = (state.startTime > 0 ? state.startTime :
System.currentTimeMillis()) - state.createTime;
+ // startTime - createTime, if job has started
+ // endTime - createTime, if job has ended but not started (failed
while in the queue, cancelled/timeout)
+ // currentTime - createTime, if job is still in the queue
+ queueTime = (state.startTime > 0 ? state.startTime
+ : (state.endTime > 0 ? state.endTime :
System.currentTimeMillis())) - state.createTime;
}
json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
json.put("jobStatus", String.valueOf(state.status));
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
index 90978fb..fdb761d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
@@ -21,12 +21,14 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.app.message.DiscardResultRequestMessage;
+import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.utils.AsyncRequestsAPIUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -44,12 +46,19 @@
}
@Override
+ protected IResultMetadata getResultMetadata(JobId jobId, ResultSetId
resultSetId) throws Exception {
+ ResultReader resultReader = new ResultReader(getResultSet(), jobId,
resultSetId);
+ return resultReader.getMetadata();
+ }
+
+ @Override
protected void discardResult(String requestId, JobId jobId, ResultSetId
resultSetId) throws HyracksDataException {
INCServiceContext serviceCtx = (INCServiceContext)
appCtx.getServiceContext();
INCMessageBroker messageBroker = (INCMessageBroker)
serviceCtx.getMessageBroker();
DiscardResultRequestMessage request =
new DiscardResultRequestMessage(serviceCtx.getNodeId(), jobId,
resultSetId, requestId);
try {
+ //TODO: handle receive response
messageBroker.sendMessageToPrimaryCC(request);
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index eb0881d..d9d8374 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -40,9 +40,11 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.ResultJobRecord;
import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -71,6 +73,12 @@
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
+ IResultMetadata resultMetadata = getResultMetadata(handle.getJobId(),
handle.getResultSetId());
+ if (resultMetadata == null) {
+ LOGGER.log(Level.INFO, "No result metadata found for handle:
\"{}\"", strHandle);
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ return;
+ }
try {
discardResult(handle.getRequestId(), handle.getJobId(),
handle.getResultSetId());
response.setStatus(HttpResponseStatus.ACCEPTED);
@@ -88,6 +96,12 @@
}
}
+ protected IResultMetadata getResultMetadata(JobId jobId, ResultSetId
resultSetId) throws Exception {
+ return ((ClusterControllerService)
appCtx.getServiceContext().getControllerService())
+ .getResultDirectoryService().getResultMetadata(jobId,
resultSetId);
+
+ }
+
protected void discardResult(String requestId, JobId jobId, ResultSetId
resultSetId) throws HyracksDataException {
AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext)
appCtx, jobId, resultSetId, requestId);
}
@@ -107,11 +121,16 @@
try {
ResultJobRecord.Status status = resultReader.getStatus();
final HttpResponseStatus httpStatus =
ResultUtil.getHttpStatusFromResultStatus(status);
- response.setStatus(httpStatus);
if (httpStatus != HttpResponseStatus.OK) {
+ response.setStatus(httpStatus);
return;
}
ResultMetadata metadata = (ResultMetadata)
resultReader.getMetadata();
+ if (metadata == null) {
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ return;
+ }
+ response.setStatus(httpStatus);
SessionOutput sessionOutput = initResponse(request, response,
metadata.getFormat());
processResults(handle, resultReader, sessionOutput, metadata,
request);
} catch (HyracksDataException e) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index f94a98b..9467fa0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -90,15 +90,23 @@
printer.begin();
printer.addHeaderPrinter(new StatusPrinter(resultStatus));
printer.printHeaders();
+ ResultMetadata metadata = null;
+ if (uriMode || resultStatus == ResultStatus.SUCCESS) {
+ metadata = (ResultMetadata) resultReader.getMetadata();
+ }
switch (resultStatus) {
- case SUCCESS -> handleSuccessfulResult(request, strHandle,
uriMode, printer, resultReader);
+ case SUCCESS -> {
+ if (metadata != null) {
+ handleSuccessfulResult(request, strHandle, uriMode,
printer, resultReader, metadata);
+ }
+ }
case TIMEOUT -> handleTimeout(handle, executionState, printer,
response);
case FATAL, FAILED -> handleFailure(handle, executionState,
printer, response, resultReaderStatus);
case QUEUED, RUNNING -> {}
}
printer.printResults();
if (uriMode) {
- printMetricsAndFooters(printer, resultReader, request,
handle.getRequestId(), handle.getJobId(), resultStatus);
+ printMetricsAndFooters(printer, metadata, request,
handle.getRequestId(), handle.getJobId(), resultStatus);
}
printer.end();
if (response.writer().checkError()) {
@@ -107,7 +115,7 @@
}
private void handleSuccessfulResult(IServletRequest request, String
strHandle, boolean uriMode,
- ResponsePrinter printer, ResultReader resultReader) throws
HyracksDataException {
+ ResponsePrinter printer, ResultReader resultReader, ResultMetadata
metadata) throws HyracksDataException {
String servletPath = servletPath(request).replace("status", "result");
String resHandle;
if (uriMode) {
@@ -117,9 +125,7 @@
}
printer.addResultPrinter(new ResultHandlePrinter(resHandle));
if (uriMode) {
- ResultMetadata metadata = (ResultMetadata)
resultReader.getMetadata();
- printer.addResultPrinter(new ResultCountPrinter(
- ((ResultMetadata)
(resultReader.getResultSetReader().getResultMetadata())).getResultCount()));
+ printer.addResultPrinter(new
ResultCountPrinter(metadata.getResultCount()));
printer.addResultPrinter(new
PartitionInfoPrinter(resultReader.getResultSetReader().getResultRecords(),
resHandle, metadata.isResultSetOrdered()));
}
@@ -172,9 +178,8 @@
}
}
- private void printMetricsAndFooters(ResponsePrinter printer, ResultReader
resultReader, IServletRequest request,
+ private void printMetricsAndFooters(ResponsePrinter printer,
ResultMetadata metadata, IServletRequest request,
String requestId, JobId jobId, ResultStatus status) throws
HyracksDataException {
- ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
if (metadata != null && status != ResultStatus.QUEUED && status !=
ResultStatus.RUNNING) {
printMetricsWithResultMetadata(printer, request, metadata);
} else {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
index 7ca897a..79ddbca 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,6 +46,11 @@
@Override
public void handle(ICcApplicationContext appCtx) throws
HyracksDataException {
- AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext)
appCtx, jobId, resultSetId, requestId);
+ try {
+
AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx,
jobId, resultSetId, requestId);
+ } catch (Throwable th) {
+ // catching Throwable to prevent any unexpected exception from
crashing the CC
+ LOGGER.log(Level.WARN, "unexpected exception while processing
discard result request message", th);
+ }
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
index 88184fa..6395da9 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
@@ -55,7 +55,7 @@
public class AsyncRequestsAPIUtil {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(1);
+ private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(10);
public static final long NC_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);
/**
@@ -71,7 +71,7 @@
String requestId) throws HyracksDataException {
IResultDirectoryService resultDirectoryService =
((ClusterControllerService)
appCtx.getServiceContext().getControllerService())
- .getResultDirectoryService();;
+ .getResultDirectoryService();
// Check if result is in a valid state for discarding
ResultJobRecord.Status status =
resultDirectoryService.getResultStatus(jobId, resultSetId);
if (status.getState() != ResultJobRecord.State.SUCCESS) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 73440b1..4308d64 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -110,8 +111,14 @@
updateState(State.RUNNING);
}
- public void finish() {
+ public void finish(JobStatus jobStatus) {
jobEndTime = System.nanoTime();
+ if (jobStatus != null && (status.state == State.RUNNING ||
status.state == State.IDLE)) {
+ switch (jobStatus) {
+ case TERMINATED -> updateState(State.SUCCESS);
+ case FAILURE, FAILURE_BEFORE_EXECUTION ->
updateState(State.FAILED);
+ }
+ }
}
public long getJobDuration() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 049f1c3..93ee281 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -108,7 +108,7 @@
if (resultJobRecord == null) {
return;
}
- resultJobRecord.finish();
+ resultJobRecord.finish(jobStatus);
jobResultCallback.completed(jobId, resultJobRecord);
} else {
reportJobFailure(jobId, exceptions);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534
Gerrit-Change-Number: 21183
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>