AHeise commented on code in PR #19228:
URL: https://github.com/apache/flink/pull/19228#discussion_r845810559


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -54,6 +61,27 @@ private ThreadInfoSample(Thread.State threadState, StackTraceElement[] stackTrac
         }
     }
 
+    /**
+     * Constructs a collection of {@link ThreadInfoSample}s from a {@link ThreadInfo} array.
+     *
+     * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
+     * @return a Collection of the corresponding {@link ThreadInfoSample}s
+     */
+    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {
+        Collection<ThreadInfoSample> result = new ArrayList<>();
+        for (ThreadInfo threadInfo : threadInfos) {
+            if (threadInfo == null) {
+                LOG.warn("Missing thread info.");

Review Comment:
   I'd handle this case on the caller side, where you can report more information.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -54,6 +61,27 @@ private ThreadInfoSample(Thread.State threadState, StackTraceElement[] stackTrac
         }
     }
 
+    /**
+     * Constructs a collection of {@link ThreadInfoSample}s from a {@link ThreadInfo} array.
+     *
+     * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
+     * @return a Collection of the corresponding {@link ThreadInfoSample}s
+     */
+    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {

Review Comment:
   Question of taste: this could be rewritten with Java streams. (For simple
transformations, I prefer them because they express the intent better.)
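
   A minimal sketch of what a stream-based variant might look like. `Sample` and
   `StreamFromSketch` are hypothetical stand-ins, since the `ThreadInfoSample`
   constructor is private and only partly visible in this diff; only `ThreadInfo`
   and `ThreadMXBean` are real JDK APIs. Null entries are simply skipped here, on
   the assumption that the caller reports missing threads.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class StreamFromSketch {

    /** Hypothetical stand-in for ThreadInfoSample; not Flink's actual class. */
    static final class Sample {
        final Thread.State state;
        final StackTraceElement[] stackTrace;

        Sample(Thread.State state, StackTraceElement[] stackTrace) {
            this.state = state;
            this.stackTrace = stackTrace;
        }
    }

    /** Converts the non-null entries of the array; null entries are skipped. */
    static List<Sample> from(ThreadInfo[] threadInfos) {
        return Arrays.stream(threadInfos)
                .filter(Objects::nonNull)
                .map(info -> new Sample(info.getThreadState(), info.getStackTrace()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long self = Thread.currentThread().getId();
        // getThreadInfo yields a null element for an id that belongs to no live thread.
        ThreadInfo[] infos =
                ManagementFactory.getThreadMXBean()
                        .getThreadInfo(new long[] {self, Long.MAX_VALUE}, 8);
        System.out.println("samples: " + from(infos).size());
    }
}
```

   The filter, map, collect chain states the transformation directly, which is
   the intent argument made above; whether it reads better than the loop remains
   a matter of taste.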



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -545,23 +545,29 @@ private void stopTaskExecutorServices() throws Exception {
 
     @Override
     public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
-            final ExecutionAttemptID taskExecutionAttemptId,
+            final Set<ExecutionAttemptID> taskExecutionAttemptIds,
             final ThreadInfoSamplesRequest requestParams,
             final Time timeout) {
 
-        final Task task = taskSlotTable.getTask(taskExecutionAttemptId);
-        if (task == null) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalStateException(
-                            String.format(
-                                    "Cannot sample task %s. "
-                                            + "Task is not known to the task manager.",
-                                    taskExecutionAttemptId)));
+        final Collection<Task> tasks = new ArrayList<>();
+        for (ExecutionAttemptID executionAttemptId : taskExecutionAttemptIds) {
+            final Task task = taskSlotTable.getTask(executionAttemptId);
+            if (task == null) {
+                log.warn(
+                        String.format(
+                                "Cannot sample task %s. "
+                                        + "Task is not known to the task manager.",
+                                executionAttemptId));
+            } else {
+                tasks.add(task);
+            }
         }
 
+        Set<SampleableTask> sampleableTasks =

Review Comment:
   Same about `Set`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java:
##########
@@ -71,34 +74,44 @@
     }
 
     private void requestThreadInfoSamples(
-            final SampleableTask task,
+            final Collection<SampleableTask> tasks,
             final int numSamples,
             final Duration delayBetweenSamples,
             final int maxStackTraceDepth,
             final List<ThreadInfoSample> currentTraces,
             final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
 
-        final long threadId = task.getExecutingThread().getId();
-        final Optional<ThreadInfoSample> threadInfoSample =
-                JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
+        final Collection<Long> threadIds =
+                tasks.stream()
+                        .map(t -> t.getExecutingThread().getId())
+                        .collect(Collectors.toList());
 
-        if (threadInfoSample.isPresent()) {
-            currentTraces.add(threadInfoSample.get());
+        final Collection<ThreadInfoSample> threadInfoSample =
+                JvmUtils.createThreadInfoSample(threadIds, maxStackTraceDepth);
+
+        if (!threadInfoSample.isEmpty()) {
+            currentTraces.addAll(threadInfoSample);
         } else if (!currentTraces.isEmpty()) {
+            // Requested tasks are not running anymore, completing with whatever was collected by
+            // now.
             resultFuture.complete(currentTraces);
         } else {
+            final String ids =
+                    tasks.stream()
+                            .map(SampleableTask::getExecutionId)
+                            .map(e -> e == null ? "unknown" : e.toString())

Review Comment:
   When is `e == null`? Can we filter those out earlier?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java:
##########
@@ -71,34 +74,44 @@
     }
 
     private void requestThreadInfoSamples(
-            final SampleableTask task,
+            final Collection<SampleableTask> tasks,
             final int numSamples,
             final Duration delayBetweenSamples,
             final int maxStackTraceDepth,
             final List<ThreadInfoSample> currentTraces,
             final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
 
-        final long threadId = task.getExecutingThread().getId();
-        final Optional<ThreadInfoSample> threadInfoSample =
-                JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
+        final Collection<Long> threadIds =
+                tasks.stream()
+                        .map(t -> t.getExecutingThread().getId())
+                        .collect(Collectors.toList());
 
-        if (threadInfoSample.isPresent()) {
-            currentTraces.add(threadInfoSample.get());
+        final Collection<ThreadInfoSample> threadInfoSample =
+                JvmUtils.createThreadInfoSample(threadIds, maxStackTraceDepth);
+
+        if (!threadInfoSample.isEmpty()) {
+            currentTraces.addAll(threadInfoSample);
         } else if (!currentTraces.isEmpty()) {
+            // Requested tasks are not running anymore, completing with whatever was collected by
+            // now.
             resultFuture.complete(currentTraces);

Review Comment:
   Shouldn't this also short-circuit with `return` so we don't re-enqueue more
samples?
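
   A rough sketch of the control flow in question, assuming that the code after
   this branch schedules the next sampling round (that part is not visible in
   the hunk). `EarlyReturnSketch`, the `Supplier`-based sampler, and the use of
   plain recursion instead of a delayed executor are all hypothetical
   simplifications.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class EarlyReturnSketch {

    /**
     * Takes up to numSamples rounds of samples. Direct recursion stands in for
     * scheduling the next round with a delay (an assumption of this sketch).
     */
    static void sample(
            Supplier<List<String>> sampler,
            int numSamples,
            List<String> currentTraces,
            CompletableFuture<List<String>> resultFuture) {

        List<String> samples = sampler.get();
        if (!samples.isEmpty()) {
            currentTraces.addAll(samples);
        } else if (!currentTraces.isEmpty()) {
            // Tasks are gone: complete with what we have and stop here,
            // otherwise the code below would enqueue another round.
            resultFuture.complete(currentTraces);
            return;
        } else {
            resultFuture.completeExceptionally(
                    new IllegalStateException("No samples could be collected."));
            return;
        }

        if (numSamples > 1) {
            sample(sampler, numSamples - 1, currentTraces, resultFuture);
        } else {
            resultFuture.complete(currentTraces);
        }
    }
}
```

   Without the `return` in the middle branch, the future would already be
   complete while further rounds keep running against tasks that no longer
   exist.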



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -545,23 +545,29 @@ private void stopTaskExecutorServices() throws Exception {
 
     @Override
     public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
-            final ExecutionAttemptID taskExecutionAttemptId,
+            final Set<ExecutionAttemptID> taskExecutionAttemptIds,

Review Comment:
   `Set` always implies some overhead to deduplicate. The overhead is always 
worth it for user-facing things, but we could go with a simple `Collection` 
here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java:
##########
@@ -44,7 +45,7 @@
     private final long endTime;
 
     /** Map of thread info samples by execution ID. */
-    private final Map<ExecutionAttemptID, List<ThreadInfoSample>> samplesBySubtask;
+    private final Map<Set<ExecutionAttemptID>, List<ThreadInfoSample>> samplesBySubtask;

Review Comment:
   Does it make sense to retain the original key type? E.g., you'd flatten the
set with `putAll` into multiple entries. A `Collection` as a key is prone to
surprises.
   You can then probably retain the original `matchExecutionsWithGateways`
implementation (not sure though).
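
   To illustrate both points, here is a small sketch under stated assumptions:
   `FlattenKeysSketch` and its methods are hypothetical, and plain strings stand
   in for `ExecutionAttemptID`. The first method shows one way a set-keyed map
   could be flattened into per-ID entries; the second shows the kind of surprise
   a mutable collection key can cause in a `HashMap`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FlattenKeysSketch {

    /** Expands each set-valued key into one entry per element, sharing the value list. */
    static <K, V> Map<K, List<V>> flatten(Map<Set<K>, List<V>> grouped) {
        Map<K, List<V>> flat = new HashMap<>();
        for (Map.Entry<Set<K>, List<V>> entry : grouped.entrySet()) {
            for (K key : entry.getKey()) {
                flat.put(key, entry.getValue());
            }
        }
        return flat;
    }

    /** Returns whether a lookup still succeeds after the key set was mutated. */
    static boolean lookupSurvivesMutation() {
        Set<String> key = new HashSet<>(Arrays.asList("a1"));
        Map<Set<String>, String> bySet = new HashMap<>();
        bySet.put(key, "samples");
        key.add("a2"); // changes key.hashCode(), so the stored entry is no longer found
        return bySet.containsKey(key);
    }
}
```

   With single IDs as keys, lookups do not depend on how a set was built or
   whether it was modified afterwards.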



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.