This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
     new a9c5365a5 TEZ-4344: Collect jstack periodically from all containers. (#299) (Ayush Saxena reviewed by Laszlo Bodor)
a9c5365a5 is described below

commit a9c5365a5b03b3f343657d69eef8af915d6ea4f0
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Tue Jul 18 11:39:29 2023 +0530

    TEZ-4344: Collect jstack periodically from all containers. (#299) (Ayush Saxena reviewed by Laszlo Bodor)
---
 .../org/apache/tez/dag/api/TezConfiguration.java   |   8 +
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |   8 +
 .../apache/tez/runtime/TezThreadDumpHelper.java    | 241 +++++++++++++++++++++
 .../java/org/apache/tez/runtime/task/TezChild.java |  11 +-
 .../test/java/org/apache/tez/test/TestTezJobs.java |  48 ++++-
 5 files changed, 313 insertions(+), 3 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 5842067ff..9e2e2d89c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2296,4 +2296,12 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty
   public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";
 
+  /**
+   * Interval at which thread dumps are captured (supports time units, e.g. 30s). A non-positive value disables it.
+   */
+  @ConfigurationScope(Scope.DAG)
+  @ConfigurationProperty
+  public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
+  public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
+
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3c99b1afd..16d0a4dbd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -185,6 +185,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
+import org.apache.tez.runtime.TezThreadDumpHelper;
 import org.apache.tez.util.LoggingUtils;
 import org.apache.tez.util.TezMxBeanResourceCalculator;
 import org.codehaus.jettison.json.JSONException;
@@ -340,6 +341,7 @@ public class DAGAppMaster extends AbstractService {
   Map<Service, ServiceWithDependency> services =
       new LinkedHashMap<Service, ServiceWithDependency>();
   private ThreadLocalMap mdcContext;
+  private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -766,6 +768,7 @@ public class DAGAppMaster extends AbstractService {
           "DAGAppMaster Internal Error occurred");
       break;
     case DAG_FINISHED:
+      tezThreadDumpHelper.stop();
       DAGAppMasterEventDAGFinished finishEvt =
           (DAGAppMasterEventDAGFinished) event;
       String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
@@ -2201,6 +2204,9 @@ public class DAGAppMaster extends AbstractService {
       execService.shutdownNow();
     }
 
+    // Stop thread dump capture in case it is still running; stop() does nothing otherwise.
+    tezThreadDumpHelper.stop();
+
     super.serviceStop();
   }
 
@@ -2577,6 +2583,8 @@ public class DAGAppMaster extends AbstractService {
   private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
       throws TezException {
     currentDAG = dag;
+    tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
+
     // Try localizing the actual resources.
     List<URL> additionalUrlsForClasspath;
     try {
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
new file mode 100644
index 000000000..6f3e9fec1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Appender;
+import org.apache.tez.common.TezContainerLogAppender;
+import org.apache.tez.dag.api.TezConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT;
+
+/**
+ * Periodically writes a dump of every live JVM thread to a new file while started.
+ * <p>
+ * Capture is enabled by setting {@code tez.thread.dump.interval} to a positive duration; otherwise
+ * {@link #getInstance(Configuration)} returns a no-op helper. Dumps are written to the container log
+ * directory when the root logger has a {@link TezContainerLogAppender} registered under
+ * {@link TezConstants#TEZ_CONTAINER_LOGGER_NAME}, and otherwise to the NodeManager remote application
+ * log directory.
+ */
+public class TezThreadDumpHelper {
+
+  public static final TezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper();
+  private long duration = 0L;
+  private Path basePath = null;
+  private FileSystem fs = null;
+
+  private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean();
+  private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class);
+
+  private ScheduledExecutorService periodicThreadDumpServiceExecutor;
+
+  private TezThreadDumpHelper(long duration, Configuration conf) throws IOException {
+    this.duration = duration;
+    Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME);
+    if (appender instanceof TezContainerLogAppender) {
+      this.basePath = new Path(((TezContainerLogAppender) appender).getContainerLogDir());
+      this.fs = FileSystem.getLocal(conf);
+    } else {
+      // Fallback, if it is any other appender or if none is configured.
+      this.basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR));
+      this.fs = this.basePath.getFileSystem(conf);
+    }
+    LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " +
+        "path: {}", duration, basePath);
+  }
+
+  /**
+   * Used by the no-op subclass; an instance created this way has no dump location configured.
+   */
+  public TezThreadDumpHelper() {
+  }
+
+  /**
+   * Creates a helper configured from {@code conf}.
+   *
+   * @param conf configuration read for {@code tez.thread.dump.interval}
+   * @return a capturing helper when the interval is positive and the output location could be
+   *     initialized; otherwise {@link #NOOP_TEZ_THREAD_DUMP_HELPER}
+   */
+  public static TezThreadDumpHelper getInstance(Configuration conf) {
+    long periodicThreadDumpFrequency =
+        conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+
+    if (periodicThreadDumpFrequency > 0) {
+      try {
+        return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
+      } catch (IOException e) {
+        LOG.warn("Can not initialize periodic thread dump service", e);
+      }
+    }
+    return NOOP_TEZ_THREAD_DUMP_HELPER;
+  }
+
+  /**
+   * Starts capturing a thread dump once per configured interval; the first dump is taken after
+   * one interval has elapsed.
+   *
+   * @param name used in the capture thread's name and as the prefix of every dump file name
+   * @return this helper, so the call can be chained after {@link #getInstance(Configuration)}
+   */
+  public TezThreadDumpHelper start(String name) {
+    periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PeriodicThreadDumpService{" + name + "} #%d")
+            .build());
+    Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs);
+    // Capture repeatedly at the configured interval. A fixed delay (rather than a fixed rate) keeps a
+    // slow dump from being followed by back-to-back catch-up dumps.
+    periodicThreadDumpServiceExecutor.scheduleWithFixedDelay(threadDumpCollector, duration, duration,
+        TimeUnit.MILLISECONDS);
+    return this;
+  }
+
+  /**
+   * Stops capturing thread dumps. Gives a dump that is in progress up to 100 ms to finish, then
+   * forces the capture thread to stop. Does nothing if the helper is not running.
+   */
+  public void stop() {
+    if (periodicThreadDumpServiceExecutor != null) {
+      periodicThreadDumpServiceExecutor.shutdown();
+
+      try {
+        if (!periodicThreadDumpServiceExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
+          periodicThreadDumpServiceExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        // Restore the caller's interrupt status; the executor is force-stopped below either way.
+        Thread.currentThread().interrupt();
+      }
+      periodicThreadDumpServiceExecutor.shutdownNow();
+      periodicThreadDumpServiceExecutor = null;
+    }
+  }
+
+  /** Writes one dump of all live threads of this JVM to a new timestamped file under a base path. */
+  private static class ThreadDumpCollector implements Runnable {
+
+    private final Path path;
+    private final String name;
+    private final FileSystem fs;
+
+    ThreadDumpCollector(Path path, String name, FileSystem fs) {
+      this.path = path;
+      this.fs = fs;
+      this.name = name;
+    }
+
+    @Override
+    public void run() {
+      // isInterrupted() (unlike Thread.interrupted()) leaves the interrupt status intact.
+      if (!Thread.currentThread().isInterrupted()) {
+        try (FSDataOutputStream fsStream = fs.create(
+            new Path(path, name + "_" + System.currentTimeMillis() + ".jstack"));
+            PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) {
+          printThreadInfo(printStream, name);
+        } catch (IOException e) {
+          // Log rather than throw: an exception escaping run() cancels every later scheduled dump.
+          LOG.warn("Failed to capture thread dump under {}", path, e);
+        }
+      }
+    }
+
+    /**
+     * Prints the state, lock information and stack trace of every live thread of this JVM.
+     *
+     * @param stream destination of the dump; flushed but not closed
+     * @param title written on the header line
+     */
+    public synchronized void printThreadInfo(PrintStream stream, String title) {
+      boolean contention = THREAD_BEAN.isThreadContentionMonitoringEnabled();
+      long[] threadIds = THREAD_BEAN.getAllThreadIds();
+      stream.println("Process Thread Dump: " + title);
+      stream.println(threadIds.length + " active threads");
+      for (long tid : threadIds) {
+        ThreadInfo info = THREAD_BEAN.getThreadInfo(tid, Integer.MAX_VALUE);
+        if (info == null) {
+          // getThreadInfo returns null for a thread that has exited since the ids were listed.
+          stream.println("  Inactive");
+          continue;
+        }
+        stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
+        Thread.State state = info.getThreadState();
+        stream.println("  State: " + state);
+        stream.println("  Blocked count: " + info.getBlockedCount());
+        stream.println("  Waited count: " + info.getWaitedCount());
+        if (contention) {
+          stream.println("  Blocked time: " + info.getBlockedTime());
+          stream.println("  Waited time: " + info.getWaitedTime());
+        }
+        if (state == Thread.State.WAITING) {
+          stream.println("  Waiting on " + info.getLockName());
+        } else if (state == Thread.State.BLOCKED) {
+          stream.println("  Blocked on " + info.getLockName());
+          stream.println("  Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
+        }
+        stream.println("  Stack:");
+        for (StackTraceElement frame : info.getStackTrace()) {
+          stream.println("    " + frame.toString());
+        }
+      }
+      stream.flush();
+    }
+
+    /** Formats a thread as {@code "id (name)"}, or just {@code "id"} when the name is null. */
+    private String getTaskName(long id, String taskName) {
+      if (taskName == null) {
+        return Long.toString(id);
+      }
+      return id + " (" + taskName + ")";
+    }
+  }
+
+  /** Returned when thread dumps are disabled or could not be set up; start and stop do nothing. */
+  private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {
+
+    @Override
+    public TezThreadDumpHelper start(String name) {
+      // Do Nothing
+      return this;
+    }
+
+    @Override
+    public void stop() {
+      // Do Nothing
+    }
+  }
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7ab532ad3..3145f21a5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
+import org.apache.tez.runtime.TezThreadDumpHelper;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
@@ -119,6 +120,7 @@ public class TezChild {
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   private final String user;
   private final boolean updateSysCounters;
+  private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
   private final boolean ownUmbilical;
@@ -178,7 +180,7 @@ public class TezChild {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing with tokens:");
       for (Token<?> token : credentials.getAllTokens()) {
-        LOG.debug("",token);
+        LOG.debug("{}", token);
       }
     }
 
@@ -248,13 +250,15 @@ public class TezChild {
       }
 
       TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID();
+      Configuration taskConf;
       if (containerTask.getTaskSpec().getTaskConf() != null) {
         Configuration copy = new Configuration(defaultConf);
         TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), copy);
-
+        taskConf = copy;
         LoggingUtils.initLoggingContext(mdcContext, copy,
             attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString());
       } else {
+        taskConf = defaultConf;
         LoggingUtils.initLoggingContext(mdcContext, defaultConf,
             attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString());
       }
@@ -292,6 +296,7 @@ public class TezChild {
             hadoopShim, sharedExecutor);
 
         boolean shouldDie;
+        tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
         try {
           TaskRunner2Result result = taskRunner.run();
           LOG.info("TaskRunner2Result: {}", result);
@@ -310,6 +315,7 @@ public class TezChild {
               e, "TaskExecutionFailure: " + e.getMessage());
           }
         } finally {
+          tezThreadDumpHelper.stop();
           FileSystem.closeAllForUGI(childUGI);
         }
       }
