This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new a9c5365a5 TEZ-4344: Collect jstack periodically from all containers. (#299) (Ayush Saxena reviewed by Laszlo Bodor)
a9c5365a5 is described below

commit a9c5365a5b03b3f343657d69eef8af915d6ea4f0
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Tue Jul 18 11:39:29 2023 +0530

    TEZ-4344: Collect jstack periodically from all containers. (#299) (Ayush Saxena reviewed by Laszlo Bodor)
---
 .../org/apache/tez/dag/api/TezConfiguration.java   |   8 +
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |   8 +
 .../apache/tez/runtime/TezThreadDumpHelper.java    | 195 +++++++++++++++++++++
 .../java/org/apache/tez/runtime/task/TezChild.java |  11 +-
 .../test/java/org/apache/tez/test/TestTezJobs.java |  48 ++++-
 5 files changed, 267 insertions(+), 3 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 5842067ff..9e2e2d89c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2296,4 +2296,12 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty
   public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = 
"tez.mrreader.config.update.properties";
 
+  /**
+   *  Frequency at which thread dump should be captured. Supports TimeUnits.
+   */
+  @ConfigurationScope(Scope.DAG)
+  @ConfigurationProperty
+  public static final String TEZ_THREAD_DUMP_INTERVAL = 
"tez.thread.dump.interval";
+  public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
+
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3c99b1afd..16d0a4dbd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -185,6 +185,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
+import org.apache.tez.runtime.TezThreadDumpHelper;
 import org.apache.tez.util.LoggingUtils;
 import org.apache.tez.util.TezMxBeanResourceCalculator;
 import org.codehaus.jettison.json.JSONException;
@@ -340,6 +341,7 @@ public class DAGAppMaster extends AbstractService {
   Map<Service, ServiceWithDependency> services =
       new LinkedHashMap<Service, ServiceWithDependency>();
   private ThreadLocalMap mdcContext;
+  private TezThreadDumpHelper tezThreadDumpHelper = 
TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -766,6 +768,7 @@ public class DAGAppMaster extends AbstractService {
           "DAGAppMaster Internal Error occurred");
       break;
     case DAG_FINISHED:
+      tezThreadDumpHelper.stop();
       DAGAppMasterEventDAGFinished finishEvt =
           (DAGAppMasterEventDAGFinished) event;
       String timeStamp = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").format(Calendar.getInstance().getTime());
@@ -2201,6 +2204,9 @@ public class DAGAppMaster extends AbstractService {
       execService.shutdownNow();
     }
 
+    // Check if the thread dump service is up in any case, if yes attempt a 
shutdown
+    tezThreadDumpHelper.stop();
+
     super.serviceStop();
   }
 
@@ -2577,6 +2583,8 @@ public class DAGAppMaster extends AbstractService {
   private void startDAGExecution(DAG dag, final Map<String, LocalResource> 
additionalAmResources)
       throws TezException {
     currentDAG = dag;
+    tezThreadDumpHelper = 
TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
+
     // Try localizing the actual resources.
     List<URL> additionalUrlsForClasspath;
     try {
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
new file mode 100644
index 000000000..6f3e9fec1
--- /dev/null
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Appender;
+import org.apache.tez.common.TezContainerLogAppender;
+import org.apache.tez.dag.api.TezConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR;
+import static 
org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
+import static 
org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT;
+
+public class TezThreadDumpHelper {
+
+  public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = 
new NoopTezThreadDumpHelper();
+  private long duration = 0L;
+  private Path basePath = null;
+  private FileSystem fs = null;
+
+  private static final ThreadMXBean THREAD_BEAN = 
ManagementFactory.getThreadMXBean();
+  private static final Logger LOG = 
LoggerFactory.getLogger(TezThreadDumpHelper.class);
+
+  private ScheduledExecutorService periodicThreadDumpServiceExecutor;
+
+  private TezThreadDumpHelper(long duration, Configuration conf) throws 
IOException {
+    this.duration = duration;
+    Appender appender = 
org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME);
+    if (appender instanceof TezContainerLogAppender) {
+      this.basePath = new Path(((TezContainerLogAppender) 
appender).getContainerLogDir());
+      this.fs = FileSystem.getLocal(conf);
+    } else {
+      // Fallback, if it is any other appender or if none is configured.
+      this.basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, 
DEFAULT_NM_REMOTE_APP_LOG_DIR));
+      this.fs = this.basePath.getFileSystem(conf);
+    }
+    LOG.info("Periodic Thread Dump Capture Service Configured to capture 
Thread Dumps at {} ms frequency and at " +
+        "path: {}", duration, basePath);
+  }
+
+  public TezThreadDumpHelper() {
+  }
+
+  public static TezThreadDumpHelper getInstance(Configuration conf) {
+    long periodicThreadDumpFrequency =
+        conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, 
TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+
+    if (periodicThreadDumpFrequency > 0) {
+      try {
+        return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
+      } catch (IOException e) {
+        LOG.warn("Can not initialize periodic thread dump service", e);
+      }
+    }
+    return NOOP_TEZ_THREAD_DUMP_HELPER;
+  }
+
+  public TezThreadDumpHelper start(String name) {
+    periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1,
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("PeriodicThreadDumpService{"
 + name + "} #%d")
+            .build());
+    Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs);
+    periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, 
TimeUnit.MILLISECONDS);
+    return this;
+  }
+
+  public void stop() {
+    if (periodicThreadDumpServiceExecutor != null) {
+      periodicThreadDumpServiceExecutor.shutdown();
+
+      try {
+        if (!periodicThreadDumpServiceExecutor.awaitTermination(100, 
TimeUnit.MILLISECONDS)) {
+          periodicThreadDumpServiceExecutor.shutdownNow();
+        }
+      } catch (InterruptedException ignored) {
+        // Ignore interrupt, will attempt a final shutdown below.
+      }
+      periodicThreadDumpServiceExecutor.shutdownNow();
+      periodicThreadDumpServiceExecutor = null;
+    }
+  }
+
+  private static class ThreadDumpCollector implements Runnable {
+
+    private final Path path;
+    private final String name;
+    private final FileSystem fs;
+
+    ThreadDumpCollector(Path path, String name, FileSystem fs) {
+      this.path = path;
+      this.fs = fs;
+      this.name = name;
+    }
+
+    @Override
+    public void run() {
+      if (!Thread.interrupted()) {
+        try (FSDataOutputStream fsStream = fs.create(
+            new Path(path, name + "_" + System.currentTimeMillis() + 
".jstack"));
+            PrintStream printStream = new PrintStream(fsStream, false, 
"UTF8")) {
+          printThreadInfo(printStream, name);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    public synchronized void printThreadInfo(PrintStream stream, String title) 
{
+      boolean contention = THREAD_BEAN.isThreadContentionMonitoringEnabled();
+      long[] threadIds = THREAD_BEAN.getAllThreadIds();
+      stream.println("Process Thread Dump: " + title);
+      stream.println(threadIds.length + " active threads");
+      for (long tid : threadIds) {
+        ThreadInfo info = THREAD_BEAN.getThreadInfo(tid, Integer.MAX_VALUE);
+        if (info == null) {
+          stream.println("  Inactive");
+          continue;
+        }
+        stream.println("Thread " + getTaskName(info.getThreadId(), 
info.getThreadName()) + ":");
+        Thread.State state = info.getThreadState();
+        stream.println("  State: " + state);
+        stream.println("  Blocked count: " + info.getBlockedCount());
+        stream.println("  Waited count: " + info.getWaitedCount());
+        if (contention) {
+          stream.println("  Blocked time: " + info.getBlockedTime());
+          stream.println("  Waited time: " + info.getWaitedTime());
+        }
+        if (state == Thread.State.WAITING) {
+          stream.println("  Waiting on " + info.getLockName());
+        } else if (state == Thread.State.BLOCKED) {
+          stream.println("  Blocked on " + info.getLockName());
+          stream.println("  Blocked by " + getTaskName(info.getLockOwnerId(), 
info.getLockOwnerName()));
+        }
+        stream.println("  Stack:");
+        for (StackTraceElement frame : info.getStackTrace()) {
+          stream.println("    " + frame.toString());
+        }
+      }
+      stream.flush();
+    }
+
+    private String getTaskName(long id, String taskName) {
+      if (taskName == null) {
+        return Long.toString(id);
+      }
+      return id + " (" + taskName + ")";
+    }
+  }
+
+  private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {
+
+    @Override
+    public TezThreadDumpHelper start(String name) {
+      // Do Nothing
+      return this;
+    }
+
+    @Override
+    public void stop() {
+      // Do Nothing
+    }
+  }
+}
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7ab532ad3..3145f21a5 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
+import org.apache.tez.runtime.TezThreadDumpHelper;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
@@ -119,6 +120,7 @@ public class TezChild {
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   private final String user;
   private final boolean updateSysCounters;
+  private TezThreadDumpHelper tezThreadDumpHelper = 
TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
   private final boolean ownUmbilical;
@@ -178,7 +180,7 @@ public class TezChild {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing with tokens:");
       for (Token<?> token : credentials.getAllTokens()) {
-        LOG.debug("",token);
+        LOG.debug("{}", token);
       }
     }
 
@@ -248,13 +250,15 @@ public class TezChild {
       }
 
       TezTaskAttemptID attemptId = 
containerTask.getTaskSpec().getTaskAttemptID();
+      Configuration taskConf;
       if (containerTask.getTaskSpec().getTaskConf() != null) {
         Configuration copy = new Configuration(defaultConf);
         TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), 
copy);
-
+        taskConf = copy;
         LoggingUtils.initLoggingContext(mdcContext, copy,
             attemptId.getTaskID().getVertexID().getDAGID().toString(), 
attemptId.toString());
       } else {
+        taskConf = defaultConf;
         LoggingUtils.initLoggingContext(mdcContext, defaultConf,
             attemptId.getTaskID().getVertexID().getDAGID().toString(), 
attemptId.toString());
       }
@@ -292,6 +296,7 @@ public class TezChild {
             hadoopShim, sharedExecutor);
 
         boolean shouldDie;
+        tezThreadDumpHelper = 
TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
         try {
           TaskRunner2Result result = taskRunner.run();
           LOG.info("TaskRunner2Result: {}", result);
@@ -310,6 +315,7 @@ public class TezChild {
                 e, "TaskExecutionFailure: " + e.getMessage());
           }
         } finally {
+          tezThreadDumpHelper.stop();
           FileSystem.closeAllForUGI(childUGI);
         }
       }
