This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e48549 [FLINK-17300] Log the lineage information between
ExecutionAttemptID and AllocationID
7e48549 is described below
commit 7e48549b822bc54f8dc0a5e1f9cbb5f3156fda06
Author: Yangze Guo <[email protected]>
AuthorDate: Wed Apr 22 14:14:29 2020 +0800
[FLINK-17300] Log the lineage information between ExecutionAttemptID and
AllocationID
This closes #11852.
---
.../java/org/apache/flink/runtime/executiongraph/Execution.java | 6 ++----
.../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 3 ++-
.../org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java | 2 +-
3 files changed, 5 insertions(+), 6 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index f6702f8..56415e0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -728,10 +728,8 @@ public class Execution implements AccessExecution,
Archiveable<ArchivedExecution
return;
}
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Deploying %s (attempt
#%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
- attemptNumber,
getAssignedResourceLocation()));
- }
+ LOG.info("Deploying {} (attempt #{}) with attempt id {}
to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(),
+ attemptNumber,
vertex.getCurrentExecutionAttempt().getAttemptId(),
getAssignedResourceLocation(), slot.getAllocationId());
final TaskDeploymentDescriptor deployment =
TaskDeploymentDescriptorFactory
.fromExecutionVertex(vertex, attemptNumber)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f8438f5..bd2834a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -643,7 +643,8 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED,
task::isBackPressured);
- log.info("Received task {}.",
task.getTaskInfo().getTaskNameWithSubtasks());
+ log.info("Received task {} ({}), deploy into slot with
allocation id {}.",
+ task.getTaskInfo().getTaskNameWithSubtasks(),
tdd.getExecutionAttemptId(), tdd.getAllocationId());
boolean taskAdded;
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index af0f532..aca45b1 100644
---
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -573,7 +573,7 @@ public class YARNSessionCapacitySchedulerITCase extends
YarnTestBase {
String expected = "Starting TaskManagers";
Assert.assertTrue("Expected string '" + expected + "'
not found in JobManager log: '" + jobmanagerLog + "'",
content.contains(expected));
- expected = " (2/2) (attempt #0) to ";
+ expected = " (2/2) (attempt #0) with attempt id ";
Assert.assertTrue("Expected string '" + expected + "'
not found in JobManager log." +
"This string checks that the job has
been started with a parallelism of 2. Log contents: '" + jobmanagerLog + "'",
content.contains(expected));