From Peeyush Gupta <[email protected]>:
Peeyush Gupta has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20768?usp=email )
Change subject: WIP: async api fixes
......................................................................
WIP: async api fixes
Change-Id: I9ca684c1a90403ba2813cc858be76be69b8a4a6d
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
3 files changed, 29 insertions(+), 9 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/68/20768/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 5ad2d1c..ffe780e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -130,7 +130,11 @@
}
}
// if the was no error, we can set the result status to success
- executionState.setStatus(ResultStatus.SUCCESS, HttpResponseStatus.OK);
+ if (delivery == IStatementExecutor.ResultDelivery.ASYNC) {
+ executionState.setStatus(ResultStatus.SUCCESS,
HttpResponseStatus.ACCEPTED);
+ } else {
+ executionState.setStatus(ResultStatus.SUCCESS,
HttpResponseStatus.OK);
+ }
updateStatsFromCC(stats, responseMsg);
if (param.isSignature() && delivery !=
IStatementExecutor.ResultDelivery.ASYNC && !param.isParseOnly()) {
responsePrinter.addResultPrinter(SignaturePrinter.newInstance(responseMsg.getExecutionPlans()));
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index c4956bd..b2fb84c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -258,7 +258,11 @@
executeStatement(request, requestRef, statement,
sessionOutput, resultProperties, statementProperties,
stats, param, executionState,
param.getOptionalParams(), statementParams, responsePrinter,
warnings);
- executionState.setStatus(ResultStatus.SUCCESS,
HttpResponseStatus.OK);
+ if (delivery == ResultDelivery.ASYNC) {
+ executionState.setStatus(ResultStatus.SUCCESS,
HttpResponseStatus.ACCEPTED);
+ } else {
+ executionState.setStatus(ResultStatus.SUCCESS,
HttpResponseStatus.OK);
+ }
}
errorCount = 0;
} catch (Exception |
org.apache.asterix.lang.sqlpp.parser.TokenMgrError e) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index c3f7c55..3f3de9a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -65,10 +65,8 @@
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.active.FeedEventsListener;
import org.apache.asterix.app.external.ExternalLibraryJobUtils;
-import org.apache.asterix.app.result.ExecutionError;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.app.result.fields.ErrorsPrinter;
import org.apache.asterix.app.result.fields.ResultHandlePrinter;
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.app.result.fields.StatusPrinter;
@@ -5600,13 +5598,26 @@
switch (resultDelivery) {
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
- executorService.submit(() -> asyncCreateAndRunJob(hcc,
compiler, locker, resultDelivery,
- requestParameters, cancellable, resultSetId, printed,
metadataProvider, atomicStmt, jobKind));
+ Future<?> f = executorService.submit(() -> {
+ try {
+ asyncCreateAndRunJob(hcc, compiler, locker,
resultDelivery, requestParameters, cancellable,
+ resultSetId, printed, metadataProvider,
atomicStmt, jobKind);
+ } catch (Exception e) {
+ throw new RuntimeException(e.getCause());
+ }
+ });
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
}
}
+ if (f.isDone()) {
+ try {
+ f.get();
+ } catch (Exception e) {
+ throw
HyracksDataException.create(e.getCause().getCause());
+ }
+ }
break;
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
@@ -5665,7 +5676,7 @@
private void asyncCreateAndRunJob(IHyracksClientConnection hcc,
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IRequestParameters
requestParameters, boolean cancellable,
ResultSetId resultSetId, MutableBoolean printed, MetadataProvider
metadataProvider, Statement atomicStmt,
- JobKind jobKind) {
+ JobKind jobKind) throws Exception {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
final CompletableFuture<JobId> jobIdFuture = new CompletableFuture<>();
Future<?> jobSubmitFuture = executorService.submit(() -> {
@@ -5698,8 +5709,10 @@
} catch (ExecutionException e) {
Throwable cause = e.getCause() != null ? e.getCause() : e;
handleAsyncJobException(cause, jobId.get(), resultDelivery);
+ throw HyracksDataException.create(e.getCause().getCause());
} catch (Exception e) {
handleAsyncJobException(e, jobId.get(), resultDelivery);
+ throw e;
} finally {
synchronized (printed) {
if (printed.isFalse()) {
@@ -5722,8 +5735,7 @@
private void handleAsyncJobException(Throwable e, JobId jobId,
ResultDelivery resultDelivery) {
if (Objects.equals(JobId.INVALID, jobId)) {
// compilation failed
- responsePrinter.addResultPrinter(new
StatusPrinter(AbstractQueryApiServlet.ResultStatus.FAILED));
- responsePrinter.addResultPrinter(new
ErrorsPrinter(Collections.singletonList(ExecutionError.of(e))));
+ responsePrinter.addResultPrinter(new
StatusPrinter(AbstractQueryApiServlet.ResultStatus.FATAL));
try {
responsePrinter.printResults();
} catch (HyracksDataException ex) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20768?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I9ca684c1a90403ba2813cc858be76be69b8a4a6d
Gerrit-Change-Number: 20768
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>