From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21236?usp=email )
Change subject: async INSERT/UPSERT/UPDATE
......................................................................
async INSERT/UPSERT/UPDATE
Change-Id: I76c16fe556eea53e4e2a97256c7b4cbc34ace90f
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
1 file changed, 35 insertions(+), 69 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/36/21236/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a9320f2..ff10403 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4439,54 +4439,8 @@
String reqId = reqParams.getRequestReference().getUuid();
IRequestTracker requestTracker = appCtx.getRequestTracker();
ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
- if (stmtInsertUpsert.getReturnExpression() != null) {
- deliverResult(hcc, resultSet, compiler, metadataProvider, locker,
resultDelivery, outMetadata, stats,
- reqParams, true, stmt, clientRequest, JobKind.DML);
- } else {
- locker.lock();
- JobId jobId = null;
- boolean atomic = false;
- try {
- final JobSpecification jobSpec = compiler.compile();
- if (jobSpec == null) {
- return jobSpec;
- }
- Dataset ds = metadataProvider.findDataset(databaseName,
dataverseName, datasetName);
- atomic = ds.isAtomic();
- if (atomic) {
- int numParticipatingNodes = appCtx.getNodeJobTracker()
- .getJobParticipatingNodes(jobSpec,
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
- .size();
- int numParticipatingPartitions =
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
-
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
- List<Integer> participatingDatasetIds = new ArrayList<>();
- participatingDatasetIds.add(ds.getDatasetId());
- jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new
GlobalTxInfo(participatingDatasetIds,
- numParticipatingNodes,
numParticipatingPartitions));
- }
- jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
reqParams.getClientContextId(), clientRequest,
- JobKind.DML);
- clientRequest.markCancellable();
- String nameBefore = Thread.currentThread().getName();
- try {
- Thread.currentThread().setName(nameBefore + " :
WaitForCompletionForJobId: " + jobId);
- hcc.waitForCompletion(jobId);
- ensureNotCancelled(clientRequest, reqId);
- } finally {
- Thread.currentThread().setName(nameBefore);
- }
- if (atomic) {
- globalTxManager.commitTransaction(jobId);
- }
- } catch (Exception e) {
- if (atomic && jobId != null) {
- globalTxManager.abortTransaction(jobId);
- }
- throw e;
- } finally {
- locker.unlock();
- }
- }
+ deliverResult(hcc, resultSet, compiler, metadataProvider, locker,
resultDelivery, outMetadata, stats, reqParams,
+ true, stmt, clientRequest, JobKind.DML);
return null;
}
@@ -5601,6 +5555,7 @@
MetadataProvider metadataProvider, IMetadataLocker locker,
ResultDelivery resultDelivery,
ResultMetadata outMetadata, Stats stats, IRequestParameters
requestParameters, boolean cancellable,
Statement atomicStmt, ClientRequest clientRequest, JobKind
jobKind) throws Exception {
+ // resultSetId can be null
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
@@ -5614,6 +5569,8 @@
throw new RuntimeException(e);
}
});
+ // blocks and waits for the submission of the job to finish;
includes locking, compilation, and job creation.
+ // it doesn't wait for the job to finish.
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -5640,27 +5597,31 @@
break;
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
- final ResultReader resultReader = new
ResultReader(resultSet, id, resultSetId);
- updateJobStats(id, stats,
metadataProvider.getResultSetId(), clientRequest);
- responsePrinter.addResultPrinter(new
ResultsPrinter(appCtx, resultReader,
- metadataProvider.findOutputRecordType(), stats,
sessionOutput));
- responsePrinter.printResults();
+ if (resultSetId != null) {
+ final ResultReader resultReader = new
ResultReader(resultSet, id, resultSetId);
+ updateJobStats(id, stats,
metadataProvider.getResultSetId(), clientRequest);
+ responsePrinter.addResultPrinter(new
ResultsPrinter(appCtx, resultReader,
+ metadataProvider.findOutputRecordType(),
stats, sessionOutput));
+ responsePrinter.printResults();
+ }
}, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt, jobKind);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
- updateJobStats(id, stats,
metadataProvider.getResultSetId(), clientRequest);
- if (!sessionConfig.isIncludeHost()) {
- responsePrinter.addResultPrinter(new
ResultHandlePrinter(sessionOutput,
- new ResultHandle(id, resultSetId,
requestParameters.getRequestReference().getUuid())));
- } else {
- responsePrinter.addResultPrinter(
- new ResultHandlePrinter(sessionOutput, new
ResultHandle(id, resultSetId, null)));
- }
- responsePrinter.printResults();
- if (outMetadata != null) {
- outMetadata.getResultSets()
- .add(new ResultSetInfo(id, resultSetId,
metadataProvider.findOutputRecordType()));
+ if (resultSetId != null) {
+ updateJobStats(id, stats,
metadataProvider.getResultSetId(), clientRequest);
+ if (!sessionConfig.isIncludeHost()) {
+ responsePrinter.addResultPrinter(new
ResultHandlePrinter(sessionOutput, new ResultHandle(id,
+ resultSetId,
requestParameters.getRequestReference().getUuid())));
+ } else {
+ responsePrinter.addResultPrinter(
+ new ResultHandlePrinter(sessionOutput, new
ResultHandle(id, resultSetId, null)));
+ }
+ responsePrinter.printResults();
+ if (outMetadata != null) {
+ outMetadata.getResultSets()
+ .add(new ResultSetInfo(id, resultSetId,
metadataProvider.findOutputRecordType()));
+ }
}
}, requestParameters, cancellable, appCtx, metadataProvider,
atomicStmt, jobKind);
break;
@@ -5702,10 +5663,13 @@
try {
createAndRunJob(hcc, jobFlags, jobId, compiler, locker,
resultDelivery, id -> {
jobIdFuture.complete(id);
- final ResultHandle handle = sessionConfig.isIncludeHost()
? new ResultHandle(id, resultSetId, null)
- : new ResultHandle(id, resultSetId,
requestParameters.getRequestReference().getUuid());
- responsePrinter.addResultPrinter(new
ResultHandlePrinter(sessionOutput, handle));
- responsePrinter.printResults();
+ if (resultSetId != null) {
+ final ResultHandle handle =
sessionConfig.isIncludeHost()
+ ? new ResultHandle(id, resultSetId, null)
+ : new ResultHandle(id, resultSetId,
requestParameters.getRequestReference().getUuid());
+ responsePrinter.addResultPrinter(new
ResultHandlePrinter(sessionOutput, handle));
+ responsePrinter.printResults();
+ }
synchronized (printed) {
printed.setTrue();
printed.notify();
@@ -5720,6 +5684,7 @@
}
});
try {
+ // blocks and waits for the job to finish, or until the configured timeout elapses
jobSubmitFuture.get(sessionOutput.config().getTimeout(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
handleAsyncJobTimeout(hcc, jobIdFuture, jobId.get(),
jobSubmitFuture);
@@ -5841,6 +5806,7 @@
jId.setValue(jobId);
}
if (ResultDelivery.ASYNC == resultDelivery) {
+ // 'print' unblocks the waiting async request to return a
response to the client
printer.print(jobId);
hcc.waitForCompletion(jobId);
} else {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21236?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I76c16fe556eea53e4e2a97256c7b4cbc34ace90f
Gerrit-Change-Number: 21236
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>