Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1105#discussion_r173527951 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java --- @@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) { sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle)); } + /** + * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests + * from being processed after the root has finished running and interrupts in the root thread have been cleared. + */ + public void terminate() { + terminate.set(true); + } + @Override protected void processEvent(FragmentEvent event) { + if (event.type.equals(EventType.RECEIVER_FINISHED)) { + // Finish request + if (terminate.get()) { + // We have already recieved a cancellation or we have terminated the event processor. Do not process anymore finish requests. + return; + } + } else { + // Cancel request + if (!terminate.compareAndSet(false, true)) { + // We have already received a cancellation or we have terminated the event processor. Do not process anymore cancellation requests. + // This prevents the root thread from being interrupted at an inappropriate time. + return; + } + } + switch (event.type) { case CANCEL: - /* - * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called. - */ + // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called. updateState(FragmentState.CANCELLATION_REQUESTED); - - /* - * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We - * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out - * procedure of the main thread. - */ - synchronized (myThreadRef) { - final Thread myThread = myThreadRef.get(); - if (myThread != null) { - logger.debug("Interrupting fragment thread {}", myThread.getName()); - myThread.interrupt(); - } - } + // The root was started so we have to interrupt it in case it is performing a blocking operation. + killThread(); + terminate.set(true); --- End diff -- it is not necessary (terminate should be already true).
---