>From Ali Alsuliman <[email protected]>:

Ali Alsuliman has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21236?usp=email )


Change subject: async INSERT/UPSERT/UPDATE
......................................................................

async INSERT/UPSERT/UPDATE

Change-Id: I76c16fe556eea53e4e2a97256c7b4cbc34ace90f
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
1 file changed, 35 insertions(+), 69 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/36/21236/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a9320f2..ff10403 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4439,54 +4439,8 @@
         String reqId = reqParams.getRequestReference().getUuid();
         IRequestTracker requestTracker = appCtx.getRequestTracker();
         ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
-        if (stmtInsertUpsert.getReturnExpression() != null) {
-            deliverResult(hcc, resultSet, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
-                    reqParams, true, stmt, clientRequest, JobKind.DML);
-        } else {
-            locker.lock();
-            JobId jobId = null;
-            boolean atomic = false;
-            try {
-                final JobSpecification jobSpec = compiler.compile();
-                if (jobSpec == null) {
-                    return jobSpec;
-                }
-                Dataset ds = metadataProvider.findDataset(databaseName, 
dataverseName, datasetName);
-                atomic = ds.isAtomic();
-                if (atomic) {
-                    int numParticipatingNodes = appCtx.getNodeJobTracker()
-                            .getJobParticipatingNodes(jobSpec, 
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
-                            .size();
-                    int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
-                            
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
-                    List<Integer> participatingDatasetIds = new ArrayList<>();
-                    participatingDatasetIds.add(ds.getDatasetId());
-                    jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new 
GlobalTxInfo(participatingDatasetIds,
-                            numParticipatingNodes, 
numParticipatingPartitions));
-                }
-                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
reqParams.getClientContextId(), clientRequest,
-                        JobKind.DML);
-                clientRequest.markCancellable();
-                String nameBefore = Thread.currentThread().getName();
-                try {
-                    Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
-                    hcc.waitForCompletion(jobId);
-                    ensureNotCancelled(clientRequest, reqId);
-                } finally {
-                    Thread.currentThread().setName(nameBefore);
-                }
-                if (atomic) {
-                    globalTxManager.commitTransaction(jobId);
-                }
-            } catch (Exception e) {
-                if (atomic && jobId != null) {
-                    globalTxManager.abortTransaction(jobId);
-                }
-                throw e;
-            } finally {
-                locker.unlock();
-            }
-        }
+        deliverResult(hcc, resultSet, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats, reqParams,
+                true, stmt, clientRequest, JobKind.DML);
         return null;
     }

@@ -5601,6 +5555,7 @@
             MetadataProvider metadataProvider, IMetadataLocker locker, 
ResultDelivery resultDelivery,
             ResultMetadata outMetadata, Stats stats, IRequestParameters 
requestParameters, boolean cancellable,
             Statement atomicStmt, ClientRequest clientRequest, JobKind 
jobKind) throws Exception {
+        // resultSetId can be null
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -5614,6 +5569,8 @@
                         throw new RuntimeException(e);
                     }
                 });
+                // blocks and waits for the submission of the job to finish; 
includes locking,compilation,job creation
+                // it doesn't wait for the job to finish.
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -5640,27 +5597,31 @@
                 break;
             case IMMEDIATE:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, 
resultDelivery, id -> {
-                    final ResultReader resultReader = new 
ResultReader(resultSet, id, resultSetId);
-                    updateJobStats(id, stats, 
metadataProvider.getResultSetId(), clientRequest);
-                    responsePrinter.addResultPrinter(new 
ResultsPrinter(appCtx, resultReader,
-                            metadataProvider.findOutputRecordType(), stats, 
sessionOutput));
-                    responsePrinter.printResults();
+                    if (resultSetId != null) {
+                        final ResultReader resultReader = new 
ResultReader(resultSet, id, resultSetId);
+                        updateJobStats(id, stats, 
metadataProvider.getResultSetId(), clientRequest);
+                        responsePrinter.addResultPrinter(new 
ResultsPrinter(appCtx, resultReader,
+                                metadataProvider.findOutputRecordType(), 
stats, sessionOutput));
+                        responsePrinter.printResults();
+                    }
                 }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt, jobKind);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, 
