Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1105#discussion_r173528013
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
---
@@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) {
sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
}
+ /**
+ * Tell the {@link FragmentEventProcessor} not to process anymore
events. This keeps stray cancellation requests
+ * from being processed after the root has finished running and
interrupts in the root thread have been cleared.
+ */
+ public void terminate() {
+ terminate.set(true);
+ }
+
@Override
protected void processEvent(FragmentEvent event) {
+ if (event.type.equals(EventType.RECEIVER_FINISHED)) {
+ // Finish request
+ if (terminate.get()) {
+ // We have already received a cancellation or we have terminated
the event processor. Do not process anymore finish requests.
+ return;
+ }
+ } else {
+ // Cancel request
+ if (!terminate.compareAndSet(false, true)) {
+ // We have already received a cancellation or we have terminated
the event processor. Do not process anymore cancellation requests.
+ // This prevents the root thread from being interrupted at an
inappropriate time.
+ return;
+ }
+ }
+
switch (event.type) {
case CANCEL:
- /*
- * We set the cancel requested flag but the actual cancellation
is managed by the run() loop, if called.
- */
+ // We set the cancel requested flag but the actual cancellation
is managed by the run() loop, if called.
updateState(FragmentState.CANCELLATION_REQUESTED);
-
- /*
- * Interrupt the thread so that it exits from any blocking
operation it could be executing currently. We
- * synchronize here to ensure we don't accidentally create a
race condition where we interrupt the close out
- * procedure of the main thread.
- */
- synchronized (myThreadRef) {
- final Thread myThread = myThreadRef.get();
- if (myThread != null) {
- logger.debug("Interrupting fragment thread {}",
myThread.getName());
- myThread.interrupt();
- }
- }
+ // The root was started so we have to interrupt it in case it is
performing a blocking operation.
+ killThread();
+ terminate.set(true);
break;
-
case CANCEL_AND_FINISH:
+ // In this case the root was never started so we do not have to
interrupt the thread.
updateState(FragmentState.CANCELLATION_REQUESTED);
+ // The FragmentExecutor#run() loop will not execute in this case
so we have to cleanup resources here
cleanup(FragmentState.FINISHED);
+ terminate.set(true);
--- End diff --
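This `terminate.set(true)` is not necessary: for any event other than RECEIVER_FINISHED, `terminate` has already been set to true by the `terminate.compareAndSet(false, true)` check at the top of `processEvent`, which returns early otherwise.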
---