Repository: hive Updated Branches: refs/heads/master 2c7f2e9d6 -> df51738ad
HIVE-14091 : some errors are not propagated to LLAP external clients (Sergey Shelukhin/Jason Dere, reviewed by Jason Dere/Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/df51738a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/df51738a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/df51738a Branch: refs/heads/master Commit: df51738ad8c0de6e57b5b08bc20a06f0b3c734b7 Parents: 2c7f2e9 Author: Sergey Shelukhin <ser...@apache.org> Authored: Thu Jun 30 12:21:22 2016 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Thu Jun 30 12:23:14 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/LlapBaseRecordReader.java | 19 +++++++++++++++---- .../hadoop/hive/llap/LlapBaseInputFormat.java | 4 ++-- .../hadoop/hive/llap/TestLlapOutputFormat.java | 3 ++- 3 files changed, 19 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/df51738a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index f2700c8..59dec1b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -54,8 +54,10 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>(); protected final long timeout; protected final Closeable client; + private final Closeable socket; - public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf job, Closeable client) { + public LlapBaseRecordReader(InputStream in, Schema schema, + Class<V> clazz, JobConf job, Closeable client, Closeable socket) { din = new DataInputStream(in); this.schema = schema; this.clazz = clazz; @@ -63,6 +65,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor this.timeout = 3 * HiveConf.getTimeVar(job, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); this.client = client; + this.socket = socket; } public Schema getSchema() { @@ -78,6 +81,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor LOG.error("Error closing input stream:" + err.getMessage(), err); caughtException = err; } + // Don't close the socket - the stream already does that if needed. if (client != null) { try { @@ -152,9 +156,10 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor ReaderEvent event = getReaderEvent(); switch (event.getEventType()) { case ERROR: - throw new IOException("Received reader event error: " + event.getMessage()); + throw new IOException("Received reader event error: " + event.getMessage(), io); default: - throw new IOException("Got reader event type " + event.getEventType() + ", expected error event"); + throw new IOException("Got reader event type " + event.getEventType() + + ", expected error event", io); } } } else { @@ -214,7 +219,13 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor if (LOG.isDebugEnabled()) { LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage()); } - getReaderThread().interrupt(); + readerThread.interrupt(); + try { + socket.close(); + } catch (IOException e) { + // Leave the client to time out. + LOG.error("Cannot close the socket on error", e); + } break; default: throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage()); http://git-wip-us.apache.org/repos/asf/hive/blob/df51738a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 6d63797..aef5762 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -170,8 +170,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> LOG.info("Registered id: " + fragmentId); @SuppressWarnings("rawtypes") - LlapBaseRecordReader recordReader = new LlapBaseRecordReader( - socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient); + LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), + llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket); umbilicalResponder.setRecordReader(recordReader); return recordReader; } http://git-wip-us.apache.org/repos/asf/hive/blob/df51738a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java index 2288cd4..4159be5 100644 --- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -103,7 +103,8 @@ public class TestLlapOutputFormat { writer.close(null); InputStream in = socket.getInputStream(); - LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null); + LlapBaseRecordReader reader = new LlapBaseRecordReader( + in, null, Text.class, job, null, null); LOG.debug("Have record reader");