Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1105#discussion_r173527951
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
 ---
    @@ -488,47 +548,74 @@ void receiverFinished(FragmentHandle handle) {
           sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
         }
     
    +    /**
    +     * Tell the {@link FragmentEventProcessor} not to process anymore 
events. This keeps stray cancellation requests
    +     * from being processed after the root has finished running and 
interrupts in the root thread have been cleared.
    +     */
    +    public void terminate() {
    +      terminate.set(true);
    +    }
    +
         @Override
         protected void processEvent(FragmentEvent event) {
    +      if (event.type.equals(EventType.RECEIVER_FINISHED)) {
    +        // Finish request
    +        if (terminate.get()) {
    +          // We have already recieved a cancellation or we have terminated 
the event processor. Do not process anymore finish requests.
    +          return;
    +        }
    +      } else {
    +        // Cancel request
    +        if (!terminate.compareAndSet(false, true)) {
    +          // We have already received a cancellation or we have terminated 
the event processor. Do not process anymore cancellation requests.
    +          // This prevents the root thread from being interrupted at an 
inappropriate time.
    +          return;
    +        }
    +      }
    +
           switch (event.type) {
             case CANCEL:
    -          /*
    -           * We set the cancel requested flag but the actual cancellation 
is managed by the run() loop, if called.
    -           */
    +          // We set the cancel requested flag but the actual cancellation 
is managed by the run() loop, if called.
               updateState(FragmentState.CANCELLATION_REQUESTED);
    -
    -          /*
    -           * Interrupt the thread so that it exits from any blocking 
operation it could be executing currently. We
    -           * synchronize here to ensure we don't accidentally create a 
race condition where we interrupt the close out
    -           * procedure of the main thread.
    -          */
    -          synchronized (myThreadRef) {
    -            final Thread myThread = myThreadRef.get();
    -            if (myThread != null) {
    -              logger.debug("Interrupting fragment thread {}", 
myThread.getName());
    -              myThread.interrupt();
    -            }
    -          }
    +          // The root was started so we have to interrupt it in case it is 
performing a blocking operation.
    +          killThread();
    +          terminate.set(true);
    --- End diff --
    
    it is not necessary (terminate should be already true).


---

Reply via email to