[ https://issues.apache.org/jira/browse/DRILL-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440221#comment-16440221 ]
ASF GitHub Bot commented on DRILL-6295: --------------------------------------- Github user vrozov commented on a diff in the pull request: https://github.com/apache/drill/pull/1208#discussion_r181927070 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java --- @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) { } @Override - public void execute(Partitioner part) throws IOException { + public void execute(Partitioner part) throws IOException, InterruptedException { part.flushOutgoingBatches(isLastBatch, schemaChanged); } } /** - * Helper class to wrap Runnable with customized naming - * Exception handling + * Helper class to wrap Runnable with cancellation and waiting for completion support * */ - private static class CustomRunnable implements Runnable { + private static class PartitionerTask implements Runnable { + + private enum STATE { + NEW, + COMPLETING, + NORMAL, + EXCEPTIONAL, + CANCELLED, + INTERRUPTING, + INTERRUPTED + } + + private final AtomicReference<STATE> state; + private final AtomicReference<Thread> runner; + private final PartitionerDecorator partitionerDecorator; + private final AtomicInteger count; - private final String parentThreadName; - private final CountDownLatch latch; private final GeneralExecuteIface iface; - private final Partitioner part; + private final Partitioner partitioner; private CountDownLatchInjection testCountDownLatch; - private volatile IOException exp; + private volatile ExecutionException exception; - public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface, - final Partitioner part, CountDownLatchInjection testCountDownLatch) { - this.parentThreadName = parentThreadName; - this.latch = latch; + public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection 
testCountDownLatch) { + state = new AtomicReference<>(STATE.NEW); + runner = new AtomicReference<>(); + this.partitionerDecorator = partitionerDecorator; this.iface = iface; - this.part = part; + this.partitioner = partitioner; + this.count = count; this.testCountDownLatch = testCountDownLatch; } @Override public void run() { - // Test only - Pause until interrupted by fragment thread - try { - testCountDownLatch.await(); - } catch (final InterruptedException e) { - logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e); - } - - final Thread currThread = Thread.currentThread(); - final String currThreadName = currThread.getName(); - final OperatorStats localStats = part.getStats(); - try { - final String newThreadName = parentThreadName + currThread.getId(); - currThread.setName(newThreadName); + final Thread thread = Thread.currentThread(); + if (runner.compareAndSet(null, thread)) { + final String name = thread.getName(); + thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId())); + final OperatorStats localStats = partitioner.getStats(); localStats.clear(); localStats.startProcessing(); - iface.execute(part); - } catch (IOException e) { - exp = e; - } finally { - localStats.stopProcessing(); - currThread.setName(currThreadName); - latch.countDown(); + ExecutionException executionException = null; + try { + // Test only - Pause until interrupted by fragment thread + testCountDownLatch.await(); + if (state.get() == STATE.NEW) { + iface.execute(partitioner); + } + } catch (InterruptedException e) { + if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) { + logger.warn("Partitioner Task interrupted during the run", e); + } + } catch (Throwable t) { + executionException = new ExecutionException(t); + } finally { + if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) { + if (executionException == null) { + localStats.stopProcessing(); + state.lazySet(STATE.NORMAL); + } else { 
+ exception = executionException; + state.lazySet(STATE.EXCEPTIONAL); + } + } + if (count.decrementAndGet() == 0) { + LockSupport.unpark(partitionerDecorator.thread); + } + thread.setName(name); + while (state.get() == STATE.INTERRUPTING) { + Thread.yield(); + } + // Clear interrupt flag + Thread.interrupted(); + } + } + } + + void cancel(boolean mayInterruptIfRunning) { + Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread, + String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName())); + if (runner.compareAndSet(null, partitionerDecorator.thread)) { + if (partitionerDecorator.executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this); + } + count.decrementAndGet(); --- End diff -- Agreed: it is necessary to re-check `count` after canceling tasks. > PartitionerDecorator may close partitioners while CustomRunnable are active > during query cancellation > ----------------------------------------------------------------------------------------------------- > > Key: DRILL-6295 > URL: https://issues.apache.org/jira/browse/DRILL-6295 > Project: Apache Drill > Issue Type: Bug > Reporter: Vlad Rozov > Assignee: Vlad Rozov > Priority: Critical > Fix For: 1.14.0 > > > During query cancellation, in case > {{PartitionerDecorator.executeMethodLogic()}} is active (waiting on the > {{latch}}), the wait will be interrupted and {{Futures}} cancelled, but there > is no guarantee that all {{CustomRunnable}} terminate before returning from > {{PartitionerDecorator.executeMethodLogic()}}. On exit, both incoming and > outgoing batches are cleared, leading to clearing of underlying {{Vectors}} > and {{DrillBufs}}. This eventually causes access to deallocated memory and a JVM > crash as {{CustomRunnable}} may execute after incoming/outgoing batches are > cleared. -- This message was sent by Atlassian JIRA (v7.6.3#76005)