>From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233?usp=email )
Change subject: [NO ISSUE][*DB][RT] Guard against cancelled request removed from tracker ...................................................................... [NO ISSUE][*DB][RT] Guard against cancelled request removed from tracker Ext-ref: MB-71890 Change-Id: Ib5c93b1607444612b0c485f649ab9424a004b68c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Michael Blow <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 1 file changed, 21 insertions(+), 14 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Michael Blow: Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b930ce1..a2b70a3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -4264,7 +4264,7 @@ try { Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); hcc.waitForCompletion(jobId); - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); } finally { Thread.currentThread().setName(nameBefore); } @@ -4291,8 +4291,8 @@ Stats stats) throws Exception { CopyToStatement copyTo = (CopyToStatement) stmt; final IRequestTracker requestTracker = appCtx.getRequestTracker(); - final ClientRequest clientRequest = - (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); + String reqId = requestParameters.getRequestReference().getUuid(); + final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); final IMetadataLocker locker = new IMetadataLocker() { @Override public void lock() throws RuntimeDataException, InterruptedException { @@ -4300,7 +4300,7 @@ compilationLock.readLock().lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); throw e; } } @@ -4469,7 +4469,7 @@ try { Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); hcc.waitForCompletion(jobId); - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); } finally { Thread.currentThread().setName(nameBefore); } @@ -4541,7 +4541,7 @@ try { Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); hcc.waitForCompletion(jobId); - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); } finally { Thread.currentThread().setName(nameBefore); } @@ -4565,6 +4565,9 @@ private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, String reqId, String clientCtxId, ClientRequest clientRequest, JobKind jobKind) throws Exception { + // Guard before submitting the job: if the request was cancelled and removed from the tracker, + // clientRequest will be null here; treat that as a cancellation rather than NPE on setJobId. + ensureNotCancelled(clientRequest, reqId); jobSpec.setRequestId(reqId); jobSpec.setProperty(JOB_KIND, jobKind); JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false); @@ -5540,8 +5543,8 @@ IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception { final IRequestTracker requestTracker = appCtx.getRequestTracker(); - final ClientRequest clientRequest = - (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); + String reqId = requestParameters.getRequestReference().getUuid(); + final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); final IMetadataLocker locker = new IMetadataLocker() { @Override public void lock() throws RuntimeDataException, InterruptedException { @@ -5549,7 +5552,7 @@ compilationLock.readLock().lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); throw e; } } @@ -5789,6 +5792,8 @@ String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); + // Guard before markCancellable: request may have been cancelled and deregistered already. + ensureNotCancelled(clientRequest, reqId); if (cancellable) { clientRequest.markCancellable(); } @@ -5804,7 +5809,7 @@ SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec); appCtx.getReceptionist().ensureSchedulable(schedulableRequest); // ensure request not cancelled before running job - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); if (atomicStatement != null) { Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(), ((InsertStatement) atomicStatement).getDataverseName(), @@ -5838,7 +5843,7 @@ hcc.waitForCompletion(jobId); } else { hcc.waitForCompletion(jobId); - ensureNotCancelled(clientRequest); + ensureNotCancelled(clientRequest, reqId); printer.print(jobId); } if (atomic) { @@ -6096,9 +6101,11 @@ validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), dataset, sourceLoc); } - private static void ensureNotCancelled(ClientRequest clientRequest) throws RuntimeDataException { - if (clientRequest.isCancelled()) { - throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId()); + private static void ensureNotCancelled(ClientRequest clientRequest, String reqId) throws RuntimeDataException { + // clientRequest may be null if the request was cancelled and deregistered from the tracker + // between when it was looked up and when this check is reached; treat that as cancelled. + if (clientRequest == null || clientRequest.isCancelled()) { + throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, reqId); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: Ib5c93b1607444612b0c485f649ab9424a004b68c Gerrit-Change-Number: 21233 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
