This is an automated email from the ASF dual-hosted git repository. hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 781b7fc HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan) 781b7fc is described below commit 781b7fc3e450f5a15e1afa2096189884b772b115 Author: Rajesh Balamohan <rbalamo...@apache.org> AuthorDate: Sat May 16 09:12:49 2020 -0700 HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> --- .../hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java | 13 +++++-------- .../apache/hadoop/hive/llap/daemon/impl/QueryTracker.java | 4 ++-- .../hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java | 13 ++++++++----- .../hadoop/hive/llap/shufflehandler/ShuffleHandler.java | 4 ++++ .../hive/llap/daemon/impl/TaskExecutorTestHelpers.java | 3 ++- .../hive/llap/daemon/impl/TestContainerRunnerImpl.java | 6 ++++-- 6 files changed, 25 insertions(+), 18 deletions(-) diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6a13b55..9c73747 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; @@ -271,23 +272,19 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId); - String[] localDirs = fragmentInfo.getLocalDirs(); - Preconditions.checkNotNull(localDirs); - if (LOG.isDebugEnabled()) { - LOG.debug("Dirs are: " + Arrays.toString(localDirs)); - } // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - Configuration callableConf = new Configuration(getConfig()); + // Lazy create conf object, as it gets expensive in this codepath. + Supplier<Configuration> callableConf = () -> new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); // enable the printing of (per daemon) LLAP task queue/run times via LLAP_TASK_TIME_SUMMARY ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY; ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY; - boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) - && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); + boolean addTaskTimes = getConfig().getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) + && getConfig().getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); final String llapHost; if (UserGroupInformation.isSecurityEnabled()) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index eae8e08..bf4eea0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -211,9 +211,9 @@ public class QueryTracker extends AbstractService { LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); } if (!vertex.getIsExternalSubmission()) { + String[] localDirs = (ShuffleHandler.get().isDirWatcherEnabled()) ? queryInfo.getLocalDirs() : null; ShuffleHandler.get() - .registerDag(appIdString, dagIdentifier, appToken, - user, queryInfo.getLocalDirs()); + .registerDag(appIdString, dagIdentifier, appToken, user, localDirs); } return queryInfo.registerFragment( diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 3619252..bc26dc0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -86,6 +86,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * @@ -93,7 +94,7 @@ import java.util.concurrent.atomic.AtomicLong; public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); private final SubmitWorkRequestProto request; - private final Configuration conf; + private final Supplier<Configuration> conf; private final Map<String, String> envMap; private final String pid = null; private final ObjectRegistryImpl objectRegistry; @@ -135,8 +136,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, - Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, - Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, + Supplier<Configuration> conf, ExecutionContext executionContext, + Map<String, String> envMap, Credentials credentials, long memoryAvailable, + AMReporter amReporter, ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, @@ -192,6 +194,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { setMDCFromNDC(); try { + final Configuration config = conf.get(); isStarted.set(true); this.startTime = System.currentTimeMillis(); threadName = Thread.currentThread().getName(); @@ -254,7 +257,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { @Override public LlapTaskUmbilicalProtocol run() throws Exception { return RPC.getProxy(LlapTaskUmbilicalProtocol.class, - LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory); + LlapTaskUmbilicalProtocol.versionID, address, taskOwner, config, socketFactory); } }); @@ -277,7 +280,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { try { synchronized (this) { if (shouldRunTask) { - taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(), + taskRunner = new TezTaskRunner2(config, fsTaskUgi, fragmentInfo.getLocalDirs(), taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memoryAvailable, false, tezHadoopShim); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index aff2c2e..9294fb3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -431,6 +431,10 @@ public class ShuffleHandler implements AttemptRegistrationListener { return port; } + public boolean isDirWatcherEnabled() { + return dirWatcher != null; + } + /** * Register an application and it's associated credentials and user information. * diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 50dec47..af3f292 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -212,7 +213,7 @@ public class TaskExecutorTestHelpers { public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo, boolean canFinish, boolean canFinishQueue, long workTime, TezEvent initialEvent, boolean isGuaranteed) { - super(requestProto, fragmentInfo, new Configuration(), new ExecutionContextImpl("localhost"), + super(requestProto, fragmentInfo, Configuration::new, new ExecutionContextImpl("localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( FragmentCompletionHandler.class), new DefaultHadoopShim(), null, diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java index 8ae00b9..93ca9f2 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java @@ -174,7 +174,9 @@ public class TestContainerRunnerImpl { containerRunner.submitWork(sRequest); Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1); Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId); - Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1); - Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId); + if (ShuffleHandler.get().isDirWatcherEnabled()) { + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1); + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId); + } } }