>From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email )
Change subject: [ASTERIXDB-3649][API] Async API enhancements ...................................................................... [ASTERIXDB-3649][API] Async API enhancements - user model changes: no - storage format changes: no - interface changes: no Details: Certain statements like COPY can be tracked in async mode but they don't produce a result. This needs to be handled. - Update ResultJobRecord of jobs with no ResultMetadata when they are finished (otherwise, the status remains RUNNING even after the job has finished, and calling StatusApi will show them as RUNNING) - Fixed GET StatusApi to check if ResultMetadata exists. - Fixed GET ResultApi to check if ResultMetadata exists. - Increased timeout of result fetch when discarding to 10 seconds. - Fixed jobQueueTime to take care of cases where a job has ended without ever starting. Ext-ref: MB-69765 Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java 8 files changed, 
54 insertions(+), 15 deletions(-) Approvals: Jenkins: Verified; Verified Ali Alsuliman: Looks good to me, but someone else must approve Murtadha Hubail: Looks good to me, approved Anon. E. Moose #1000171: diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index 4e9a366..72c2ebc 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -183,7 +183,11 @@ putTime(json, state.endTime, "jobEndTime", dateTime); long queueTime = 0; if (state.createTime > 0) { - queueTime = (state.startTime > 0 ? state.startTime : System.currentTimeMillis()) - state.createTime; + // startTime - createTime, if job has started + // endTime - createTime, if job has ended but not started (failed while in the queue, cancelled/timeout) + // currentTime - createTime, if job is still in the queue + queueTime = (state.startTime > 0 ? state.startTime + : (state.endTime > 0 ? 
state.endTime : System.currentTimeMillis())) - state.createTime; } json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime)); json.put("jobStatus", String.valueOf(state.status)); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java index 90978fb..1340893 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java @@ -50,6 +50,7 @@ DiscardResultRequestMessage request = new DiscardResultRequestMessage(serviceCtx.getNodeId(), jobId, resultSetId, requestId); try { + //TODO: handle receive response messageBroker.sendMessageToPrimaryCC(request); } catch (Exception e) { throw HyracksDataException.create(e); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index eb0881d..97a4b7f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -107,11 +107,16 @@ try { ResultJobRecord.Status status = resultReader.getStatus(); final HttpResponseStatus httpStatus = ResultUtil.getHttpStatusFromResultStatus(status); - response.setStatus(httpStatus); if (httpStatus != HttpResponseStatus.OK) { + response.setStatus(httpStatus); return; } ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata(); + if (metadata == null) { + response.setStatus(HttpResponseStatus.NOT_FOUND); + return; + } + response.setStatus(httpStatus); SessionOutput sessionOutput = initResponse(request, response, metadata.getFormat()); processResults(handle, 
resultReader, sessionOutput, metadata, request); } catch (HyracksDataException e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java index f94a98b..9467fa0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java @@ -90,15 +90,23 @@ printer.begin(); printer.addHeaderPrinter(new StatusPrinter(resultStatus)); printer.printHeaders(); + ResultMetadata metadata = null; + if (uriMode || resultStatus == ResultStatus.SUCCESS) { + metadata = (ResultMetadata) resultReader.getMetadata(); + } switch (resultStatus) { - case SUCCESS -> handleSuccessfulResult(request, strHandle, uriMode, printer, resultReader); + case SUCCESS -> { + if (metadata != null) { + handleSuccessfulResult(request, strHandle, uriMode, printer, resultReader, metadata); + } + } case TIMEOUT -> handleTimeout(handle, executionState, printer, response); case FATAL, FAILED -> handleFailure(handle, executionState, printer, response, resultReaderStatus); case QUEUED, RUNNING -> {} } printer.printResults(); if (uriMode) { - printMetricsAndFooters(printer, resultReader, request, handle.getRequestId(), handle.getJobId(), resultStatus); + printMetricsAndFooters(printer, metadata, request, handle.getRequestId(), handle.getJobId(), resultStatus); } printer.end(); if (response.writer().checkError()) { @@ -107,7 +115,7 @@ } private void handleSuccessfulResult(IServletRequest request, String strHandle, boolean uriMode, - ResponsePrinter printer, ResultReader resultReader) throws HyracksDataException { + ResponsePrinter printer, ResultReader resultReader, ResultMetadata metadata) throws HyracksDataException { String servletPath = servletPath(request).replace("status", "result"); String resHandle; if (uriMode) 
{ @@ -117,9 +125,7 @@ } printer.addResultPrinter(new ResultHandlePrinter(resHandle)); if (uriMode) { - ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata(); - printer.addResultPrinter(new ResultCountPrinter( - ((ResultMetadata) (resultReader.getResultSetReader().getResultMetadata())).getResultCount())); + printer.addResultPrinter(new ResultCountPrinter(metadata.getResultCount())); printer.addResultPrinter(new PartitionInfoPrinter(resultReader.getResultSetReader().getResultRecords(), resHandle, metadata.isResultSetOrdered())); } @@ -172,9 +178,8 @@ } } - private void printMetricsAndFooters(ResponsePrinter printer, ResultReader resultReader, IServletRequest request, + private void printMetricsAndFooters(ResponsePrinter printer, ResultMetadata metadata, IServletRequest request, String requestId, JobId jobId, ResultStatus status) throws HyracksDataException { - ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata(); if (metadata != null && status != ResultStatus.QUEUED && status != ResultStatus.RUNNING) { printMetricsWithResultMetadata(printer, request, metadata); } else { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java index 7ca897a..79ddbca 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.result.ResultSetId; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,11 @@ @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException { - 
AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx, jobId, resultSetId, requestId); + try { + AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx, jobId, resultSetId, requestId); + } catch (Throwable th) { + // catching Throwable to prevent any unexpected exception from crashing the CC + LOGGER.log(Level.WARN, "unexpected exception while processing discard result request message", th); + } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java index 88184fa..b92ec57 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java @@ -39,6 +39,7 @@ import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.result.IResultMetadata; import org.apache.hyracks.api.result.ResultDirectoryRecord; import org.apache.hyracks.api.result.ResultJobRecord; import org.apache.hyracks.api.result.ResultSetId; @@ -55,7 +56,7 @@ public class AsyncRequestsAPIUtil { private static final Logger LOGGER = LogManager.getLogger(); - private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); + private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10); public static final long NC_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5); /** @@ -71,7 +72,7 @@ String requestId) throws HyracksDataException { IResultDirectoryService resultDirectoryService = ((ClusterControllerService) appCtx.getServiceContext().getControllerService()) - .getResultDirectoryService();; + .getResultDirectoryService(); // Check if result is in a valid state for discarding ResultJobRecord.Status status = 
resultDirectoryService.getResultStatus(jobId, resultSetId); if (status.getState() != ResultJobRecord.State.SUCCESS) { @@ -79,6 +80,16 @@ resultSetId, requestId, status); return; } + IResultMetadata resultMetadata = resultDirectoryService.getResultMetadata(jobId, resultSetId); + if (resultMetadata == null) { + LOGGER.debug( + "Result metadata not found for job {}, result set {}, request id {}. Removing async req tracking info", + jobId, resultSetId, requestId); + if (requestId != null) { + appCtx.getRequestTracker().removeAsyncOrDeferredRequest(requestId); + } + return; + } // Send discard result messages to all nodes containing result partitions Set<String> nodeIds = fetchResultNodeIds(resultDirectoryService, jobId, resultSetId); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java index 73440b1..4308d64 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -110,8 +111,14 @@ updateState(State.RUNNING); } - public void finish() { + public void finish(JobStatus jobStatus) { jobEndTime = System.nanoTime(); + if (jobStatus != null && (status.state == State.RUNNING || status.state == State.IDLE)) { + switch (jobStatus) { + case TERMINATED -> updateState(State.SUCCESS); + case FAILURE, FAILURE_BEFORE_EXECUTION -> updateState(State.FAILED); + } + } } public long getJobDuration() { diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index 049f1c3..93ee281 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -108,7 +108,7 @@ if (resultJobRecord == null) { return; } - resultJobRecord.finish(); + resultJobRecord.finish(jobStatus); jobResultCallback.completed(jobId, resultJobRecord); } else { reportJobFailure(jobId, exceptions); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534 Gerrit-Change-Number: 21183 Gerrit-PatchSet: 3 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
