DRILL-1096: Stop ScanBatch if a read fails and pass the failure context to the client; break the producer loop when the iter outcome is STOP
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a7d21d6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a7d21d6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a7d21d6f

Branch: refs/heads/master
Commit: a7d21d6f777d79f0a527ced9115a73decec37c40
Parents: 015680c
Author: Hanifi Gunes <[email protected]>
Authored: Mon Jul 21 15:08:13 2014 -0700
Committer: Aditya Kishore <[email protected]>
Committed: Thu Jul 24 16:16:03 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 4 ++++
 .../drill/exec/physical/impl/producer/ProducerConsumerBatch.java | 3 +++
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a7d21d6f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index f440546..a8881f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -177,6 +177,10 @@ public class ScanBatch implements RecordBatch {
       } else {
         return IterOutcome.OK;
       }
+    } catch (Exception ex) {
+      logger.debug("Failed to read the batch. Stopping...", ex);
+      context.fail(ex);
+      return IterOutcome.STOP;
     } finally {
       oContext.getStats().stopProcessing();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a7d21d6f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 6b43044..120a611 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -78,6 +78,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     }
     if (wrapper.finished) {
       return IterOutcome.NONE;
+    } else if (wrapper.failed) {
+      return IterOutcome.STOP;
     }
 
     recordCount = wrapper.batch.getRecordCount();
@@ -129,6 +131,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
               } catch (InterruptedException e) {
                 throw new RuntimeException(e);
               }
+              return;
             case OK_NEW_SCHEMA:
             case OK:
               try {