@@ -425,6 +431,7 @@ public class TezChild {
         RPC.stopProxy(umbilical);
       }
     }
+
     TezRuntimeShutdownHandler.shutdown();
     LOG.info("TezChild shutdown finished");
   }
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index fd8f08b42..892629f29 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.test;
 
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
+import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -40,6 +42,8 @@ import java.util.Set;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.tez.common.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -47,6 +51,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.tez.common.TezContainerLogAppender;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -533,8 +538,26 @@ public class TestTezJobs {
 
   @Test(timeout = 60000)
   public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
+    testSortMergeJoinExampleDisableSplitGrouping(false);
+  }
+
+  @Test
+  public void testSortMergeJoinExampleWithThreadDump() throws Exception {
+    testSortMergeJoinExampleDisableSplitGrouping(true);
+  }
+
+  public void testSortMergeJoinExampleDisableSplitGrouping(boolean 
withThreadDump) throws Exception {
     SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
-    sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+    Configuration newConf = new Configuration(mrrTezCluster.getConfig());
+    Path logPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/logPath");
+    if (withThreadDump) {
+      TezContainerLogAppender appender = new TezContainerLogAppender();
+      org.apache.log4j.Logger.getRootLogger().addAppender(appender);
+      appender.setName(TEZ_CONTAINER_LOGGER_NAME);
+      appender.setContainerLogDir(logPath.toString());
+      newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms");
+    }
+    sortMergeJoinExample.setConf(newConf);
     Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
     Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1");
     Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2");
@@ -587,6 +610,29 @@ public class TestTezJobs {
     reader.close();
     inStream.close();
     assertEquals(0, expectedResult.size());
+
+    if (withThreadDump) {
+      validateThreadDumpCaptured(logPath);
+      
org.apache.log4j.Logger.getRootLogger().removeAppender(TEZ_CONTAINER_LOGGER_NAME);
+    }
+  }
+
+  private static void validateThreadDumpCaptured(Path jstackPath) throws 
IOException {
+    RemoteIterator<LocatedFileStatus> files = localFs.listFiles(jstackPath, 
true);
+    boolean appMasterDumpFound = false;
+    boolean tezChildDumpFound = false;
+    while (files.hasNext()) {
+      LocatedFileStatus file = files.next();
+      if (file.getPath().getName().endsWith(".jstack")) {
+        if (file.getPath().getName().contains("attempt")) {
+          tezChildDumpFound = true;
+        } else {
+          appMasterDumpFound = true;
+        }
+      }
+    }
+    assertTrue(tezChildDumpFound);
+    assertTrue(appMasterDumpFound);
   }
 
   /**

Reply via email to