[ https://issues.apache.org/jira/browse/DRILL-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439872#comment-16439872 ]
ASF GitHub Bot commented on DRILL-6295:
---------------------------------------

Github user ilooner commented on a diff in the pull request:

https://github.com/apache/drill/pull/1208#discussion_r181851002

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
@@ -262,68 +280,124 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
     }
     @Override
-    public void execute(Partitioner part) throws IOException {
+    public void execute(Partitioner part) throws IOException, InterruptedException {
       part.flushOutgoingBatches(isLastBatch, schemaChanged);
     }
   }
   /**
-   * Helper class to wrap Runnable with customized naming
-   * Exception handling
+   * Helper class to wrap Runnable with cancellation and waiting for completion support
    *
    */
-  private static class CustomRunnable implements Runnable {
+  private static class PartitionerTask implements Runnable {
+
+    private enum STATE {
+      NEW,
+      COMPLETING,
+      NORMAL,
+      EXCEPTIONAL,
+      CANCELLED,
+      INTERRUPTING,
+      INTERRUPTED
+    }
+
+    private final AtomicReference<STATE> state;
+    private final AtomicReference<Thread> runner;
+    private final PartitionerDecorator partitionerDecorator;
+    private final AtomicInteger count;
-    private final String parentThreadName;
-    private final CountDownLatch latch;
     private final GeneralExecuteIface iface;
-    private final Partitioner part;
+    private final Partitioner partitioner;
     private CountDownLatchInjection testCountDownLatch;
-    private volatile IOException exp;
+    private volatile ExecutionException exception;
-    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
-        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
-      this.parentThreadName = parentThreadName;
-      this.latch = latch;
+    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
+      state = new AtomicReference<>(STATE.NEW);
+      runner = new AtomicReference<>();
+      this.partitionerDecorator = partitionerDecorator;
       this.iface = iface;
-      this.part = part;
+      this.partitioner = partitioner;
+      this.count = count;
       this.testCountDownLatch = testCountDownLatch;
     }
     @Override
     public void run() {
-      // Test only - Pause until interrupted by fragment thread
+      final Thread thread = Thread.currentThread();
+      Preconditions.checkState(runner.compareAndSet(null, thread),
+          "PartitionerTask can be executed only once.");
+      if (state.get() == STATE.NEW) {
+        final String name = thread.getName();
+        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
+        final OperatorStats localStats = partitioner.getStats();
+        localStats.clear();
+        localStats.startProcessing();
+        ExecutionException executionException = null;
+        try {
+          // Test only - Pause until interrupted by fragment thread
+          testCountDownLatch.await();
+          iface.execute(partitioner);
+        } catch (InterruptedException e) {
+          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+            logger.warn("Partitioner Task interrupted during the run", e);
+          }
+        } catch (Throwable t) {
+          executionException = new ExecutionException(t);
+        }
+        if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+          if (executionException == null) {
+            localStats.stopProcessing();
+            state.lazySet(STATE.NORMAL);
+          } else {
+            exception = executionException;
+            state.lazySet(STATE.EXCEPTIONAL);
+          }
+        }
+        if (count.decrementAndGet() == 0) {
+          LockSupport.unpark(partitionerDecorator.thread);
+        }
+        thread.setName(name);
+      }
+      runner.set(null);
+      while (state.get() == STATE.INTERRUPTING) {
+        Thread.yield();
+      }
+      // Clear interrupt flag
       try {
-        testCountDownLatch.await();
-      } catch (final InterruptedException e) {
-        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+        Thread.sleep(0);
--- End diff --

Could we use `Thread.interrupted()` instead? Its Javadoc states that it clears the thread's interrupted status, so it is a natural fit for clearing the interrupt flag. It also avoids the unnecessary yield that `Thread.sleep(0)` performs on some JVM implementations.

```
/**
 * Tests whether the current thread has been interrupted. The
 * <i>interrupted status</i> of the thread is cleared by this method. In
 * other words, if this method were to be called twice in succession, the
 * second call would return false (unless the current thread were
 * interrupted again, after the first call had cleared its interrupted
 * status and before the second call had examined it).
 *
 * <p>A thread interruption ignored because a thread was not alive
 * at the time of the interrupt will be reflected by this method
 * returning false.
 *
 * @return <code>true</code> if the current thread has been interrupted;
 * <code>false</code> otherwise.
 * @see #isInterrupted()
 * @revised 6.0
 */
```

> PartitionerDecorator may close partitioners while CustomRunnable are active
> during query cancellation
> -----------------------------------------------------------------------------------------------------
>
> Key: DRILL-6295
> URL: https://issues.apache.org/jira/browse/DRILL-6295
> Project: Apache Drill
> Issue Type: Bug
> Reporter: Vlad Rozov
> Assignee: Vlad Rozov
> Priority: Critical
> Fix For: 1.14.0
>
>
> During query cancellation, in case
> {{PartitionerDecorator.executeMethodLogic()}} is active (waiting on the
> {{latch}}), the wait will be interrupted and {{Futures}} cancelled, but there
> is no guarantee that all {{CustomRunnable}} terminate before returning from
> {{PartitionerDecorator.executeMethodLogic()}}. On exit, both incoming and
> outgoing batches are cleared, leading to clearing of underlying {{Vectors}}
> and {{DrillBufs}}. This eventually causes unallocated memory access and JVM
> crash as {{CustomRunnable}} may execute after incoming/outgoing batches are
> cleared.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
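The difference between the reviewer's suggestion and the `Thread.sleep(0)` call in the diff can be checked with a small standalone program. This is a minimal sketch, not Drill code. The class `ClearInterruptDemo` and its methods are hypothetical. The program relies only on the documented behavior of two JDK methods:

* `Thread.interrupted()` returns the interrupted status and clears it.
* `Thread.sleep` throws `InterruptedException` when the current thread has been interrupted, and clears the status when it throws.

```java
// Hypothetical demo class (not part of Drill) comparing two ways to clear
// the current thread's interrupt flag.
public class ClearInterruptDemo {

  // The approach in the reviewed diff: Thread.sleep(0) throws
  // InterruptedException when the flag is set, and the flag is cleared
  // when the exception is thrown. Returns true if the exception was thrown.
  static boolean clearWithSleep() {
    try {
      Thread.sleep(0);
      return false;
    } catch (InterruptedException e) {
      return true;
    }
  }

  // The approach the reviewer suggests: Thread.interrupted() returns the
  // current status and resets it to false in a single call, with no
  // exception handling and no sleep/yield involved.
  static boolean clearWithInterrupted() {
    return Thread.interrupted();
  }

  public static void main(String[] args) {
    Thread.currentThread().interrupt();
    System.out.println("sleep(0) threw: " + clearWithSleep()); // prints "sleep(0) threw: true"
    System.out.println("flag after sleep(0): "
        + Thread.currentThread().isInterrupted()); // prints "flag after sleep(0): false"

    Thread.currentThread().interrupt();
    System.out.println("first interrupted(): " + clearWithInterrupted()); // prints "first interrupted(): true"
    System.out.println("second interrupted(): " + clearWithInterrupted()); // prints "second interrupted(): false"
  }
}
```

Both variants leave the flag cleared. `Thread.interrupted()` does it without a `try`/`catch` and without entering the sleep path at all.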
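The issue above describes `executeMethodLogic()` returning while partitioner tasks may still be running. In the diff, each `PartitionerTask` decrements a shared `AtomicInteger` and calls `LockSupport.unpark` on the decorator thread when the count reaches zero. The waiting side is not shown in the diff.

The following is a minimal sketch of one way such a wait-for-completion loop could look. It makes two assumptions:

* A plain `ExecutorService` stands in for Drill's executor.
* A counter increment stands in for flushing batches.

`WaitForTasksDemo` and `runAll` are hypothetical names, not Drill APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch (not Drill's implementation): the submitting thread
// does not return until every task it started has finished running.
public class WaitForTasksDemo {

  // Runs taskCount tasks and returns how many of them completed before
  // the method returned.
  static int runAll(int taskCount) {
    final Thread waiter = Thread.currentThread();
    final AtomicInteger remaining = new AtomicInteger(taskCount);
    final AtomicInteger completed = new AtomicInteger();
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      for (int i = 0; i < taskCount; i++) {
        pool.execute(() -> {
          try {
            completed.incrementAndGet(); // stand-in for flushing outgoing batches
          } finally {
            // The last task to finish wakes up the waiting thread.
            if (remaining.decrementAndGet() == 0) {
              LockSupport.unpark(waiter);
            }
          }
        });
      }
      boolean interrupted = false;
      // park() can return spuriously, on interrupt, or immediately if unpark()
      // already ran, so the count is re-checked after every wake-up.
      while (remaining.get() > 0) {
        LockSupport.park();
        if (Thread.interrupted()) {
          interrupted = true; // remember the interrupt but keep waiting
        }
      }
      if (interrupted) {
        waiter.interrupt(); // restore the interrupt status for the caller
      }
    } finally {
      pool.shutdown();
    }
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println(runAll(8)); // prints 8
  }
}
```

In this sketch, the method cannot return while a task is still running, even if the waiting thread is interrupted, because it waits on the counter rather than relying only on cancelled `Future`s. That is the guarantee the issue says was missing. Restoring the interrupt status afterwards still lets the caller observe the cancellation.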