This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 781b7fc  HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
781b7fc is described below

commit 781b7fc3e450f5a15e1afa2096189884b772b115
Author: Rajesh Balamohan <rbalamo...@apache.org>
AuthorDate: Sat May 16 09:12:49 2020 -0700

    HIVE-23449 : LLAP: Reduce mkdir and config creations in submitWork hotpath (Rajesh Balamohan via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java   | 13 +++++--------
 .../apache/hadoop/hive/llap/daemon/impl/QueryTracker.java   |  4 ++--
 .../hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java    | 13 ++++++++-----
 .../hadoop/hive/llap/shufflehandler/ShuffleHandler.java     |  4 ++++
 .../hive/llap/daemon/impl/TaskExecutorTestHelpers.java      |  3 ++-
 .../hive/llap/daemon/impl/TestContainerRunnerImpl.java      |  6 ++++--
 6 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6a13b55..9c73747 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
@@ -271,23 +272,19 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
           vertex.getVertexName(), request.getFragmentNumber(), 
request.getAttemptNumber(),
           vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, 
amNodeId);
 
-      String[] localDirs = fragmentInfo.getLocalDirs();
-      Preconditions.checkNotNull(localDirs);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Dirs are: " + Arrays.toString(localDirs));
-      }
       // May need to setup localDir for re-localization, which is usually 
setup as Environment.PWD.
       // Used for re-localization, to add the user specified configuration 
(conf_pb_binary_stream)
 
-      Configuration callableConf = new Configuration(getConfig());
+      // Lazy create conf object, as it gets expensive in this codepath.
+      Supplier<Configuration> callableConf = () -> new 
Configuration(getConfig());
       UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : 
fsUgiFactory.createUgi();
       boolean isGuaranteed = request.hasIsGuaranteed() && 
request.getIsGuaranteed();
 
       // enable the printing of (per daemon) LLAP task queue/run times via 
LLAP_TASK_TIME_SUMMARY
       ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY;
       ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY;
-      boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, 
tezSummary.defaultBoolVal)
-                             && callableConf.getBoolean(llapTasks.varname, 
llapTasks.defaultBoolVal);
+      boolean addTaskTimes = getConfig().getBoolean(tezSummary.varname, 
tezSummary.defaultBoolVal)
+                             && getConfig().getBoolean(llapTasks.varname, 
llapTasks.defaultBoolVal);
 
       final String llapHost;
       if (UserGroupInformation.isSecurityEnabled()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index eae8e08..bf4eea0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -211,9 +211,9 @@ public class QueryTracker extends AbstractService {
         LOG.debug("Registering request for {} with the ShuffleHandler", 
queryIdentifier);
       }
       if (!vertex.getIsExternalSubmission()) {
+        String[] localDirs = (ShuffleHandler.get().isDirWatcherEnabled()) ? 
queryInfo.getLocalDirs() : null;
         ShuffleHandler.get()
-            .registerDag(appIdString, dagIdentifier, appToken,
-                user, queryInfo.getLocalDirs());
+            .registerDag(appIdString, dagIdentifier, appToken, user, 
localDirs);
       }
 
       return queryInfo.registerFragment(
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 3619252..bc26dc0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -86,6 +86,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 /**
  *
@@ -93,7 +94,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskRunnerCallable.class);
   private final SubmitWorkRequestProto request;
-  private final Configuration conf;
+  private final Supplier<Configuration> conf;
   private final Map<String, String> envMap;
   private final String pid = null;
   private final ObjectRegistryImpl objectRegistry;
@@ -135,8 +136,9 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo 
fragmentInfo,
-                            Configuration conf, ExecutionContext 
executionContext, Map<String, String> envMap,
-                            Credentials credentials, long memoryAvailable, 
AMReporter amReporter, ConfParams confParams,
+                            Supplier<Configuration> conf, ExecutionContext 
executionContext,
+                            Map<String, String> envMap, Credentials 
credentials, long memoryAvailable,
+                            AMReporter amReporter, ConfParams confParams,
                             LlapDaemonExecutorMetrics metrics, 
KilledTaskHandler killedTaskHandler,
                             FragmentCompletionHandler fragmentCompleteHandler, 
HadoopShim tezHadoopShim,
                             TezTaskAttemptID attemptId, SignableVertexSpec 
vertex, TezEvent initialEvent,
@@ -192,6 +194,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     setMDCFromNDC();
 
     try {
+      final Configuration config = conf.get();
       isStarted.set(true);
       this.startTime = System.currentTimeMillis();
       threadName = Thread.currentThread().getName();
@@ -254,7 +257,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
         @Override
         public LlapTaskUmbilicalProtocol run() throws Exception {
           return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-              LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, 
socketFactory);
+              LlapTaskUmbilicalProtocol.versionID, address, taskOwner, config, 
socketFactory);
         }
       });
 
@@ -277,7 +280,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
       try {
         synchronized (this) {
           if (shouldRunTask) {
-            taskRunner = new TezTaskRunner2(conf, fsTaskUgi, 
fragmentInfo.getLocalDirs(),
+            taskRunner = new TezTaskRunner2(config, fsTaskUgi, 
fragmentInfo.getLocalDirs(),
                 taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(),
                 serviceConsumerMetadata, envMap, startedInputsMap, 
taskReporter, executor,
                 objectRegistry, pid, executionContext, memoryAvailable, false, 
tezHadoopShim);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index aff2c2e..9294fb3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -431,6 +431,10 @@ public class ShuffleHandler implements 
AttemptRegistrationListener {
     return port;
   }
 
+  public boolean isDirWatcherEnabled() {
+    return dirWatcher != null;
+  }
+
   /**
    * Register an application and it's associated credentials and user 
information.
    *
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 50dec47..af3f292 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -212,7 +213,7 @@ public class TaskExecutorTestHelpers {
     public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo 
fragmentInfo,
                        boolean canFinish, boolean canFinishQueue, long 
workTime,
                        TezEvent initialEvent, boolean isGuaranteed) {
-      super(requestProto, fragmentInfo, new Configuration(), new 
ExecutionContextImpl("localhost"),
+      super(requestProto, fragmentInfo, Configuration::new, new 
ExecutionContextImpl("localhost"),
           null, new Credentials(), 0, mock(AMReporter.class), null, mock(
           LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), 
mock(
           FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
index 8ae00b9..93ca9f2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -174,7 +174,9 @@ public class TestContainerRunnerImpl {
     containerRunner.submitWork(sRequest);
     Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
     Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), 
dagId);
-    
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
-    
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), 
dagId);
+    if (ShuffleHandler.get().isDirWatcherEnabled()) {
+      
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
+      
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), 
dagId);
+    }
   }
 }

Reply via email to