[ https://issues.apache.org/jira/browse/DRILL-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437785#comment-16437785 ]
ASF GitHub Bot commented on DRILL-6295:
---------------------------------------

Github user ilooner commented on a diff in the pull request:

https://github.com/apache/drill/pull/1208#discussion_r181488592

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
@@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
     }
     @Override
-    public void execute(Partitioner part) throws IOException {
+    public void execute(Partitioner part) throws IOException, InterruptedException {
       part.flushOutgoingBatches(isLastBatch, schemaChanged);
     }
   }
   /**
-   * Helper class to wrap Runnable with customized naming
-   * Exception handling
+   * Helper class to wrap Runnable with cancellation and waiting for completion support
    *
    */
-  private static class CustomRunnable implements Runnable {
+  private static class PartitionerTask implements Runnable {
+
+    private enum STATE {
+      NEW,
+      COMPLETING,
+      NORMAL,
+      EXCEPTIONAL,
+      CANCELLED,
+      INTERRUPTING,
+      INTERRUPTED
+    }
+
+    private final AtomicReference<STATE> state;
+    private final AtomicReference<Thread> runner;
+    private final PartitionerDecorator partitionerDecorator;
+    private final AtomicInteger count;
-    private final String parentThreadName;
-    private final CountDownLatch latch;
     private final GeneralExecuteIface iface;
-    private final Partitioner part;
+    private final Partitioner partitioner;
     private CountDownLatchInjection testCountDownLatch;
-    private volatile IOException exp;
+    private volatile ExecutionException exception;
-    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
-        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
-      this.parentThreadName = parentThreadName;
-      this.latch = latch;
+    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
+      state = new AtomicReference<>(STATE.NEW);
+      runner = new AtomicReference<>();
+      this.partitionerDecorator = partitionerDecorator;
       this.iface = iface;
-      this.part = part;
+      this.partitioner = partitioner;
+      this.count = count;
       this.testCountDownLatch = testCountDownLatch;
     }
     @Override
     public void run() {
-      // Test only - Pause until interrupted by fragment thread
-      try {
-        testCountDownLatch.await();
-      } catch (final InterruptedException e) {
-        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
-      }
-
-      final Thread currThread = Thread.currentThread();
-      final String currThreadName = currThread.getName();
-      final OperatorStats localStats = part.getStats();
-      try {
-        final String newThreadName = parentThreadName + currThread.getId();
-        currThread.setName(newThreadName);
+      final Thread thread = Thread.currentThread();
+      Preconditions.checkState(runner.compareAndSet(null, thread),
+          "PartitionerTask can be executed only once.");
+      if (state.get() == STATE.NEW) {
+        final String name = thread.getName();
+        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
+        final OperatorStats localStats = partitioner.getStats();
         localStats.clear();
         localStats.startProcessing();
-        iface.execute(part);
-      } catch (IOException e) {
-        exp = e;
-      } finally {
-        localStats.stopProcessing();
-        currThread.setName(currThreadName);
-        latch.countDown();
+        ExecutionException executionException = null;
+        try {
+          // Test only - Pause until interrupted by fragment thread
+          testCountDownLatch.await();
+          iface.execute(partitioner);
+        } catch (InterruptedException e) {
+          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+            logger.warn("Partitioner Task interrupted during the run", e);
+          }
+        } catch (Throwable t) {
+          executionException = new ExecutionException(t);
+        }
+        if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+          if (executionException == null) {
+            localStats.stopProcessing();
+            state.lazySet(STATE.NORMAL);
+          } else {
+            exception = executionException;
+            state.lazySet(STATE.EXCEPTIONAL);
+          }
+        }
+        if (count.decrementAndGet() == 0) {
+          LockSupport.unpark(partitionerDecorator.thread);
+        }
+        thread.setName(name);
+      }
+      runner.set(null);
+      while (state.get() == STATE.INTERRUPTING) {
--- End diff --

I think the task can still leave a dirty interrupt flag on its thread. Consider the following sequence:

1. We create a PartitionerTask **Task A**.
2. **Task A** executes and reaches `thread.setName(name)` in the run method.
3. Now cancel is called.
4. `thread.interrupt()` and `state.lazySet(STATE.INTERRUPTED)` are executed.
5. Now **Task A** can continue executing.
6. **Task A** checks `state.get() == STATE.INTERRUPTING`, which is false, so `Thread.sleep` is never called and the interrupt flag is never cleared.

> PartitionerDecorator may close partitioners while CustomRunnable are active
> during query cancellation
> -----------------------------------------------------------------------------------------------------
>
> Key: DRILL-6295
> URL: https://issues.apache.org/jira/browse/DRILL-6295
> Project: Apache Drill
> Issue Type: Bug
> Reporter: Vlad Rozov
> Assignee: Vlad Rozov
> Priority: Critical
> Fix For: 1.14.0
>
>
> During query cancellation, in case
> {{PartitionerDecorator.executeMethodLogic()}} is active (waiting on the
> {{latch}}), the wait will be interrupted and {{Futures}} cancelled, but there
> is no guarantee that all {{CustomRunnable}} terminate before returning from
> {{PartitionerDecorator.executeMethodLogic()}}. On exit, both incoming and
> outgoing batches are cleared, leading to clearing of underlying {{Vectors}}
> and {{DrillBufs}}. This eventually causes unallocated memory access and JVM
> crash as {{CustomRunnable}} may execute after incoming/outgoing batches are
> cleared.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
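The review comment describes a canceller that interrupts a thread after the task it meant to stop has already finished. The `STATE` values in the diff are the same seven states that `java.util.concurrent.FutureTask` uses internally. The sketch below shows one way to close that window with a reduced version of that state machine. It rests on two assumptions. First, `cancel()` may interrupt only after it wins a compare-and-set out of `NEW`. Second, when `run()` loses that race, it spins until the interrupt has been delivered and then clears it with `Thread.interrupted()`. `InterruptSafeTask`, `Body` and `cancelWhileBodyIgnoresInterrupt` are hypothetical names for illustration. They are not Drill's `PartitionerTask` or the change that was merged for DRILL-6295.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch, not Drill's PartitionerTask: cancel() interrupts the runner only after
 * winning a CAS out of NEW, and run() clears that interrupt before returning, so a late
 * cancellation cannot leave the interrupt flag set on a reused thread.
 */
final class InterruptSafeTask implements Runnable {

  enum State { NEW, COMPLETED, INTERRUPTING, INTERRUPTED }

  /** Hypothetical unit of work; may block and throw InterruptedException. */
  interface Body {
    void execute() throws InterruptedException;
  }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
  private final AtomicReference<Thread> runner = new AtomicReference<>();
  private final Body body;

  InterruptSafeTask(Body body) {
    this.body = body;
  }

  @Override
  public void run() {
    runner.set(Thread.currentThread());
    try {
      if (state.get() == State.NEW) {
        try {
          body.execute();
        } catch (InterruptedException e) {
          // Swallowed here; the CAS below decides whether the task counts as cancelled.
        }
      }
      if (!state.compareAndSet(State.NEW, State.COMPLETED)) {
        // cancel() won the race: wait until its interrupt has actually been delivered...
        while (state.get() == State.INTERRUPTING) {
          Thread.yield();
        }
        // ...and only then clear it, so it cannot leak into the thread's next task.
        Thread.interrupted();
      }
    } finally {
      runner.set(null);
    }
  }

  /** Returns true if this call cancelled the task before run() completed it. */
  boolean cancel() {
    if (!state.compareAndSet(State.NEW, State.INTERRUPTING)) {
      return false; // already completed or already cancelled: never interrupt
    }
    try {
      Thread t = runner.get();
      if (t != null) {
        t.interrupt(); // run() cannot return while the state is INTERRUPTING
      }
    } finally {
      state.set(State.INTERRUPTED); // releases run() from its spin
    }
    return true;
  }

  /** Cancels while the body ignores interrupts. Returns {cancelled, flagAfterRun}. */
  static boolean[] cancelWhileBodyIgnoresInterrupt() throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    AtomicBoolean proceed = new AtomicBoolean(false);
    InterruptSafeTask task = new InterruptSafeTask(() -> {
      started.countDown();
      while (!proceed.get()) {
        Thread.yield(); // spins without ever checking the interrupt flag
      }
    });
    AtomicBoolean flagAfterRun = new AtomicBoolean(true);
    Thread worker = new Thread(() -> {
      task.run();
      // Whatever this thread ran next would observe this flag.
      flagAfterRun.set(Thread.currentThread().isInterrupted());
    });
    worker.start();
    started.await();
    boolean cancelled = task.cancel(); // body is still running, so cancel() wins
    proceed.set(true);                 // let the body finish with the interrupt pending
    worker.join();
    return new boolean[] {cancelled, flagAfterRun.get()};
  }

  public static void main(String[] args) throws InterruptedException {
    boolean[] r = cancelWhileBodyIgnoresInterrupt();
    System.out.println("cancelled=" + r[0] + ", interruptFlagAfterRun=" + r[1]);
  }
}
```

Running `main` prints `cancelled=true, interruptFlagAfterRun=false`. If the `Thread.interrupted()` line is removed, the body returns while the interrupt is still pending, and the second value becomes `true`. That is the dirty flag described in step 6. When `run()` loses the race, it clears the flag unconditionally, so it can also swallow an unrelated interrupt that arrived at the same time. The JDK's `FutureTask` avoids that risk by only waiting for the pending cancellation interrupt, without clearing it.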
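The bug in the issue description comes down to `executeMethodLogic()` returning, and then clearing the incoming and outgoing batches, while some task can still touch them. The diff's `count.decrementAndGet()` / `LockSupport.unpark(...)` pair points to a counting barrier. The sketch below shows the waiting side of such a barrier, assuming that tasks which have not started yet may simply be skipped once cancellation begins. `AwaitAllTasks` and `runAndAwait` are hypothetical names. Skipping queued bodies is an assumption standing in for whatever cancellation Drill actually performs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical sketch: a waiter that never returns while a submitted task may still run. */
final class AwaitAllTasks {

  /**
   * Submits every body to the pool and returns only after each wrapper has finished,
   * whether its body ran or was skipped. Returns how many bodies actually ran.
   * Assumes the pool accepts every task; a rejected task would never count down.
   */
  static int runAndAwait(ExecutorService pool, List<Runnable> bodies) {
    final Thread waiter = Thread.currentThread();
    final AtomicInteger remaining = new AtomicInteger(bodies.size());
    final AtomicInteger ran = new AtomicInteger();
    final AtomicBoolean cancelled = new AtomicBoolean(false);
    for (Runnable body : bodies) {
      pool.execute(() -> {
        try {
          if (!cancelled.get()) { // bodies that start after cancellation are skipped
            ran.incrementAndGet();
            body.run();
          }
        } finally {
          // Each wrapper counts down exactly once, even if its body throws.
          if (remaining.decrementAndGet() == 0) {
            LockSupport.unpark(waiter);
          }
        }
      });
    }
    boolean interrupted = false;
    while (remaining.get() > 0) {
      LockSupport.park(AwaitAllTasks.class); // may return spuriously; the loop re-checks
      if (Thread.interrupted()) {
        // Cancellation: stop starting new bodies, but keep waiting for running ones.
        interrupted = true;
        cancelled.set(true);
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag for the caller
    }
    return ran.get();
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger active = new AtomicInteger();
    Runnable body = () -> {
      active.incrementAndGet();
      try {
        Thread.sleep(50); // hypothetical stand-in for flushing a batch
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        active.decrementAndGet();
      }
    };
    int ran = runAndAwait(pool, Arrays.asList(body, body, body));
    System.out.println("ran=" + ran + ", stillActive=" + active.get()); // ran=3, stillActive=0
    pool.shutdown();
  }
}
```

Each wrapper counts down in a `finally` block, so the waiter cannot miss a task whose body throws. If `LockSupport.park` returns early, whether spuriously or because of an interrupt, the waiter just goes round the loop again. The waiter clears the interrupt with `Thread.interrupted()` before parking again, so it does not busy-spin, and it restores the flag before returning. The sketch never interrupts a body that is already running. It waits for that body to finish instead, which is the guarantee needed before the batches can be cleared safely.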