resultDelivery, id -> {
-                    updateJobStats(id, stats, 
metadataProvider.getResultSetId(), clientRequest);
-                    if (!sessionConfig.isIncludeHost()) {
-                        responsePrinter.addResultPrinter(new 
ResultHandlePrinter(sessionOutput,
-                                new ResultHandle(id, resultSetId, 
requestParameters.getRequestReference().getUuid())));
-                    } else {
-                        responsePrinter.addResultPrinter(
-                                new ResultHandlePrinter(sessionOutput, new 
ResultHandle(id, resultSetId, null)));
-                    }
-                    responsePrinter.printResults();
-                    if (outMetadata != null) {
-                        outMetadata.getResultSets()
-                                .add(new ResultSetInfo(id, resultSetId, 
metadataProvider.findOutputRecordType()));
+                    if (resultSetId != null) {
+                        updateJobStats(id, stats, 
metadataProvider.getResultSetId(), clientRequest);
+                        if (!sessionConfig.isIncludeHost()) {
+                            responsePrinter.addResultPrinter(new 
ResultHandlePrinter(sessionOutput, new ResultHandle(id,
+                                    resultSetId, 
requestParameters.getRequestReference().getUuid())));
+                        } else {
+                            responsePrinter.addResultPrinter(
+                                    new ResultHandlePrinter(sessionOutput, new 
ResultHandle(id, resultSetId, null)));
+                        }
+                        responsePrinter.printResults();
+                        if (outMetadata != null) {
+                            outMetadata.getResultSets()
+                                    .add(new ResultSetInfo(id, resultSetId, 
metadataProvider.findOutputRecordType()));
+                        }
                     }
                 }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt, jobKind);
                 break;
@@ -5702,10 +5663,13 @@
             try {
                 createAndRunJob(hcc, jobFlags, jobId, compiler, locker, 
resultDelivery, id -> {
                     jobIdFuture.complete(id);
-                    final ResultHandle handle = sessionConfig.isIncludeHost() 
? new ResultHandle(id, resultSetId, null)
-                            : new ResultHandle(id, resultSetId, 
requestParameters.getRequestReference().getUuid());
-                    responsePrinter.addResultPrinter(new 
ResultHandlePrinter(sessionOutput, handle));
-                    responsePrinter.printResults();
+                    if (resultSetId != null) {
+                        final ResultHandle handle = 
sessionConfig.isIncludeHost()
+                                ? new ResultHandle(id, resultSetId, null)
+                                : new ResultHandle(id, resultSetId, 
requestParameters.getRequestReference().getUuid());
+                        responsePrinter.addResultPrinter(new 
ResultHandlePrinter(sessionOutput, handle));
+                        responsePrinter.printResults();
+                    }
                     synchronized (printed) {
                         printed.setTrue();
                         printed.notify();
@@ -5720,6 +5684,7 @@
             }
         });
         try {
+            // blocks and waits for the job to finish
             jobSubmitFuture.get(sessionOutput.config().getTimeout(), 
TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
             handleAsyncJobTimeout(hcc, jobIdFuture, jobId.get(), 
jobSubmitFuture);
@@ -5841,6 +5806,7 @@
                 jId.setValue(jobId);
             }
             if (ResultDelivery.ASYNC == resultDelivery) {
+                // 'print' unblocks the waiting async request to return a 
response to the client
                 printer.print(jobId);
                 hcc.waitForCompletion(jobId);
             } else {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21236?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I76c16fe556eea53e4e2a97256c7b4cbc34ace90f
Gerrit-Change-Number: 21236
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>

Reply via email to