HIVE-13368: LlapTaskUmbilicalExternalClient should handle submission rejection/failures/timeouts from LLAP daemon
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/134f2f74 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/134f2f74 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/134f2f74 Branch: refs/heads/master Commit: 134f2f749e1981738a660843e911db173c86bcfa Parents: 2514065 Author: Jason Dere <jd...@hortonworks.com> Authored: Mon Mar 28 11:16:33 2016 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Mon Mar 28 11:16:33 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/LlapInputFormat.java | 137 ++++++++++- .../ext/LlapTaskUmbilicalExternalClient.java | 237 ++++++++++++++++++- .../hadoop/hive/llap/LlapRecordReader.java | 123 +++++++++- 3 files changed, 480 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java index 847c74f..7f11e1d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java @@ -23,13 +23,18 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; + +import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapRecordReader.ReaderEvent; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; @@ -52,11 +57,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; + + public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); @@ -84,9 +97,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort + " and outputformat port " + serviceInstance.getOutputFormatPort()); + LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = + new LlapRecordReaderTaskUmbilicalExternalResponder(); LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), - submitWorkInfo.getToken()); + submitWorkInfo.getToken(), umbilicalResponder); llapClient.init(job); llapClient.start(); @@ -117,7 +132,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma LOG.info("Registered id: " + id); - return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + LlapRecordReader recordReader = new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + umbilicalResponder.setRecordReader(recordReader); + return recordReader; } @Override @@ -254,4 +271,120 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma containerCredentials.writeTokenStorageToStream(containerTokens_dob); return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); } + + private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { + protected LlapRecordReader recordReader = null; + protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>(); + + public LlapRecordReaderTaskUmbilicalExternalResponder() { + } + + @Override + public void submissionFailed(String fragmentId, Throwable throwable) { + try { + sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent( + "Received submission failed event for fragment ID " + fragmentId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + @Override + public void heartbeat(TezHeartbeatRequest request) { + TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); + List<TezEvent> inEvents = request.getEvents(); + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + EventType eventType = tezEvent.getEventType(); + try { + switch (eventType) { + case TASK_ATTEMPT_COMPLETED_EVENT: + sendOrQueueEvent(LlapRecordReader.ReaderEvent.doneEvent()); + break; + case TASK_ATTEMPT_FAILED_EVENT: + TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent(); + sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); + break; + case TASK_STATUS_UPDATE_EVENT: + // If we want to handle counters + break; + default: + LOG.warn("Unhandled event type " + eventType); + break; + } + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + } + + @Override + public void taskKilled(TezTaskAttemptID taskAttemptId) { + try { + sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent( + "Received task killed event for task ID " + taskAttemptId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + @Override + public void heartbeatTimeout(String taskAttemptId) { + try { + sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent( + "Timed out waiting for heartbeat for task ID " + taskAttemptId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + public synchronized LlapRecordReader getRecordReader() { + return recordReader; + } + + public synchronized void setRecordReader(LlapRecordReader recordReader) { + this.recordReader = recordReader; + + if (recordReader == null) { + return; + } + + // If any events were queued by the responder, give them to the record reader now. + while (!queuedEvents.isEmpty()) { + LlapRecordReader.ReaderEvent readerEvent = queuedEvents.poll(); + LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType()); + recordReader.handleEvent(readerEvent); + } + } + + /** + * Send the ReaderEvents to the record reader, if it is registered to this responder. + * If there is no registered record reader, add them to a list of pending reader events + * since we don't want to drop these events. + * @param readerEvent + */ + protected synchronized void sendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) { + LlapRecordReader recordReader = getRecordReader(); + if (recordReader != null) { + recordReader.handleEvent(readerEvent); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() + + " with message " + readerEvent.getMessage()); + } + + try { + queuedEvents.put(readerEvent); + } catch (Exception err) { + throw new RuntimeException("Unexpected exception while queueing reader event", err); + } + } + } + + /** + * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader. + */ + public void clearQueuedEvents() { + queuedEvents.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 16cfd01..7d06637 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -2,12 +2,17 @@ package org.apache.hadoop.hive.llap.ext; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; +import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; @@ -19,16 +24,20 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class LlapTaskUmbilicalExternalClient extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); @@ -41,20 +50,51 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { protected final String tokenIdentifier; protected final Token<JobTokenIdentifier> sessionToken; + private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>(); + private LlapTaskUmbilicalExternalResponder responder = null; + private final ScheduledThreadPoolExecutor timer; + private final long connectionTimeout; + + private static class TaskHeartbeatInfo { + final String taskAttemptId; + final String hostname; + final int port; + final AtomicLong lastHeartbeat = new AtomicLong(); + + public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) { + this.taskAttemptId = taskAttemptId; + this.hostname = hostname; + this.port = port; + this.lastHeartbeat.set(System.currentTimeMillis()); + } + } - private final ConcurrentMap<String, List<TezEvent>> pendingEvents = new ConcurrentHashMap<>(); + private static class PendingEventData { + final TaskHeartbeatInfo heartbeatInfo; + final List<TezEvent> tezEvents; + public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) { + this.heartbeatInfo = heartbeatInfo; + this.tezEvents = tezEvents; + } + } // TODO KKK Work out the details of the tokenIdentifier, and the session token. // It may just be possible to create one here - since Shuffle is not involved, and this is only used // for communication from LLAP-Daemons to the server. It will need to be sent in as part // of the job submission request. - public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken) { + public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, + Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) { super(LlapTaskUmbilicalExternalClient.class.getName()); this.conf = conf; this.umbilical = new LlapTaskUmbilicalExternalImpl(); this.tokenIdentifier = tokenIdentifier; this.sessionToken = sessionToken; + this.responder = responder; + this.timer = new ScheduledThreadPoolExecutor(1); + this.connectionTimeout = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough. this.communicator = new LlapProtocolClientProxy(1, conf, null); this.communicator.init(conf); @@ -71,6 +111,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { @Override public void serviceStop() { llapTaskUmbilicalServer.shutdownServer(); + timer.shutdown(); if (this.communicator != null) { this.communicator.stop(); } @@ -89,7 +130,15 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false); // Register the pending events to be sent for this spec. - pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents); + String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(); + PendingEventData pendingEventData = new PendingEventData( + new TaskHeartbeatInfo(fragmentId, llapHost, llapPort), + tezEvents); + pendingEvents.putIfAbsent(fragmentId, pendingEventData); + + // Setup timer task to check for hearbeat timeouts + timer.scheduleAtFixedRate(new HeartbeatCheckTask(), + connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS); // Send out the actual SubmitWorkRequest communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort, @@ -99,7 +148,12 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) { if (response.hasSubmissionState()) { if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) { - LOG.info("Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy."); + String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy."; + LOG.info(msg); + if (responder != null) { + Throwable err = new RuntimeException(msg); + responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err); + } return; } } @@ -107,7 +161,10 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { @Override public void indicateError(Throwable t) { - LOG.error("Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), t); + String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(); + LOG.error(msg, t); + Throwable err = new RuntimeException(msg, t); + responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err); } }); @@ -130,9 +187,101 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { } + private void updateHeartbeatInfo(String taskAttemptId) { + int updateCount = 0; + + PendingEventData pendingEventData = pendingEvents.get(taskAttemptId); + if (pendingEventData != null) { + pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); + updateCount++; + } + + TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId); + if (heartbeatInfo != null) { + heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); + updateCount++; + } + + if (updateCount == 0) { + LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId); + } + } + + private void updateHeartbeatInfo(String hostname, int port) { + int updateCount = 0; + + for (String key : pendingEvents.keySet()) { + PendingEventData pendingEventData = pendingEvents.get(key); + if (pendingEventData != null) { + if (pendingEventData.heartbeatInfo.hostname.equals(hostname) + && pendingEventData.heartbeatInfo.port == port) { + pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); + updateCount++; + } + } + } + + for (String key : registeredTasks.keySet()) { + TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key); + if (heartbeatInfo != null) { + if (heartbeatInfo.hostname.equals(hostname) + && heartbeatInfo.port == port) { + heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); + updateCount++; + } + } + } + + if (updateCount == 0) { + LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port); + } + } + private class HeartbeatCheckTask implements Runnable { + public void run() { + long currentTime = System.currentTimeMillis(); + List<String> timedOutTasks = new ArrayList<String>(); + + // Check both pending and registered tasks for timeouts + for (String key : pendingEvents.keySet()) { + PendingEventData pendingEventData = pendingEvents.get(key); + if (pendingEventData != null) { + if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) { + timedOutTasks.add(key); + } + } + } + for (String timedOutTask : timedOutTasks) { + LOG.info("Pending taskAttemptId " + timedOutTask + " timed out"); + responder.heartbeatTimeout(timedOutTask); + pendingEvents.remove(timedOutTask); + // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task? + } + timedOutTasks.clear(); + for (String key : registeredTasks.keySet()) { + TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key); + if (heartbeatInfo != null) { + if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) { + timedOutTasks.add(key); + } + } + } + for (String timedOutTask : timedOutTasks) { + LOG.info("Running taskAttemptId " + timedOutTask + " timed out"); + responder.heartbeatTimeout(timedOutTask); + registeredTasks.remove(timedOutTask); + // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task? + } + } + } + public interface LlapTaskUmbilicalExternalResponder { + void submissionFailed(String fragmentId, Throwable throwable); + void heartbeat(TezHeartbeatRequest request); + void taskKilled(TezTaskAttemptID taskAttemptId); + void heartbeatTimeout(String fragmentId); + } @@ -153,16 +302,35 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { // This also provides completion information, and a possible notification when task actually starts running (first heartbeat) - // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans. - + if (LOG.isDebugEnabled()) { + LOG.debug("Received heartbeat from container, request=" + request); + } + // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans. TezHeartbeatResponse response = new TezHeartbeatResponse(); + + response.setLastRequestId(request.getRequestId()); // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this. TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); + String taskAttemptIdString = taskAttemptId.toString(); + + updateHeartbeatInfo(taskAttemptIdString); - List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString()); - if (tezEvents == null) { + List<TezEvent> tezEvents = null; + PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString); + if (pendingEventData == null) { tezEvents = Collections.emptyList(); + + // If this heartbeat was not from a pending event and it's not in our list of registered tasks, + if (!registeredTasks.containsKey(taskAttemptIdString)) { + LOG.info("Unexpected heartbeat from " + taskAttemptIdString); + response.setShouldDie(); // Do any of the other fields need to be set? + return response; + } + } else { + tezEvents = pendingEventData.tezEvents; + // Tasks removed from the pending list should then be added to the registered list. + registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo); } response.setLastRequestId(request.getRequestId()); @@ -172,20 +340,63 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { response.setNextPreRoutedEventId(0); //Irrelevant. See comment above. response.setEvents(tezEvents); - // TODO KKK: Should ideally handle things like Task success notifications. - // Can this somehow be hooked into the LlapTaskCommunicator to make testing easy + List<TezEvent> inEvents = request.getEvents(); + if (LOG.isDebugEnabled()) { + LOG.debug("Heartbeat from " + taskAttemptIdString + + " events: " + (inEvents != null ? inEvents.size() : -1)); + } + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + EventType eventType = tezEvent.getEventType(); + switch (eventType) { + case TASK_ATTEMPT_COMPLETED_EVENT: + LOG.debug("Task completed event for " + taskAttemptIdString); + registeredTasks.remove(taskAttemptIdString); + break; + case TASK_ATTEMPT_FAILED_EVENT: + LOG.debug("Task failed event for " + taskAttemptIdString); + registeredTasks.remove(taskAttemptIdString); + break; + case TASK_STATUS_UPDATE_EVENT: + // If we want to handle counters + LOG.debug("Task update event for " + taskAttemptIdString); + break; + default: + LOG.warn("Unhandled event type " + eventType); + break; + } + } + + // Pass the request on to the responder + try { + if (responder != null) { + responder.heartbeat(request); + } + } catch (Exception err) { + LOG.error("Error during responder execution", err); + } return response; } @Override public void nodeHeartbeat(Text hostname, int port) throws IOException { - // TODO Eventually implement - to handle keep-alive messages from pending work. + updateHeartbeatInfo(hostname.toString(), port); + // No need to propagate to this to the responder } @Override public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { - // TODO Eventually implement - to handle preemptions within LLAP daemons. + String taskAttemptIdString = taskAttemptId.toString(); + LOG.error("Task killed - " + taskAttemptIdString); + registeredTasks.remove(taskAttemptIdString); + + try { + if (responder != null) { + responder.taskKilled(taskAttemptId); + } + } catch (Exception err) { + LOG.error("Error during responder execution", err); + } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/134f2f74/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java index ce3d39a..30ed9cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.llap; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.DataInputStream; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.RCFile.Reader; @@ -33,16 +35,25 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.hive.metastore.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> { + private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class); DataInputStream din; Schema schema; Class<V> clazz; + + protected Thread readerThread = null; + protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>(); + public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) { din = new DataInputStream(in); this.schema = schema; this.clazz = clazz; + this.readerThread = Thread.currentThread(); } public Schema getSchema() { @@ -75,12 +86,120 @@ public class LlapRecordReader<V extends WritableComparable> implements RecordRea } @Override - public boolean next(NullWritable key, V value) { + public boolean next(NullWritable key, V value) throws IOException { try { + // Need a way to know what thread to interrupt, since this is a blocking thread. + setReaderThread(Thread.currentThread()); + value.readFields(din); return true; - } catch (IOException io) { + } catch (EOFException eof) { + // End of input. There should be a reader event available, or coming soon, so okay to be blocking call. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case DONE: + break; + default: + throw new IOException("Expected reader event with done status, but got " + + event.getEventType() + " with message " + event.getMessage()); + } return false; + } catch (IOException io) { + if (Thread.interrupted()) { + // Either we were interrupted by one of: + // 1. handleEvent(), in which case there is a reader event waiting for us in the queue + // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. + // Either way we should not try to block trying to read the reader events queue. + if (readerEvents.isEmpty()) { + // Case 2. + throw io; + } else { + // Case 1. Fail the reader, sending back the error we received from the reader event. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case ERROR: + throw new IOException("Received reader event error: " + event.getMessage()); + default: + throw new IOException("Got reader event type " + event.getEventType() + ", expected error event"); + } + } + } else { + // If we weren't interrupted, just propagate the error + throw io; + } + } + } + + /** + * Define success/error events which are passed to the reader from a different thread. + * The reader will check for these events on end of input and interruption of the reader thread. + */ + public static class ReaderEvent { + public enum EventType { + DONE, + ERROR + } + + protected final EventType eventType; + protected final String message; + + protected ReaderEvent(EventType type, String message) { + this.eventType = type; + this.message = message; + } + + public static ReaderEvent doneEvent() { + return new ReaderEvent(EventType.DONE, ""); + } + + public static ReaderEvent errorEvent(String message) { + return new ReaderEvent(EventType.ERROR, message); } + + public EventType getEventType() { + return eventType; + } + + public String getMessage() { + return message; + } + } + + public void handleEvent(ReaderEvent event) { + switch (event.getEventType()) { + case DONE: + // Reader will check for the event queue upon the end of the input stream - no need to interrupt. + readerEvents.add(event); + break; + case ERROR: + readerEvents.add(event); + if (readerThread == null) { + throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage()); + } + // Reader is using a blocking socket .. interrupt it. + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage()); + } + getReaderThread().interrupt(); + default: + throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage()); + } + } + + protected ReaderEvent getReaderEvent() { + try { + ReaderEvent event = readerEvents.take(); + return event; + } catch (InterruptedException ie) { + throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie); + } + } + + protected synchronized void setReaderThread(Thread readerThread) { + this.readerThread = readerThread; + } + + protected synchronized Thread getReaderThread() { + return readerThread; } }