From Michael Blow <[email protected]>:

Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233?usp=email )


Change subject: [NO ISSUE][*DB][RT] Guard against cancelled request removed from tracker
......................................................................

[NO ISSUE][*DB][RT] Guard against cancelled request removed from tracker

Ext-ref: MB-71890
Change-Id: Ib5c93b1607444612b0c485f649ab9424a004b68c
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
1 file changed, 21 insertions(+), 12 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/33/21233/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b930ce1..08907f2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4264,7 +4264,7 @@
                 try {
                     Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
-                    ensureNotCancelled(clientRequest);
+                    ensureNotCancelled(clientRequest, reqId);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4291,8 +4291,9 @@
             Stats stats) throws Exception {
         CopyToStatement copyTo = (CopyToStatement) stmt;
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        String reqId = requestParameters.getRequestReference().getUuid();
         final ClientRequest clientRequest =
-                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+                (ClientRequest) requestTracker.get(reqId);
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws RuntimeDataException, InterruptedException {
@@ -4300,7 +4301,7 @@
                     compilationLock.readLock().lockInterruptibly();
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
-                    ensureNotCancelled(clientRequest);
+                    ensureNotCancelled(clientRequest, reqId);
                     throw e;
                 }
             }
@@ -4469,7 +4470,7 @@
                 try {
                     Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
-                    ensureNotCancelled(clientRequest);
+                    ensureNotCancelled(clientRequest, reqId);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4541,7 +4542,7 @@
                 try {
                     Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
-                    ensureNotCancelled(clientRequest);
+                    ensureNotCancelled(clientRequest, reqId);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4565,6 +4566,9 @@

     private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
             String reqId, String clientCtxId, ClientRequest clientRequest, JobKind jobKind) throws Exception {
+        // Guard before submitting the job: if the request was cancelled and removed from the tracker,
+        // clientRequest will be null here; treat that as a cancellation rather than NPE on setJobId.
+        ensureNotCancelled(clientRequest, reqId);
         jobSpec.setRequestId(reqId);
         jobSpec.setProperty(JOB_KIND, jobKind);
         JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
@@ -5540,8 +5544,9 @@
             IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
             throws Exception {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        String reqId = requestParameters.getRequestReference().getUuid();
         final ClientRequest clientRequest =
-                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+                (ClientRequest) requestTracker.get(reqId);
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws RuntimeDataException, InterruptedException {
@@ -5549,7 +5554,7 @@
                     compilationLock.readLock().lockInterruptibly();
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
-                    ensureNotCancelled(clientRequest);
+                    ensureNotCancelled(clientRequest, reqId);
                     throw e;
                 }
             }
@@ -5789,6 +5794,8 @@
         String reqId = requestParameters.getRequestReference().getUuid();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
+        // Guard before markCancellable: request may have been cancelled and deregistered already.
+        ensureNotCancelled(clientRequest, reqId);
         if (cancellable) {
             clientRequest.markCancellable();
         }
@@ -5804,7 +5811,7 @@
                     SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
-            ensureNotCancelled(clientRequest);
+            ensureNotCancelled(clientRequest, reqId);
             if (atomicStatement != null) {
                 Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(),
                         ((InsertStatement) atomicStatement).getDataverseName(),
@@ -5838,7 +5845,7 @@
                 hcc.waitForCompletion(jobId);
             } else {
                 hcc.waitForCompletion(jobId);
-                ensureNotCancelled(clientRequest);
+                ensureNotCancelled(clientRequest, reqId);
                 printer.print(jobId);
             }
             if (atomic) {
@@ -6096,9 +6103,11 @@
         
validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), 
dataset, sourceLoc);
     }

-    private static void ensureNotCancelled(ClientRequest clientRequest) throws RuntimeDataException {
-        if (clientRequest.isCancelled()) {
-            throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
+    private static void ensureNotCancelled(ClientRequest clientRequest, String reqId) throws RuntimeDataException {
+        // clientRequest may be null if the request was cancelled and deregistered from the tracker
+        // between when it was looked up and when this check is reached; treat that as cancelled.
+        if (clientRequest == null || clientRequest.isCancelled()) {
+            throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, reqId);
         }
     }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Ib5c93b1607444612b0c485f649ab9424a004b68c
Gerrit-Change-Number: 21233
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to