This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a4eb966fb [FLINK-34417] Log Job ID via MDC
d6a4eb966fb is described below

commit d6a4eb966fbc47277e07b79e7c64939a62eb1d54
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Sat Feb 3 13:17:36 2024 +0100

    [FLINK-34417] Log Job ID via MDC
---
 .../content.zh/docs/deployment/advanced/logging.md |  15 ++
 docs/content/docs/deployment/advanced/logging.md   |  14 ++
 .../org/apache/flink/util/MdcAwareExecutor.java    |  39 ++++
 .../apache/flink/util/MdcAwareExecutorService.java | 114 +++++++++++
 .../util/MdcAwareScheduledExecutorService.java     |  61 ++++++
 .../main/java/org/apache/flink/util/MdcUtils.java  | 112 +++++++++++
 .../java/org/apache/flink/util/MdcUtilsTest.java   | 148 ++++++++++++++
 .../runtime/rpc/pekko/FencedPekkoRpcActor.java     |   7 +-
 .../flink/runtime/rpc/pekko/PekkoRpcActor.java     |  75 +++----
 .../flink/runtime/rpc/pekko/PekkoRpcService.java   |  11 +-
 .../flink/runtime/rpc/FencedRpcEndpoint.java       |  14 +-
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  |  33 +++-
 .../org/apache/flink/runtime/rpc/RpcService.java   |   7 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  47 ++---
 .../ChannelStateWriteRequestExecutorFactory.java   |   3 +-
 .../ChannelStateWriteRequestExecutorImpl.java      |  60 +++---
 .../flink/runtime/dispatcher/Dispatcher.java       |  41 ++--
 .../JobMasterServiceLeadershipRunnerFactory.java   |   4 +-
 .../executiongraph/DefaultExecutionGraph.java      |  10 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  21 +-
 .../runtime/resourcemanager/ResourceManager.java   |  48 +++--
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 183 +++++++++--------
 .../org/apache/flink/runtime/taskmanager/Task.java |  28 ++-
 .../ChannelStateWriteRequestExecutorImplTest.java  |  29 +--
 .../flink/runtime/rpc/TestingRpcService.java       |   7 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  24 ++-
 .../testutils/logging/LoggerAuditingExtension.java |  21 +-
 flink-tests/pom.xml                                |   2 +-
 .../OperatorEventSendingCheckpointITCase.java      |   6 +-
 .../apache/flink/test/misc/JobIDLoggingITCase.java | 220 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |   2 +-
 31 files changed, 1148 insertions(+), 258 deletions(-)

diff --git a/docs/content.zh/docs/deployment/advanced/logging.md b/docs/content.zh/docs/deployment/advanced/logging.md
index abb4b1025f0..432336946de 100644
--- a/docs/content.zh/docs/deployment/advanced/logging.md
+++ b/docs/content.zh/docs/deployment/advanced/logging.md
@@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实
 
 <a name="configuring-log4j-2"></a>
 
+### Structured logging
+
+Flink adds the following fields to the [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
+- Job ID
+    - key: `flink-job-id`
+    - format: string
+    - length: 32
+
+This is most useful in environments with structured logging, as it allows you to quickly filter the relevant logs.
+
+The MDC is propagated by SLF4J to the logging backend, which usually adds it to the log records automatically (e.g. in the [Log4j JSON layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
+Alternatively, it can be configured explicitly; a [Log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this:
+
+`[%-32X{flink-job-id}] %c{0} %m%n`
+
 ## 配置 Log4j 2
 
 Log4j 2 是通过 property 配置文件进行配置的。
diff --git a/docs/content/docs/deployment/advanced/logging.md b/docs/content/docs/deployment/advanced/logging.md
index 6c01e1ddff1..cc2d0201e17 100644
--- a/docs/content/docs/deployment/advanced/logging.md
+++ b/docs/content/docs/deployment/advanced/logging.md
@@ -38,6 +38,20 @@ This allows you to use any logging framework that supports SLF4J, without having
 By default, [Log4j 2](https://logging.apache.org/log4j/2.x/index.html) is used as the underlying logging framework.
 
 
+### Structured logging
+
+Flink adds the following fields to the [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
+- Job ID
+  - key: `flink-job-id`
+  - format: string
+  - length: 32
+
+This is most useful in environments with structured logging, as it allows you to quickly filter the relevant logs.
+
+The MDC is propagated by SLF4J to the logging backend, which usually adds it to the log records automatically (e.g. in the [Log4j JSON layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
+Alternatively, it can be configured explicitly; a [Log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this:
+
+`[%-32X{flink-job-id}] %c{0} %m%n`
 
 ## Configuring Log4j 2
 
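For illustration only (not part of this patch): a minimal sketch of how the `flink-job-id` MDC field surfaces in a log record under the pattern layout above. It assumes an SLF4J backend is configured; the class name `MdcLayoutDemo` and the all-zero job ID are made up for the example.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcLayoutDemo {
    private static final Logger LOG = LoggerFactory.getLogger(MdcLayoutDemo.class);

    public static void main(String[] args) {
        // Flink sets this key itself at runtime; it is set manually here only to demo the layout.
        MDC.put("flink-job-id", "00000000000000000000000000000000"); // 32-character hex string
        try {
            // With `[%-32X{flink-job-id}] %c{0} %m%n` this renders roughly as:
            // [00000000000000000000000000000000] MdcLayoutDemo starting job
            LOG.info("starting job");
        } finally {
            MDC.remove("flink-job-id");
        }
    }
}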
diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
new file mode 100644
index 00000000000..a6f9c662dab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class MdcAwareExecutor<T extends Executor> implements Executor {
+    protected final Map<String, String> contextData;
+    protected final T delegate;
+
+    protected MdcAwareExecutor(T delegate, Map<String, String> contextData) {
+        this.delegate = checkNotNull(delegate);
+        this.contextData = Collections.unmodifiableMap(checkNotNull(contextData));
+    }
+
+    public void execute(Runnable command) {
+        delegate.execute(MdcUtils.wrapRunnable(contextData, command));
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
new file mode 100644
index 00000000000..693a247481b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+
+class MdcAwareExecutorService<S extends ExecutorService> extends MdcAwareExecutor<S>
+        implements ExecutorService {
+
+    public MdcAwareExecutorService(S delegate, Map<String, String> contextData) {
+        super(delegate, contextData);
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return delegate.submit(wrapCallable(contextData, task));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return delegate.submit(wrapRunnable(contextData, task), result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return delegate.submit(wrapRunnable(contextData, task));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        return delegate.invokeAll(wrapCallables(tasks));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return delegate.invokeAll(wrapCallables(tasks), timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        return delegate.invokeAny(wrapCallables(tasks));
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate.invokeAny(wrapCallables(tasks), timeout, unit);
+    }
+
+    private <T> List<Callable<T>> wrapCallables(Collection<? extends Callable<T>> tasks) {
+        List<Callable<T>> list = new ArrayList<>(tasks.size());
+        for (Callable<T> task : tasks) {
+            list.add(wrapCallable(contextData, task));
+        }
+        return list;
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java b/flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
new file mode 100644
index 00000000000..1fa71dd659f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+
+class MdcAwareScheduledExecutorService extends MdcAwareExecutorService<ScheduledExecutorService>
+        implements ScheduledExecutorService {
+
+    public MdcAwareScheduledExecutorService(
+            ScheduledExecutorService delegate, Map<String, String> contextData) {
+        super(delegate, contextData);
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return delegate.schedule(wrapRunnable(contextData, command), delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        return delegate.schedule(wrapCallable(contextData, callable), delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return delegate.scheduleAtFixedRate(
+                wrapRunnable(contextData, command), initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return delegate.scheduleWithFixedDelay(
+                wrapRunnable(contextData, command), initialDelay, delay, unit);
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java b/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
new file mode 100644
index 00000000000..c448c9837db
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.JobID;
+
+import org.slf4j.MDC;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */
+public class MdcUtils {
+
+    public static final String JOB_ID = "flink-job-id";
+
+    /**
+     * Replace MDC contents with the provided one and return a closeable object that can be used to
+     * restore the original MDC.
+     *
+     * @param context to put into MDC
+     */
+    public static MdcCloseable withContext(Map<String, String> context) {
+        final Map<String, String> orig = MDC.getCopyOfContextMap();
+        MDC.setContextMap(context);
+        return () -> MDC.setContextMap(orig);
+    }
+
+    /** {@link AutoCloseable} that restores the {@link MDC} contents on close. */
+    public interface MdcCloseable extends AutoCloseable {
+        @Override
+        void close();
+    }
+
+    /**
+     * Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its
+     * execution and removed afterward.
+     */
+    public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) {
+        return () -> {
+            try (MdcCloseable ctx = withContext(contextData)) {
+                command.run();
+            }
+        };
+    }
+
+    /**
+     * Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its
+     * execution and removed afterward.
+     */
+    public static <T> Callable<T> wrapCallable(
+            Map<String, String> contextData, Callable<T> command) {
+        return () -> {
+            try (MdcCloseable ctx = withContext(contextData)) {
+                return command.call();
+            }
+        };
+    }
+
+    /**
+     * Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes
+     * any submitted commands and removed afterward.
+     */
+    public static Executor scopeToJob(JobID jobID, Executor executor) {
+        checkArgument(!(executor instanceof MdcAwareExecutor));
+        return new MdcAwareExecutor<>(executor, asContextData(jobID));
+    }
+
+    /**
+     * Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it
+     * executes any submitted commands and removed afterward.
+     */
+    public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) {
+        checkArgument(!(delegate instanceof MdcAwareExecutorService));
+        return new MdcAwareExecutorService<>(delegate, asContextData(jobID));
+    }
+
+    /**
+     * Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added
+     * before it executes any submitted commands and removed afterward.
+     */
+    public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) {
+        checkArgument(!(ses instanceof MdcAwareScheduledExecutorService));
+        return new MdcAwareScheduledExecutorService(ses, asContextData(jobID));
+    }
+
+    public static Map<String, String> asContextData(JobID jobID) {
+        return Collections.singletonMap(JOB_ID, jobID.toHexString());
+    }
+}
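A minimal usage sketch of the two entry points above (editor's illustration, not part of the commit; assumes flink-core on the classpath, and the wrapper class MdcUtilsDemo is hypothetical):

import org.apache.flink.api.common.JobID;
import org.apache.flink.util.MdcUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MdcUtilsDemo {
    public static void main(String[] args) throws Exception {
        JobID jobID = new JobID();

        // Scope a single block of code to the job: flink-job-id is set on entry, restored on close.
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
            // any log statement here carries flink-job-id in its MDC
        }

        // Scope a whole executor: every task submitted through the wrapper runs with the job ID set.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ExecutorService scoped = MdcUtils.scopeToJob(jobID, pool);
        scoped.submit(() -> { /* logs here carry flink-job-id too */ }).get();
        pool.shutdown();
    }
}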
diff --git a/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java
new file mode 100644
index 00000000000..35917d28c72
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.assertj.core.api.AbstractObjectAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.MdcUtils.asContextData;
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.slf4j.event.Level.DEBUG;
+
+/** Tests for the {@link MdcUtils}. */
+class MdcUtilsTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MdcUtilsTest.class);
+    private static final Runnable LOGGING_RUNNABLE = () -> LOGGER.info("ignore");
+
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new LoggerAuditingExtension(MdcUtilsTest.class, DEBUG);
+
+    @Test
+    public void testJobIDAsContext() {
+        JobID jobID = new JobID();
+        assertThat(MdcUtils.asContextData(jobID))
+                .isEqualTo(Collections.singletonMap("flink-job-id", jobID.toHexString()));
+    }
+
+    @Test
+    public void testMdcCloseableAddsJobId() throws Exception {
+        assertJobIDLogged(
+                jobID -> {
+                    try (MdcCloseable ignored = MdcUtils.withContext(asContextData(jobID))) {
+                        LOGGER.warn("ignore");
+                    }
+                });
+    }
+
+    @Test
+    public void testMdcCloseableRemovesJobId() {
+        JobID jobID = new JobID();
+        try (MdcCloseable ignored = MdcUtils.withContext(asContextData(jobID))) {
+            // ...
+        }
+        LOGGER.warn("with-job");
+        assertJobIdLogged(null);
+    }
+
+    @Test
+    public void testWrapRunnable() throws Exception {
+        assertJobIDLogged(jobID -> wrapRunnable(asContextData(jobID), LOGGING_RUNNABLE).run());
+    }
+
+    @Test
+    public void testWrapCallable() throws Exception {
+        assertJobIDLogged(
+                jobID ->
+                        wrapCallable(
+                                        asContextData(jobID),
+                                        () -> {
+                                            LOGGER.info("ignore");
+                                            return null;
+                                        })
+                                .call());
+    }
+
+    @Test
+    public void testScopeExecutor() throws Exception {
+        assertJobIDLogged(
+                jobID ->
+                        MdcUtils.scopeToJob(jobID, Executors.directExecutor())
+                                .execute(LOGGING_RUNNABLE));
+    }
+
+    @Test
+    public void testScopeExecutorService() throws Exception {
+        assertJobIDLogged(
+                jobID ->
+                        MdcUtils.scopeToJob(jobID, Executors.newDirectExecutorService())
+                                .submit(LOGGING_RUNNABLE)
+                                .get());
+    }
+
+    @Test
+    public void testScopeScheduledExecutorService() throws Exception {
+        ScheduledExecutorService ses =
+                java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
+        try {
+            assertJobIDLogged(
+                    jobID ->
+                            MdcUtils.scopeToJob(jobID, ses)
+                                    .schedule(LOGGING_RUNNABLE, 1L, TimeUnit.MILLISECONDS)
+                                    .get());
+        } finally {
+            ses.shutdownNow();
+        }
+    }
+
+    private void assertJobIDLogged(ThrowingConsumer<JobID, Exception> action) throws Exception {
+        JobID jobID = new JobID();
+        action.accept(jobID);
+        assertJobIdLogged(jobID);
+    }
+
+    private void assertJobIdLogged(JobID jobId) {
+        AbstractObjectAssert<?, Object> extracting =
+                assertThat(loggerExtension.getEvents())
+                        .singleElement()
+                        .extracting(LogEvent::getContextData)
+                        .extracting(m -> m.getValue("flink-job-id"));
+        if (jobId == null) {
+            extracting.isNull();
+        } else {
+            extracting.isEqualTo(jobId.toHexString());
+        }
+    }
+}
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
index 860a263f539..b2ebf0a7320 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
 import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
@@ -47,14 +48,16 @@ public class FencedPekkoRpcActor<
             int version,
             final long maximumFramesize,
             final boolean forceSerialization,
-            ClassLoader flinkClassLoader) {
+            ClassLoader flinkClassLoader,
+            final Map<String, String> loggingContext) {
         super(
                 rpcEndpoint,
                 terminationFuture,
                 version,
                 maximumFramesize,
                 forceSerialization,
-                flinkClassLoader);
+                flinkClassLoader,
+                loggingContext);
     }
 
     @Override
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
index dc4e342f35a..a9877796867 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
 import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -52,6 +53,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -102,6 +104,7 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
     private final AtomicBoolean rpcEndpointStopped;
 
     private final boolean forceSerialization;
+    private final Map<String, String> loggingContext;
 
     private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;
 
@@ -113,7 +116,9 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
             final int version,
             final long maximumFramesize,
             final boolean forceSerialization,
-            final ClassLoader flinkClassLoader) {
+            final ClassLoader flinkClassLoader,
+            final Map<String, String> loggingContext) {
+        this.loggingContext = loggingContext;
 
         checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
         this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
@@ -161,30 +166,32 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
     }
 
     private void handleMessage(final Object message) {
-        if (state.isRunning()) {
-            mainThreadValidator.enterMainThread();
+        try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext)) {
+            if (state.isRunning()) {
+                mainThreadValidator.enterMainThread();
+
+                try {
+                    handleRpcMessage(message);
+                } finally {
+                    mainThreadValidator.exitMainThread();
+                }
+            } else {
+                log.info(
+                        "The rpc endpoint {} has not been started yet. 
Discarding message {} until processing is started.",
+                        rpcEndpoint.getClass().getName(),
+                        message);
 
-            try {
-                handleRpcMessage(message);
-            } finally {
-                mainThreadValidator.exitMainThread();
+                sendErrorIfSender(
+                        new EndpointNotStartedException(
+                                String.format(
+                                        "Discard message %s, because the rpc 
endpoint %s has not been started yet.",
+                                        message, getSelf().path())));
             }
-        } else {
-            log.info(
-                    "The rpc endpoint {} has not been started yet. Discarding 
message {} until processing is started.",
-                    rpcEndpoint.getClass().getName(),
-                    message);
-
-            sendErrorIfSender(
-                    new EndpointNotStartedException(
-                            String.format(
-                                    "Discard message %s, because the rpc 
endpoint %s has not been started yet.",
-                                    message, getSelf().path())));
         }
     }
 
     private void handleControlMessage(ControlMessages controlMessage) {
-        try {
+        try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext)) {
             switch (controlMessage) {
                 case START:
                     state = state.start(this, flinkClassLoader);
@@ -237,20 +244,22 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
     }
 
     private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
-        if (!isCompatibleVersion(handshakeMessage.getVersion())) {
-            sendErrorIfSender(
-                    new HandshakeException(
-                            String.format(
-                                    "Version mismatch between source (%s) and 
target (%s) rpc component. Please verify that all components have the same 
version.",
-                                    handshakeMessage.getVersion(), 
getVersion())));
-        } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
-            sendErrorIfSender(
-                    new HandshakeException(
-                            String.format(
-                                    "The rpc endpoint does not support the 
gateway %s.",
-                                    
handshakeMessage.getRpcGateway().getSimpleName())));
-        } else {
-            getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
+        try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext)) {
+            if (!isCompatibleVersion(handshakeMessage.getVersion())) {
+                sendErrorIfSender(
+                        new HandshakeException(
+                                String.format(
+                                        "Version mismatch between source (%s) 
and target (%s) rpc component. Please verify that all components have the same 
version.",
+                                        handshakeMessage.getVersion(), 
getVersion())));
+            } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
+                sendErrorIfSender(
+                        new HandshakeException(
+                                String.format(
+                                        "The rpc endpoint does not support the 
gateway %s.",
+                                        
handshakeMessage.getRpcGateway().getSimpleName())));
+            } else {
+                getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
+            }
         }
     }
 
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
index c9d276685ed..ab8bee41822 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
@@ -260,10 +260,12 @@ public class PekkoRpcService implements RpcService {
     }
 
     @Override
-    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
+    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+            C rpcEndpoint, Map<String, String> loggingContext) {
         checkNotNull(rpcEndpoint, "rpc endpoint");
 
-        final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint);
+        final SupervisorActor.ActorRegistration actorRegistration =
+                registerRpcActor(rpcEndpoint, loggingContext);
         final ActorRef actorRef = actorRegistration.getActorRef();
         final CompletableFuture<Void> actorTerminationFuture =
                 actorRegistration.getTerminationFuture();
@@ -336,7 +338,7 @@ public class PekkoRpcService implements RpcService {
     }
 
     private <C extends RpcEndpoint & RpcGateway> SupervisorActor.ActorRegistration registerRpcActor(
-            C rpcEndpoint) {
+            C rpcEndpoint, Map<String, String> loggingContext) {
         final Class<? extends AbstractActor> rpcActorType;
 
         if (rpcEndpoint instanceof FencedRpcEndpoint) {
@@ -359,7 +361,8 @@ public class PekkoRpcService implements RpcService {
                                             getVersion(),
                                             configuration.getMaximumFramesize(),
                                             configuration.isForceRpcInvocationSerialization(),
-                                            flinkClassLoader),
+                                            flinkClassLoader,
+                                            loggingContext),
                             rpcEndpoint.getEndpointId());
 
             final SupervisorActor.ActorRegistration actorRegistration =
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
index e6ed59f737d..a6fbd2792a4 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -34,8 +36,12 @@ public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpo
 
     private final F fencingToken;
 
-    protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F fencingToken) {
-        super(rpcService, endpointId);
+    protected FencedRpcEndpoint(
+            RpcService rpcService,
+            String endpointId,
+            F fencingToken,
+            Map<String, String> loggingContext) {
+        super(rpcService, endpointId, loggingContext);
 
         Preconditions.checkNotNull(fencingToken, "The fencing token must not be null");
         Preconditions.checkNotNull(rpcServer, "The rpc server must not be null");
@@ -43,6 +49,10 @@ public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpo
         this.fencingToken = fencingToken;
     }
 
+    protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F fencingToken) {
+        this(rpcService, endpointId, fencingToken, Collections.emptyMap());
+    }
+
     protected FencedRpcEndpoint(RpcService rpcService, F fencingToken) {
         this(rpcService, UUID.randomUUID().toString(), fencingToken);
     }
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 47b64d1200e..b1fda4a0443 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -34,9 +36,12 @@ import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
@@ -134,11 +139,12 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
      * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
      * @param endpointId Unique identifier for this endpoint
      */
-    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
+    protected RpcEndpoint(
+            RpcService rpcService, String endpointId, Map<String, String> loggingContext) {
         this.rpcService = checkNotNull(rpcService, "rpcService");
         this.endpointId = checkNotNull(endpointId, "endpointId");
 
-        this.rpcServer = rpcService.startServer(this);
+        this.rpcServer = rpcService.startServer(this, loggingContext);
         this.resourceRegistry = new CloseableRegistry();
 
         this.mainThreadExecutor =
@@ -146,6 +152,16 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
         registerResource(this.mainThreadExecutor);
     }
 
+    /**
+     * Initializes the RPC endpoint.
+     *
+     * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
+     * @param endpointId Unique identifier for this endpoint
+     */
+    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
+        this(rpcService, endpointId, Collections.emptyMap());
+    }
+
     /**
      * Initializes the RPC endpoint with a random endpoint id.
      *
@@ -342,6 +358,19 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
         return mainThreadExecutor;
     }
 
+    /**
+     * Gets the main thread execution context. The main thread execution context can be used to
+     * execute tasks in the main thread of the underlying RPC endpoint.
+     *
+     * @param jobID the {@link JobID} to scope the returned {@link ComponentMainThreadExecutor} to,
+     *     i.e. the job ID is added to the MDC before and removed after each invocation through the
+     *     returned executor
+     * @return Main thread execution context
+     */
+    protected Executor getMainThreadExecutor(JobID jobID) {
+        // todo: consider caching
+        return MdcUtils.scopeToJob(jobID, getMainThreadExecutor());
+    }
+
     /**
      * Gets the endpoint's RPC service.
      *
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 9d60fd00534..788f2cb93e5 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -93,11 +94,13 @@ public interface RpcService extends AutoCloseableAsync {
     /**
     * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
      *
-     * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
      * @param <C> Type of the rpc endpoint
+     * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
+     * @param loggingContext context data to put into the MDC while processing messages of this endpoint
      * @return Self gateway to dispatch remote procedure calls to oneself
      */
-    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);
+    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+            C rpcEndpoint, Map<String, String> loggingContext);
 
     /**
      * Stop the underlying rpc server of the provided self gateway.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d74f1062767..25afade0239 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.clock.Clock;
@@ -795,29 +796,31 @@ public class CheckpointCoordinator {
             triggerTasks(request, timestamp, checkpoint)
                     .exceptionally(
                             failure -> {
-                                LOG.info(
-                                        "Triggering Checkpoint {} for job {} 
failed due to {}",
-                                        checkpoint.getCheckpointID(),
-                                        job,
-                                        failure);
-
-                                final CheckpointException cause;
-                                if (failure instanceof CheckpointException) {
-                                    cause = (CheckpointException) failure;
-                                } else {
-                                    cause =
-                                            new CheckpointException(
-                                                    CheckpointFailureReason
-                                                            .TRIGGER_CHECKPOINT_FAILURE,
-                                                    failure);
+                                try (MdcUtils.MdcCloseable ignored =
+                                        MdcUtils.withContext(MdcUtils.asContextData(job))) {
+                                    LOG.info(
+                                            "Triggering Checkpoint {} for job 
{} failed due to {}",
+                                            checkpoint.getCheckpointID(),
+                                            job,
+                                            failure);
+                                    final CheckpointException cause;
+                                    if (failure instanceof CheckpointException) {
+                                        cause = (CheckpointException) failure;
+                                    } else {
+                                        cause =
+                                                new CheckpointException(
+                                                        CheckpointFailureReason
+                                                                .TRIGGER_CHECKPOINT_FAILURE,
+                                                        failure);
+                                    }
+                                    timer.execute(
+                                            () -> {
+                                                synchronized (lock) {
+                                                    abortPendingCheckpoint(checkpoint, cause);
+                                                }
+                                            });
+                                    return null;
                                 }
-                                timer.execute(
-                                        () -> {
-                                            synchronized (lock) {
-                                                abortPendingCheckpoint(checkpoint, cause);
-                                            }
-                                        });
-                                return null;
                             });
 
             // It is possible that the tasks has finished
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
index 8854c0976d1..a05e47a1916 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
@@ -70,7 +70,8 @@ public class ChannelStateWriteRequestExecutorFactory {
                                     checkState(this.executor == executor);
                                     this.executor = null;
                                 },
-                                lock);
+                                lock,
+                                jobID);
                 if (startExecutor) {
                     executor.start();
                 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index b20388e1be0..25fa06c9f2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -18,9 +18,11 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
@@ -91,17 +93,21 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     @GuardedBy("registerLock")
     private final Consumer<ChannelStateWriteRequestExecutor> onRegistered;
 
+    private final JobID jobID;
+
     ChannelStateWriteRequestExecutorImpl(
             ChannelStateWriteRequestDispatcher dispatcher,
             int maxSubtasksPerChannelStateFile,
             Consumer<ChannelStateWriteRequestExecutor> onRegistered,
-            Object registerLock) {
+            Object registerLock,
+            JobID jobID) {
         this(
                 dispatcher,
                 new ArrayDeque<>(),
                 maxSubtasksPerChannelStateFile,
                 registerLock,
-                onRegistered);
+                onRegistered,
+                jobID);
     }
 
     ChannelStateWriteRequestExecutorImpl(
@@ -109,44 +115,48 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
             Deque<ChannelStateWriteRequest> deque,
             int maxSubtasksPerChannelStateFile,
             Object registerLock,
-            Consumer<ChannelStateWriteRequestExecutor> onRegistered) {
+            Consumer<ChannelStateWriteRequestExecutor> onRegistered,
+            JobID jobID) {
         this.dispatcher = dispatcher;
         this.deque = deque;
         this.maxSubtasksPerChannelStateFile = maxSubtasksPerChannelStateFile;
         this.registerLock = registerLock;
         this.onRegistered = onRegistered;
-        this.thread = new Thread(this::run, "Channel state writer ");
+        this.thread = new Thread(this::run, "Channel state writer");
         this.subtasks = new HashSet<>();
         this.thread.setDaemon(true);
+        this.jobID = jobID;
     }
 
     @VisibleForTesting
     void run() {
-        try {
-            FileSystemSafetyNet.initializeSafetyNetForThread();
-            loop();
-        } catch (Exception ex) {
-            thrown = ex;
-        } finally {
+        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
             try {
-                closeAll(
-                        this::cleanupRequests,
-                        () -> {
-                            Throwable cause;
-                            synchronized (lock) {
-                                cause = thrown == null ? new CancellationException() : thrown;
-                            }
-                            dispatcher.fail(cause);
-                        });
-            } catch (Exception e) {
-                synchronized (lock) {
-                    //noinspection NonAtomicOperationOnVolatileField
-                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                FileSystemSafetyNet.initializeSafetyNetForThread();
+                loop();
+            } catch (Exception ex) {
+                thrown = ex;
+            } finally {
+                try {
+                    closeAll(
+                            this::cleanupRequests,
+                            () -> {
+                                Throwable cause;
+                                synchronized (lock) {
+                                    cause = thrown == null ? new CancellationException() : thrown;
+                                }
+                                dispatcher.fail(cause);
+                            });
+                } catch (Exception e) {
+                    synchronized (lock) {
+                        //noinspection NonAtomicOperationOnVolatileField
+                        thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                    }
                 }
+                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
             }
-            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+            LOG.debug("loop terminated");
         }
-        LOG.debug("loop terminated");
     }
 
     private void loop() throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index fed987e9819..5b89203095e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -96,6 +96,8 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -403,7 +405,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
 
         initJobClientExpiredTime(recoveredJob);
 
-        try {
+        try (MdcCloseable ignored =
+                MdcUtils.withContext(MdcUtils.asContextData(recoveredJob.getJobID()))) {
             runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);
         } catch (Throwable throwable) {
             onFatalError(
@@ -431,7 +434,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                 .getScheduledExecutor()
                                 .scheduleWithFixedDelay(
                                         () ->
-                                                getMainThreadExecutor()
+                                                getMainThreadExecutor(jobID)
                                                         .execute(this::checkJobClientAliveness),
                                         0L,
                                         jobClientAlivenessCheckInterval,
@@ -513,7 +516,9 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     @Override
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
         final JobID jobID = jobGraph.getJobID();
-        log.info("Received JobGraph submission '{}' ({}).", 
jobGraph.getName(), jobID);
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
+            log.info("Received JobGraph submission '{}' ({}).", 
jobGraph.getName(), jobID);
+        }
         return isInGloballyTerminalState(jobID)
                 .thenComposeAsync(
                         isTerminated -> {
@@ -547,7 +552,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                 return internalSubmitJob(jobGraph);
                             }
                         },
-                        getMainThreadExecutor());
+                        getMainThreadExecutor(jobID));
     }
 
     @Override
@@ -636,7 +641,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                         new JobSubmissionException(
                                                 jobId, "Failed to submit 
job.", strippedThrowable));
                             },
-                            getMainThreadExecutor());
+                            getMainThreadExecutor(jobId));
         }
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
@@ -668,7 +673,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                 dirtyJobResult,
                 highAvailabilityServices.getCheckpointRecoveryFactory(),
                 configuration,
-                ioExecutor);
+                getIoExecutor(dirtyJobResult.getJobId()));
     }
 
     private void runJob(JobManagerRunner jobManagerRunner, ExecutionType 
executionType)
@@ -698,7 +703,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                                        jobId, JobStatus.FAILED, throwable));
                                     }
                                 },
-                                getMainThreadExecutor())
+                                getMainThreadExecutor(jobId))
                         .thenCompose(Function.identity());
 
         final CompletableFuture<Void> jobTerminationFuture =
@@ -1185,7 +1190,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                                 e));
                             }
                         },
-                        ioExecutor)
+                        getIoExecutor(jobId))
                 .thenComposeAsync(
                         ignored ->
                                 performOperationOnJobMasterGateway(
@@ -1193,7 +1198,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                         jobMasterGateway ->
                                                jobMasterGateway.updateJobResourceRequirements(
                                                        jobResourceRequirements)),
-                        getMainThreadExecutor())
+                        getMainThreadExecutor(jobId))
                 .whenComplete(
                         (ack, error) -> {
                             if (error != null) {
@@ -1254,7 +1259,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                         jobManagerRunnerTerminationFutures.put(jobId, terminationFuture);
                     }
                 },
-                getMainThreadExecutor());
+                getMainThreadExecutor(jobId));
     }
 
     private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
@@ -1280,7 +1285,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                             () ->
                                     runPostJobGloballyTerminated(
                                             jobId, cleanupJobState.getJobStatus()),
-                            getMainThreadExecutor());
+                            getMainThreadExecutor(jobId));
         } else {
             return localResourceCleaner.cleanupAsync(jobId);
         }
@@ -1400,7 +1405,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                             }
                             return CleanupJobState.globalCleanup(terminalJobStatus);
                         },
-                        getMainThreadExecutor());
+                        getMainThreadExecutor(jobId));
     }
 
     /**
@@ -1474,7 +1479,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                             }
                             return Acknowledge.get();
                         },
-                        getMainThreadExecutor());
+                        getMainThreadExecutor(
+                                executionGraphInfo.getArchivedExecutionGraph().getJobID()));
     }
 
     private void jobMasterFailed(JobID jobId, Throwable cause) {
@@ -1562,7 +1568,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
 
         return FutureUtils.thenAcceptAsyncIfNotDone(
                 jobManagerTerminationFuture,
-                getMainThreadExecutor(),
+                getMainThreadExecutor(jobId),
                 FunctionUtils.uncheckedConsumer(
                         (ignored) -> {
                             jobManagerRunnerTerminationFutures.remove(jobId);
@@ -1586,7 +1592,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     }
 
     public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
-        return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor());
+        return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor(jobId));
     }
 
     private void applyParallelismOverrides(JobGraph jobGraph) {
@@ -1607,4 +1613,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
             }
         }
     }
+
+    private Executor getIoExecutor(JobID jobID) {
+        // todo: consider caching
+        return MdcUtils.scopeToJob(jobID, ioExecutor);
+    }
 }
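
The getIoExecutor(JobID) helper above builds on MdcUtils.scopeToJob, which decorates an executor so that every submitted task runs with the job's MDC entries in place. As a rough sketch of the idea, a hypothetical JobScopedExecutor on top of plain SLF4J could look like this (illustrative only, not the actual MdcUtils implementation from flink-core):

    import org.slf4j.MDC;
    import java.util.Map;
    import java.util.concurrent.Executor;

    // Wraps a delegate executor; each Runnable gets the job's MDC entries
    // (e.g. "flink-job-id" -> 32-character hex string) installed on the worker
    // thread and removed afterwards, so pooled threads do not leak context.
    final class JobScopedExecutor implements Executor {
        private final Map<String, String> contextData;
        private final Executor delegate;

        JobScopedExecutor(Map<String, String> contextData, Executor delegate) {
            this.contextData = contextData;
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(
                    () -> {
                        contextData.forEach(MDC::put);
                        try {
                            command.run();
                        } finally {
                            contextData.keySet().forEach(MDC::remove);
                        }
                    });
        }
    }
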
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index ecd67e1f3d4..bf7c69fb077 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
 import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
@@ -98,7 +99,8 @@ public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerF
 
         final DefaultJobMasterServiceFactory jobMasterServiceFactory =
                 new DefaultJobMasterServiceFactory(
-                        jobManagerServices.getIoExecutor(),
+                        MdcUtils.scopeToJob(
+                                jobGraph.getJobID(), jobManagerServices.getIoExecutor()),
                         rpcService,
                         jobMasterConfiguration,
                         jobGraph,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 168a7436b28..1a853d851b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -85,6 +85,7 @@ import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
@@ -484,9 +485,12 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
         checkState(checkpointCoordinatorTimer == null);
 
         checkpointCoordinatorTimer =
-                Executors.newSingleThreadScheduledExecutor(
-                        new DispatcherThreadFactory(
-                                Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
+                MdcUtils.scopeToJob(
+                        getJobID(),
+                        Executors.newSingleThreadScheduledExecutor(
+                                new DispatcherThreadFactory(
+                                        Thread.currentThread().getThreadGroup(),
+                                        "Checkpoint Timer")));
 
         // create the coordinator that triggers and commits checkpoints and holds the state
         checkpointCoordinator =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6edaa00ef07..3b288089a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -104,6 +104,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -250,7 +251,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
             long initializationTimestamp)
             throws Exception {
 
-        super(rpcService, RpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);
+        super(
+                rpcService,
+                RpcServiceUtils.createRandomName(JOB_MANAGER_NAME),
+                jobMasterId,
+                MdcUtils.asContextData(jobGraph.getJobID()));
 
         final ExecutionDeploymentReconciliationHandler executionStateReconciliationHandler =
                 new ExecutionDeploymentReconciliationHandler() {
@@ -291,6 +296,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
                         }
                     }
                 };
+        final String jobName = jobGraph.getName();
+        final JobID jid = jobGraph.getJobID();
+
+        log.info("Initializing job '{}' ({}).", jobName, jid);
 
         this.executionDeploymentTracker = executionDeploymentTracker;
         this.executionDeploymentReconciler =
@@ -302,8 +311,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
         this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
         this.highAvailabilityServices = checkNotNull(highAvailabilityService);
         this.blobWriter = jobManagerSharedServices.getBlobWriter();
-        this.futureExecutor = jobManagerSharedServices.getFutureExecutor();
-        this.ioExecutor = jobManagerSharedServices.getIoExecutor();
+        this.futureExecutor =
+                MdcUtils.scopeToJob(jid, jobManagerSharedServices.getFutureExecutor());
+        this.ioExecutor = MdcUtils.scopeToJob(jid, jobManagerSharedServices.getIoExecutor());
         this.jobCompletionActions = checkNotNull(jobCompletionActions);
         this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
         this.userCodeLoader = checkNotNull(userCodeLoader);
@@ -313,11 +323,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
                         .getConfiguration()
                         .get(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
 
-        final String jobName = jobGraph.getName();
-        final JobID jid = jobGraph.getJobID();
-
-        log.info("Initializing job '{}' ({}).", jobName, jid);
-
         resourceManagerLeaderRetriever =
                 highAvailabilityServices.getResourceManagerLeaderRetriever();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 45aa27eca69..422bbecb30e 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -85,6 +85,8 @@ import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkExpectedException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import javax.annotation.Nullable;
@@ -435,7 +437,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                                         new FlinkException(declineMessage));
                             }
                         },
-                        getMainThreadExecutor());
+                        getMainThreadExecutor(jobId));
 
         // handle exceptions which might have occurred in one of the futures inputs of combine
         return registrationResponseFuture.handleAsync(
@@ -572,30 +574,34 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
     public CompletableFuture<Acknowledge> declareRequiredResources(
             JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
         final JobID jobId = resourceRequirements.getJobId();
-        final JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
-
-        if (null != jobManagerRegistration) {
-            if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
-                return getReadyToServeFuture()
-                        .thenApply(
-                                acknowledge -> {
-                                    validateRunsInMainThread();
-                                    slotManager.processResourceRequirements(resourceRequirements);
-                                    return null;
-                                });
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+            final JobManagerRegistration jobManagerRegistration =
+                    jobManagerRegistrations.get(jobId);
+
+            if (null != jobManagerRegistration) {
+                if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
+                    return getReadyToServeFuture()
+                            .thenApply(
+                                    acknowledge -> {
+                                        validateRunsInMainThread();
+                                        slotManager.processResourceRequirements(
+                                                resourceRequirements);
+                                        return null;
+                                    });
+                } else {
+                    return FutureUtils.completedExceptionally(
+                            new ResourceManagerException(
+                                    "The job leader's id "
+                                            + jobManagerRegistration.getJobMasterId()
+                                            + " does not match the received id "
+                                            + jobMasterId
+                                            + '.'));
+                }
             } else {
                 return FutureUtils.completedExceptionally(
                         new ResourceManagerException(
-                                "The job leader's id "
-                                        + jobManagerRegistration.getJobMasterId()
-                                        + " does not match the received id "
-                                        + jobMasterId
-                                        + '.'));
+                                "Could not find registered job manager for job " + jobId + '.'));
             }
-        } else {
-            return FutureUtils.completedExceptionally(
-                    new ResourceManagerException(
-                            "Could not find registered job manager for job " + jobId + '.'));
         }
     }
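
The try-with-resources blocks above rely on MdcUtils.withContext(...) returning an MdcCloseable that restores the previous context on close, so the Job ID is only attached for the duration of the block, even when calls nest. A minimal sketch of such a helper on top of plain SLF4J (illustrative only; the real MdcUtils lives in flink-core and may differ):

    import org.slf4j.MDC;
    import java.util.Map;

    // Installs the given entries into the thread-local MDC and returns a
    // closeable that restores the previous context map, making nesting safe.
    static AutoCloseable withContext(Map<String, String> context) {
        final Map<String, String> previous = MDC.getCopyOfContextMap(); // may be null
        context.forEach(MDC::put);
        return () -> {
            if (previous == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(previous);
            }
        };
    }
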
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e75714425a2..8be2e04d24a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -142,6 +142,8 @@ import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkExpectedException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
 import org.apache.flink.util.OptionalConsumer;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -640,8 +642,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
     public CompletableFuture<Acknowledge> submitTask(
             TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
 
-        try {
-            final JobID jobId = tdd.getJobId();
+        final JobID jobId = tdd.getJobId();
+        // todo: consider adding task info
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+
             final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
 
             final JobTable.Connection jobManagerConnection =
@@ -817,7 +821,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                             taskManagerConfiguration,
                             taskMetricGroup,
                             partitionStateChecker,
-                            getRpcService().getScheduledExecutor(),
+                            MdcUtils.scopeToJob(jobId, getRpcService().getScheduledExecutor()),
                             channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
 
             taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);
@@ -905,13 +909,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         final Task task = taskSlotTable.getTask(executionAttemptID);
 
         if (task != null) {
-            try {
-                task.cancelExecution();
-                return CompletableFuture.completedFuture(Acknowledge.get());
-            } catch (Throwable t) {
-                return FutureUtils.completedExceptionally(
-                        new TaskException(
-                                "Cannot cancel task for execution " + executionAttemptID + '.', t));
+            try (MdcCloseable ignored =
+                    MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) {
+                try {
+                    task.cancelExecution();
+                    return CompletableFuture.completedFuture(Acknowledge.get());
+                } catch (Throwable t) {
+                    return FutureUtils.completedExceptionally(
+                            new TaskException(
+                                    "Cannot cancel task for execution " + executionAttemptID + '.',
+                                    t));
+                }
             }
         } else {
             final String message =
@@ -1039,18 +1047,19 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             long checkpointId,
             long checkpointTimestamp,
             CheckpointOptions checkpointOptions) {
-        log.debug(
-                "Trigger checkpoint {}@{} for {}.",
-                checkpointId,
-                checkpointTimestamp,
-                executionAttemptID);
-
         final Task task = taskSlotTable.getTask(executionAttemptID);
-
         if (task != null) {
-            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
+            try (MdcCloseable ignored =
+                    MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) {
+                log.debug(
+                        "Trigger checkpoint {}@{} for {}.",
+                        checkpointId,
+                        checkpointTimestamp,
+                        executionAttemptID);
+                task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
-            return CompletableFuture.completedFuture(Acknowledge.get());
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            }
         } else {
             final String message =
                     "TaskManager received a checkpoint request for unknown task "
@@ -1070,20 +1079,21 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             long completedCheckpointId,
             long completedCheckpointTimestamp,
             long lastSubsumedCheckpointId) {
-        log.debug(
-                "Confirm completed checkpoint {}@{} and last subsumed checkpoint {} for {}.",
-                completedCheckpointId,
-                completedCheckpointTimestamp,
-                lastSubsumedCheckpointId,
-                executionAttemptID);
-
         final Task task = taskSlotTable.getTask(executionAttemptID);
-
         if (task != null) {
-            task.notifyCheckpointComplete(completedCheckpointId);
-
-            task.notifyCheckpointSubsumed(lastSubsumedCheckpointId);
-            return CompletableFuture.completedFuture(Acknowledge.get());
+            try (MdcCloseable ignored =
+                    MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) {
+                log.debug(
+                        "Confirm completed checkpoint {}@{} and last subsumed checkpoint {} for {}.",
+                        completedCheckpointId,
+                        completedCheckpointTimestamp,
+                        lastSubsumedCheckpointId,
+                        executionAttemptID);
+                task.notifyCheckpointComplete(completedCheckpointId);
+
+                task.notifyCheckpointSubsumed(lastSubsumedCheckpointId);
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            }
         } else {
             final String message =
                     "TaskManager received a checkpoint confirmation for unknown task "
@@ -1146,37 +1156,40 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         // TODO: Filter invalid requests from the resource manager by using the
         // instance/registration Id
 
-        log.info(
-                "Receive slot request {} for job {} from resource manager with leader id {}.",
-                allocationId,
-                jobId,
-                resourceManagerId);
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+            log.info(
+                    "Receive slot request {} for job {} from resource manager with leader id {}.",
+                    allocationId,
+                    jobId,
+                    resourceManagerId);
 
-        if (!isConnectedToResourceManager(resourceManagerId)) {
-            final String message =
-                    String.format(
-                            "TaskManager is not connected to the resource manager %s.",
-                            resourceManagerId);
-            log.debug(message);
-            return FutureUtils.completedExceptionally(new TaskManagerException(message));
-        }
+            if (!isConnectedToResourceManager(resourceManagerId)) {
+                final String message =
+                        String.format(
+                                "TaskManager is not connected to the resource manager %s.",
+                                resourceManagerId);
+                log.debug(message);
+                return FutureUtils.completedExceptionally(new TaskManagerException(message));
+            }
 
-        tryPersistAllocationSnapshot(
-                new SlotAllocationSnapshot(
-                        slotId, jobId, targetAddress, allocationId, resourceProfile));
+            tryPersistAllocationSnapshot(
+                    new SlotAllocationSnapshot(
+                            slotId, jobId, targetAddress, allocationId, resourceProfile));
 
-        try {
-            final boolean isConnected =
-                    allocateSlotForJob(jobId, slotId, allocationId, resourceProfile, targetAddress);
+            try {
+                final boolean isConnected =
+                        allocateSlotForJob(
+                                jobId, slotId, allocationId, resourceProfile, targetAddress);
 
-            if (isConnected) {
-                offerSlotsToJobManager(jobId);
-            }
+                if (isConnected) {
+                    offerSlotsToJobManager(jobId);
+                }
 
-            return CompletableFuture.completedFuture(Acknowledge.get());
-        } catch (SlotAllocationException e) {
-            log.debug("Could not allocate slot for allocation id {}.", allocationId, e);
-            return FutureUtils.completedExceptionally(e);
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            } catch (SlotAllocationException e) {
+                log.debug("Could not allocate slot for allocation id {}.", allocationId, e);
+                return FutureUtils.completedExceptionally(e);
+            }
         }
     }
 
@@ -1266,15 +1279,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
     @Override
     public void freeInactiveSlots(JobID jobId, Time timeout) {
-        log.debug("Freeing inactive slots for job {}.", jobId);
-
-        // need a copy to prevent ConcurrentModificationExceptions
-        final ImmutableList<TaskSlot<Task>> inactiveSlots =
-                ImmutableList.copyOf(taskSlotTable.getAllocatedSlots(jobId));
-        for (TaskSlot<Task> slot : inactiveSlots) {
-            freeSlotInternal(
-                    slot.getAllocationId(),
-                    new FlinkException("Slot was re-claimed by resource manager."));
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+            log.debug("Freeing inactive slots for job {}.", jobId);
+
+            // need a copy to prevent ConcurrentModificationExceptions
+            final ImmutableList<TaskSlot<Task>> inactiveSlots =
+                    ImmutableList.copyOf(taskSlotTable.getAllocatedSlots(jobId));
+            for (TaskSlot<Task> slot : inactiveSlots) {
+                freeSlotInternal(
+                        slot.getAllocationId(),
+                        new FlinkException("Slot was re-claimed by resource manager."));
+            }
         }
     }
 
@@ -1338,16 +1353,22 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
     @Override
     public void disconnectJobManager(JobID jobId, Exception cause) {
-        jobTable.getConnection(jobId)
-                .ifPresent(
-                        jobManagerConnection ->
-                                disconnectAndTryReconnectToJobManager(jobManagerConnection, cause));
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+            jobTable.getConnection(jobId)
+                    .ifPresent(
+                            jobManagerConnection ->
+                                    disconnectAndTryReconnectToJobManager(
+                                            jobManagerConnection, cause));
+        }
     }
 
     private void disconnectAndTryReconnectToJobManager(
             JobTable.Connection jobManagerConnection, Exception cause) {
-        disconnectJobManagerConnection(jobManagerConnection, cause);
-        jobLeaderService.reconnect(jobManagerConnection.getJobId());
+        try (MdcCloseable ignored =
+                MdcUtils.withContext(MdcUtils.asContextData(jobManagerConnection.getJobId()))) {
+            disconnectJobManagerConnection(jobManagerConnection, cause);
+            jobLeaderService.reconnect(jobManagerConnection.getJobId());
+        }
     }
 
     @Override
@@ -1658,7 +1679,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             acceptedSlotsFuture.whenCompleteAsync(
                     handleAcceptedSlotOffers(
                             jobId, jobMasterGateway, jobMasterId, reservedSlots, slotOfferId),
-                    getMainThreadExecutor());
+                    getMainThreadExecutor(jobId));
         } else {
             log.debug("There are no unassigned slots for the job {}.", jobId);
         }
@@ -2061,13 +2082,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         // only respond to freeing slots when not shutting down to avoid freeing slot allocation
         // information
         if (isRunning()) {
-            log.debug(
-                    "Free slot with allocation id {} because: {}",
-                    allocationId,
-                    cause.getMessage());
+            final JobID jobId = taskSlotTable.getOwningJob(allocationId);
+            try (MdcCloseable ignored =
+                    MdcUtils.withContext(
+                            jobId == null
+                                    ? Collections.emptyMap()
+                                    : MdcUtils.asContextData(jobId))) {
 
-            try {
-                final JobID jobId = taskSlotTable.getOwningJob(allocationId);
+                log.debug(
+                        "Free slot with allocation id {} because: {}",
+                        allocationId,
+                        cause.getMessage());
 
                 final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 877279326e9..f23ec1f3a3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -87,6 +87,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TaskManagerExceptionUtils;
@@ -562,7 +564,7 @@ public class Task
     /** The core work method that bootstraps the task and executes its code. */
     @Override
     public void run() {
-        try {
+        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
             doRun();
         } finally {
             terminationFuture.complete(executionState);
@@ -1244,7 +1246,8 @@ public class Task
                                         invokable,
                                         executingThread,
                                         taskNameWithSubtask,
-                                        taskCancellationInterval);
+                                        taskCancellationInterval,
+                                        jobId);
 
                         Thread interruptingThread =
                                 new Thread(
@@ -1266,7 +1269,8 @@ public class Task
                                             taskInfo,
                                             executingThread,
                                             taskManagerActions,
-                                            taskCancellationTimeout);
+                                            taskCancellationTimeout,
+                                            jobId);
 
                             Thread watchDogThread =
                                     new Thread(
@@ -1661,7 +1665,7 @@ public class Task
 
         @Override
         public void run() {
-            try {
+            try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
                 // the user-defined cancel method may throw errors.
                 // we need do continue despite that
                 try {
@@ -1708,23 +1712,27 @@ public class Task
         /** The interval in which we interrupt. */
         private final long interruptIntervalMillis;
 
+        private final JobID jobID;
+
         TaskInterrupter(
                 Logger log,
                 TaskInvokable task,
                 Thread executorThread,
                 String taskName,
-                long interruptIntervalMillis) {
+                long interruptIntervalMillis,
+                JobID jobID) {
 
             this.log = log;
             this.task = task;
             this.executorThread = executorThread;
             this.taskName = taskName;
             this.interruptIntervalMillis = interruptIntervalMillis;
+            this.jobID = jobID;
         }
 
         @Override
         public void run() {
-            try {
+            try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
                 // we initially wait for one interval
                 // in most cases, the threads go away immediately (by the cancellation thread)
                 // and we need not actually do anything
@@ -1765,11 +1773,14 @@ public class Task
 
         private final TaskInfo taskInfo;
 
+        private final JobID jobID;
+
         TaskCancelerWatchDog(
                 TaskInfo taskInfo,
                 Thread executorThread,
                 TaskManagerActions taskManager,
-                long timeoutMillis) {
+                long timeoutMillis,
+                JobID jobID) {
 
             checkArgument(timeoutMillis > 0);
 
@@ -1777,11 +1788,12 @@ public class Task
             this.executorThread = executorThread;
             this.taskManager = taskManager;
             this.timeoutMillis = timeoutMillis;
+            this.jobID = jobID;
         }
 
         @Override
         public void run() {
-            try {
+            try (MdcCloseable ign = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
                 Deadline timeout = Deadline.fromNow(Duration.ofMillis(timeoutMillis));
                 while (executorThread.isAlive() && timeout.hasTimeLeft()) {
                     try {
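
The per-thread wrapping in Task is needed because the MDC is thread-local: a Runnable handed to a freshly created Thread starts with an empty context, so each helper thread (canceler, interrupter, watchdog) re-installs the Job ID at the top of its run() method. A hypothetical fragment mirroring that pattern:

    // Illustrative only: without withContext(...) at the start of run(),
    // log lines from this thread would carry no flink-job-id entry.
    Thread watchdog =
            new Thread(
                    () -> {
                        try (MdcUtils.MdcCloseable ignored =
                                MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
                            // ... monitoring loop, logging with the Job ID attached ...
                        }
                    });
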
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
index ea31130b3fc..8f9f073154c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
@@ -87,7 +87,7 @@ class ChannelStateWriteRequestExecutorImplTest {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
                 new ChannelStateWriteRequestExecutorImpl(
-                        NO_OP, closingDeque, 5, registerLock, e -> {});
+                        NO_OP, closingDeque, 5, registerLock, e -> {}, new JobID());
         closingDeque.setWorker(worker);
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
@@ -109,7 +109,7 @@ class ChannelStateWriteRequestExecutorImplTest {
             Object registerLock = new Object();
             ChannelStateWriteRequestExecutorImpl executor =
                     new ChannelStateWriteRequestExecutorImpl(
-                            NO_OP, deque, 5, registerLock, e -> {});
+                            NO_OP, deque, 5, registerLock, e -> {}, new JobID());
             synchronized (registerLock) {
                 executor.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
             }
@@ -134,7 +134,7 @@ class ChannelStateWriteRequestExecutorImplTest {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
                 new ChannelStateWriteRequestExecutorImpl(
-                        requestProcessor, deque, 5, registerLock, e -> {});
+                        requestProcessor, deque, 5, registerLock, e -> {}, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
         }
@@ -153,7 +153,7 @@ class ChannelStateWriteRequestExecutorImplTest {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
                 new ChannelStateWriteRequestExecutorImpl(
-                        requestProcessor, deque, 5, registerLock, e -> {});
+                        requestProcessor, deque, 5, registerLock, e -> {}, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
         }
@@ -180,7 +180,8 @@ class ChannelStateWriteRequestExecutorImplTest {
                         new ChannelStateSerializerImpl());
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
-                new ChannelStateWriteRequestExecutorImpl(processor, 5, e -> {}, registerLock);
+                new ChannelStateWriteRequestExecutorImpl(
+                        processor, 5, e -> {}, registerLock, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
         }
@@ -273,7 +274,7 @@ class ChannelStateWriteRequestExecutorImplTest {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
                 new ChannelStateWriteRequestExecutorImpl(
-                        throwingRequestProcessor, 5, e -> {}, registerLock);
+                        throwingRequestProcessor, 5, e -> {}, registerLock, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, subtaskIndex0);
             worker.registerSubtask(JOB_VERTEX_ID, subtaskIndex1);
@@ -319,7 +320,7 @@ class ChannelStateWriteRequestExecutorImplTest {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
                 new ChannelStateWriteRequestExecutorImpl(
-                        throwingRequestProcessor, deque, 5, registerLock, e -> {});
+                        throwingRequestProcessor, deque, 5, registerLock, e -> {}, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
         }
@@ -342,7 +343,8 @@ class ChannelStateWriteRequestExecutorImplTest {
     void testSubmitRequestOfUnregisteredSubtask() throws Exception {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
-                new ChannelStateWriteRequestExecutorImpl(NO_OP, 5, e -> {}, 
registerLock);
+                new ChannelStateWriteRequestExecutorImpl(
+                        NO_OP, 5, e -> {}, registerLock, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
         }
@@ -366,7 +368,8 @@ class ChannelStateWriteRequestExecutorImplTest {
     void testSubmitPriorityUnreadyRequest() throws Exception {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
-                new ChannelStateWriteRequestExecutorImpl(NO_OP, 5, e -> {}, registerLock);
+                new ChannelStateWriteRequestExecutorImpl(
+                        NO_OP, 5, e -> {}, registerLock, new JobID());
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
         }
@@ -390,7 +393,7 @@ class ChannelStateWriteRequestExecutorImplTest {
         Object registerLock = new Object();
         ChannelStateWriteRequestExecutorImpl worker =
                 new ChannelStateWriteRequestExecutorImpl(
-                        NO_OP, maxSubtasksPerChannelStateFile, e -> {}, registerLock);
+                        NO_OP, maxSubtasksPerChannelStateFile, e -> {}, registerLock, new JobID());
         synchronized (registerLock) {
             for (int i = 0; i < maxSubtasksPerChannelStateFile; i++) {
                 assertThat(worker.isRegistering()).isTrue();
@@ -429,7 +432,8 @@ class ChannelStateWriteRequestExecutorImplTest {
                         dispatcher,
                         maxSubtasksPerChannelStateFile,
                         workerFuture::complete,
-                        registerLock);
+                        registerLock,
+                        new JobID());
         worker.start();
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
@@ -467,7 +471,8 @@ class ChannelStateWriteRequestExecutorImplTest {
                         new TestRequestDispatcher(),
                         maxSubtasksPerChannelStateFile,
                         workerFuture::complete,
-                        registerLock);
+                        registerLock,
+                        new JobID());
         worker.start();
         synchronized (registerLock) {
             worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index d61ba418d0d..afabbeff592 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,8 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -195,8 +197,9 @@ public class TestingRpcService implements RpcService {
     }
 
     @Override
-    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
-        return backingRpcService.startServer(rpcEndpoint);
+    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+            C rpcEndpoint, Map<String, String> loggingContext) {
+        return backingRpcService.startServer(rpcEndpoint, Collections.emptyMap());
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 23912711562..d16bc40a8be 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,6 +110,7 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.clock.SystemClock;
@@ -416,8 +417,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
             resourceCloser.registerCloseable(mailboxProcessor);
 
             this.channelIOExecutor =
-                    Executors.newSingleThreadExecutor(
-                            new ExecutorThreadFactory("channel-state-unspilling"));
+                    MdcUtils.scopeToJob(
+                            environment.getJobID(),
+                            Executors.newSingleThreadExecutor(
+                                    new ExecutorThreadFactory("channel-state-unspilling")));
             resourceCloser.registerCloseable(channelIOExecutor::shutdown);
 
             this.recordWriter = createRecordWriterDelegate(configuration, environment);
@@ -436,13 +439,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
             // for simultaneous N ongoing concurrent checkpoints and for example clean up of one
             // aborted one.
             this.asyncOperationsThreadPool =
-                    new ThreadPoolExecutor(
-                            0,
-                            configuration.getMaxConcurrentCheckpoints() + 1,
-                            60L,
-                            TimeUnit.SECONDS,
-                            new LinkedBlockingQueue<>(),
-                            new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
+                    MdcUtils.scopeToJob(
+                            getEnvironment().getJobID(),
+                            new ThreadPoolExecutor(
+                                    0,
+                                    configuration.getMaxConcurrentCheckpoints() + 1,
+                                    60L,
+                                    TimeUnit.SECONDS,
+                                    new LinkedBlockingQueue<>(),
+                                    new ExecutorThreadFactory(
+                                            "AsyncOperations", uncaughtExceptionHandler)));
 
             // Register all asynchronous checkpoint threads.
             resourceCloser.registerCloseable(this::shutdownAsyncThreads);
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
index 8ca1b5cc1da..a1e7367713d 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 /**
  * Utility for auditing logged messages.(Junit5 extension)
@@ -47,17 +48,31 @@ public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCal
     private final String loggerName;
     private final org.slf4j.event.Level level;
 
-    private ConcurrentLinkedQueue<String> loggingEvents;
+    private ConcurrentLinkedQueue<LogEvent> loggingEvents;
 
     public LoggerAuditingExtension(Class<?> clazz, org.slf4j.event.Level 
level) {
-        this.loggerName = clazz.getCanonicalName();
+        this(clazz.getCanonicalName(), level);
+    }
+
+    public LoggerAuditingExtension(String loggerName, org.slf4j.event.Level level) {
+        this.loggerName = loggerName;
         this.level = level;
     }
 
     public List<String> getMessages() {
+        return loggingEvents.stream()
+                .map(e -> e.getMessage().getFormattedMessage())
+                .collect(Collectors.toList());
+    }
+
+    public List<LogEvent> getEvents() {
         return new ArrayList<>(loggingEvents);
     }
 
+    public String getLoggerName() {
+        return loggerName;
+    }
+
     @Override
     public void beforeEach(ExtensionContext context) throws Exception {
         loggingEvents = new ConcurrentLinkedQueue<>();
@@ -66,7 +81,7 @@ public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCal
                 new AbstractAppender("test-appender", null, null, false, Property.EMPTY_ARRAY) {
                     @Override
                     public void append(LogEvent event) {
-                        loggingEvents.add(event.getMessage().getFormattedMessage());
+                        loggingEvents.add(event.toImmutable());
                     }
                 };
         testAppender.start();
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 6f10bdfd43d..8c46a28d0a7 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -44,7 +44,7 @@ under the License.
        </properties>
 
        <dependencies>
-       
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index aceabcea7c4..2ef37eddd13 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -56,6 +56,7 @@ import java.io.Serializable;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -382,8 +383,9 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
         }
 
         @Override
-        public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
-            return rpcService.startServer(rpcEndpoint);
+        public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+                C rpcEndpoint, Map<String, String> loggingContext) {
+            return rpcService.startServer(rpcEndpoint, Collections.emptyMap());
         }
 
         @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
new file mode 100644
index 00000000000..3380698feb7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.slf4j.event.Level.DEBUG;
+
+/**
+ * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases.
+ */
+public class JobIDLoggingITCase {
+    private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension checkpointCoordinatorLogging =
+            new LoggerAuditingExtension(CheckpointCoordinator.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension streamTaskLogging =
+            new LoggerAuditingExtension(StreamTask.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension taskExecutorLogging =
+            new LoggerAuditingExtension(TaskExecutor.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension taskLogging =
+            new LoggerAuditingExtension(Task.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension executionGraphLogging =
+            new LoggerAuditingExtension(ExecutionGraph.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension jobMasterLogging =
+            new LoggerAuditingExtension(JobMaster.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension asyncCheckpointRunnableLogging =
+            // this class is private
+            new LoggerAuditingExtension(
+                    "org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable", DEBUG);
+
+    @RegisterExtension
+    public static MiniClusterExtension miniClusterResource =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    @Test
+    public void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        JobID jobID = runJob(clusterClient);
+        clusterClient.cancel(jobID).get();
+
+        // NOTE: most of the assertions are empirical, such as
+        // - which classes are important
+        // - how many messages to expect
+        // - which log patterns to ignore
+
+        assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging);
+        assertJobIDPresent(jobID, 6, streamTaskLogging);
+        assertJobIDPresent(
+                jobID,
+                9,
+                taskExecutorLogging,
+                "Un-registering task.*",
+                "Successful registration.*",
+                "Establish JobManager connection.*",
+                "Offer reserved slots.*",
+                ".*ResourceManager.*",
+                "Operator event.*");
+
+        assertJobIDPresent(jobID, 10, taskLogging);
+        assertJobIDPresent(jobID, 10, executionGraphLogging);
+        assertJobIDPresent(
+                jobID,
+                15,
+                jobMasterLogging,
+                "Registration at ResourceManager.*",
+                "Registration with ResourceManager.*",
+                "Resolved ResourceManager address.*");
+        assertJobIDPresent(jobID, 1, asyncCheckpointRunnableLogging);
+    }
+
+    private static void assertJobIDPresent(
+            JobID jobID,
+            int expectedLogMessages,
+            LoggerAuditingExtension ext,
+            String... ignPatterns) {
+        String loggerName = ext.getLoggerName();
+        checkState(
+                ext.getEvents().size() >= expectedLogMessages,
+                "Too few log events recorded for %s (%s) - this must be a bug in the test code",
+                loggerName,
+                ext.getEvents().size());
+
+        final List<LogEvent> eventsWithMissingJobId = new ArrayList<>();
+        final List<LogEvent> eventsWithWrongJobId = new ArrayList<>();
+        final List<LogEvent> ignoredEvents = new ArrayList<>();
+        final List<Pattern> ignorePatterns =
+                Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList());
+
+        for (LogEvent e : ext.getEvents()) {
+            if (e.getContextData().containsKey(MdcUtils.JOB_ID)) {
+                if (!Objects.equals(
+                        e.getContextData().getValue(MdcUtils.JOB_ID), jobID.toHexString())) {
+                    eventsWithWrongJobId.add(e);
+                }
+            } else if (matchesAny(ignorePatterns, e.getMessage().getFormattedMessage())) {
+                ignoredEvents.add(e);
+            } else {
+                eventsWithMissingJobId.add(e);
+            }
+        }
+        logger.debug(
+                "checked events for {}:\n  {};\n  ignored: {},\n  wrong job id: {},\n  missing job id: {}",
+                loggerName,
+                ext.getEvents(),
+                ignoredEvents,
+                eventsWithWrongJobId,
+                eventsWithMissingJobId);
+        assertThat(eventsWithWrongJobId).as("events with a wrong Job ID").isEmpty();
+        assertTrue(
+                eventsWithMissingJobId.isEmpty(),
+                "too many events without Job ID recorded for "
+                        + loggerName
+                        + ": "
+                        + eventsWithMissingJobId);
+    }
+
+    private static boolean matchesAny(List<Pattern> patternStream, String message) {
+        return patternStream.stream().anyMatch(p -> p.matcher(message).matches());
+    }
+
+    private static JobID runJob(ClusterClient<?> clusterClient) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new DiscardingSink<>());
+        JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+        while (deadline.hasTimeLeft()
+                && clusterClient.listJobs().get().stream()
+                        .noneMatch(
+                                m ->
+                                        m.getJobId().equals(jobId)
+                                                && m.getJobState().equals(JobStatus.RUNNING))) {
+            Thread.sleep(10);
+        }
+        // wait for all tasks to be ready, then trigger a checkpoint
+        while (true) {
+            try {
+                clusterClient.triggerCheckpoint(jobId, CheckpointType.DEFAULT).get();
+                return jobId;
+            } catch (ExecutionException e) {
+                if (ExceptionUtils.findThrowable(e, CheckpointException.class).isPresent()
+                        && !deadline.isOverdue()) {
+                    Thread.sleep(10);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+}
diff --git a/flink-tests/src/test/resources/log4j2-test.properties b/flink-tests/src/test/resources/log4j2-test.properties
index c5d9b0f65be..843e105b0ea 100644
--- a/flink-tests/src/test/resources/log4j2-test.properties
+++ b/flink-tests/src/test/resources/log4j2-test.properties
@@ -28,7 +28,7 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n
 
 logger.migration.name = org.apache.flink.test.migration
 logger.migration.level = INFO
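
With the new layout, a captured record would render roughly like the following (the job id, thread name, and message are invented for illustration; when no flink-job-id is set in the MDC, the %-32X field renders as 32 padded spaces):

    [bb4d5e7fb24d9151ba8e64dcca8fa82a] TaskExecutor [flink-pekko.actor.default-dispatcher-8] DEBUG Freeing inactive slots for job bb4d5e7fb24d9151ba8e64dcca8fa82a.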
