Author: sseth
Date: Tue Apr 21 03:38:16 2015
New Revision: 1675025

URL: http://svn.apache.org/r1675025
Log:
LLAP: Avoid fetching data multiple times in case of broadcast. (Siddharth Seth)

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java?rev=1675025&r1=1675024&r2=1675025&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java Tue Apr 21 03:38:16 2015 @@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; @@ -55,6 +57,7 @@ import org.apache.tez.runtime.api.impl.I import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.library.input.UnorderedKVInput; import org.apache.tez.runtime.task.TezChild; import org.apache.tez.runtime.task.TezTaskRunner; @@ -150,7 +153,7 @@ public class TaskRunnerCallable extends Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>(); serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, 
TezCommonUtils.convertJobTokenToBytes(jobToken)); - Multimap<String, String> startedInputsMap = HashMultimap.create(); + Multimap<String, String> startedInputsMap = createStartedInputMap(request.getFragmentSpec()); UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); @@ -187,7 +190,7 @@ public class TaskRunnerCallable extends if (shouldDie) { LOG.info("Got a shouldDie notification via heartbeats. Shutting down"); return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, + TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE, null, "Asked to die by the AM"); } } catch (IOException e) { @@ -252,6 +255,20 @@ public class TaskRunnerCallable extends return !inputClassName.equals(MRInputLegacy.class.getName()); } + private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) { + Multimap<String, String> startedInputMap = HashMultimap.create(); + // Let the Processor control start for Broadcast inputs. + + // TODO For now, this affects non broadcast unsorted cases as well. Make use of the edge + // property when it's available. + for (IOSpecProto inputSpec : fragmentSpec.getInputSpecsList()) { + if (inputSpec.getIoDescriptor().getClassName().equals(UnorderedKVInput.class.getName())) { + startedInputMap.put(fragmentSpec.getVertexName(), inputSpec.getConnectedVertexName()); + } + } + return startedInputMap; + } + public void shutdown() { if (executor != null) { executor.shutdownNow();