Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1208#discussion_r181927070
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
---
@@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean
isLastBatch, boolean schemaChanged) {
}
@Override
- public void execute(Partitioner part) throws IOException {
+ public void execute(Partitioner part) throws IOException,
InterruptedException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for
completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
- private volatile IOException exp;
+ private volatile ExecutionException exception;
- public CustomRunnable(final String parentThreadName, final
CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection
testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator,
GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count,
CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
- try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test
countdown latch await()", e);
- }
-
- final Thread currThread = Thread.currentThread();
- final String currThreadName = currThread.getName();
- final OperatorStats localStats = part.getStats();
- try {
- final String newThreadName = parentThreadName + currThread.getId();
- currThread.setName(newThreadName);
+ final Thread thread = Thread.currentThread();
+ if (runner.compareAndSet(null, thread)) {
+ final String name = thread.getName();
+ thread.setName(String.format("Partitioner-%s-%d",
partitionerDecorator.thread.getName(), thread.getId()));
+ final OperatorStats localStats = partitioner.getStats();
localStats.clear();
localStats.startProcessing();
- iface.execute(part);
- } catch (IOException e) {
- exp = e;
- } finally {
- localStats.stopProcessing();
- currThread.setName(currThreadName);
- latch.countDown();
+ ExecutionException executionException = null;
+ try {
+ // Test only - Pause until interrupted by fragment thread
+ testCountDownLatch.await();
+ if (state.get() == STATE.NEW) {
+ iface.execute(partitioner);
+ }
+ } catch (InterruptedException e) {
+ if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+ logger.warn("Partitioner Task interrupted during the run", e);
+ }
+ } catch (Throwable t) {
+ executionException = new ExecutionException(t);
+ } finally {
+ if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+ if (executionException == null) {
+ localStats.stopProcessing();
+ state.lazySet(STATE.NORMAL);
+ } else {
+ exception = executionException;
+ state.lazySet(STATE.EXCEPTIONAL);
+ }
+ }
+ if (count.decrementAndGet() == 0) {
+ LockSupport.unpark(partitionerDecorator.thread);
+ }
+ thread.setName(name);
+ while (state.get() == STATE.INTERRUPTING) {
+ Thread.yield();
+ }
+ // Clear interrupt flag
+ Thread.interrupted();
+ }
+ }
+ }
+
+ void cancel(boolean mayInterruptIfRunning) {
+ Preconditions.checkState(Thread.currentThread() ==
partitionerDecorator.thread,
+ String.format("PartitionerTask can be cancelled only from the
main %s thread", partitionerDecorator.thread.getName()));
+ if (runner.compareAndSet(null, partitionerDecorator.thread)) {
+ if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
+ }
+ count.decrementAndGet();
--- End diff --
Agreed, it is necessary to re-check `count` after canceling tasks.
---