@@ -425,6 +431,7 @@ public class TezChild {
         RPC.stopProxy(umbilical);
       }
     }
+
     TezRuntimeShutdownHandler.shutdown();
     LOG.info("TezChild shutdown finished");
   }
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index fd8f08b42..892629f29 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.test;
 
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
+import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -40,6 +42,8 @@ import java.util.Set;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.tez.common.Preconditions;
 
 import com.google.common.collect.Lists;
@@ -47,6 +51,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.tez.common.TezContainerLogAppender;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -533,8 +538,26 @@ public class TestTezJobs {
 
   @Test(timeout = 60000)
   public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
+    testSortMergeJoinExampleDisableSplitGrouping(false);
+  }
+
+  @Test(timeout = 60000)
+  public void testSortMergeJoinExampleWithThreadDump() throws Exception {
+    testSortMergeJoinExampleDisableSplitGrouping(true);
+  }
+
+  public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDump) throws Exception {
     SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
-    sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
+    Configuration newConf = new Configuration(mrrTezCluster.getConfig());
+    Path logPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/logPath");
+    if (withThreadDump) {
+      TezContainerLogAppender appender = new TezContainerLogAppender();
+      org.apache.log4j.Logger.getRootLogger().addAppender(appender);
+      appender.setName(TEZ_CONTAINER_LOGGER_NAME);
+      appender.setContainerLogDir(logPath.toString());
+      newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms");
+    }
+    sortMergeJoinExample.setConf(newConf);
     Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
     Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1");
     Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2");
@@ -587,6 +610,29 @@ public class TestTezJobs {
     reader.close();
     inStream.close();
     assertEquals(0, expectedResult.size());
+
+    if (withThreadDump) {
+      validateThreadDumpCaptured(logPath);
+      org.apache.log4j.Logger.getRootLogger().removeAppender(TEZ_CONTAINER_LOGGER_NAME);
+    }
+  }
+
+  private static void validateThreadDumpCaptured(Path jstackPath) throws IOException {
+    RemoteIterator<LocatedFileStatus> files = localFs.listFiles(jstackPath, true);
+    boolean appMasterDumpFound = false;
+    boolean tezChildDumpFound = false;
+    while (files.hasNext()) {
+      LocatedFileStatus file = files.next();
+      if (file.getPath().getName().endsWith(".jstack")) {
+        if (file.getPath().getName().contains("attempt")) {
+          tezChildDumpFound = true;
+        } else {
+          appMasterDumpFound = true;
+        }
+      }
+    }
+    assertTrue(tezChildDumpFound);
+    assertTrue(appMasterDumpFound);
   }
 
   /**