This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 27569da4d36 [FLINK-36710] Log job ID in 
RecreateOnResetOperatorCoordinator
27569da4d36 is described below

commit 27569da4d36d9b0fab8de99f0121ada2780d4b13
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Nov 8 17:39:57 2024 +0100

    [FLINK-36710] Log job ID in RecreateOnResetOperatorCoordinator
---
 .../coordination/RecreateOnResetOperatorCoordinator.java   | 14 ++++++++++----
 .../org/apache/flink/test/misc/JobIDLoggingITCase.java     |  4 +---
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 8e3a921e2c6..98718fe3ba8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,7 +60,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
             throws Exception {
         this.context = context;
         this.provider = provider;
-        this.coordinator = new DeferrableCoordinator(context.getOperatorId());
+        this.coordinator = new DeferrableCoordinator(context.getOperatorId(), 
context.getJobID());
         this.coordinator.createNewInternalCoordinator(context, provider);
         this.coordinator.processPendingCalls();
         this.closingTimeoutMs = closingTimeoutMs;
@@ -132,7 +134,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
         // After this point all the subsequent calls will be made to the new 
coordinator.
         final DeferrableCoordinator oldCoordinator = coordinator;
         final DeferrableCoordinator newCoordinator =
-                new DeferrableCoordinator(context.getOperatorId());
+                new DeferrableCoordinator(context.getOperatorId(), 
context.getJobID());
         coordinator = newCoordinator;
         // Close the old coordinator asynchronously in a separate closing 
thread.
         // The future will be completed when the old coordinator closes.
@@ -327,18 +329,20 @@ public class RecreateOnResetOperatorCoordinator 
implements OperatorCoordinator {
     private static class DeferrableCoordinator {
         private final OperatorID operatorId;
         private final BlockingQueue<NamedCall> pendingCalls;
+        private final Map<String, String> mdc;
         private QuiesceableContext internalQuiesceableContext;
         private OperatorCoordinator internalCoordinator;
         private boolean hasCaughtUp;
         private boolean closed;
         private volatile boolean failed;
 
-        private DeferrableCoordinator(OperatorID operatorId) {
+        private DeferrableCoordinator(OperatorID operatorId, JobID jobID) {
             this.operatorId = operatorId;
             this.pendingCalls = new LinkedBlockingQueue<>();
             this.hasCaughtUp = false;
             this.closed = false;
             this.failed = false;
+            this.mdc = MdcUtils.asContextData(jobID);
         }
 
         synchronized <T extends Exception> void applyCall(
@@ -346,7 +350,9 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
             synchronized (this) {
                 if (hasCaughtUp) {
                     // The new coordinator has caught up.
-                    call.accept(internalCoordinator);
+                    try (MdcUtils.MdcCloseable ignored = 
MdcUtils.withContext(mdc)) {
+                        call.accept(internalCoordinator);
+                    }
                 } else {
                     pendingCalls.add(new NamedCall(name, call));
                 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
index 2e8ac6a3ff0..206191195dd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -149,9 +149,7 @@ class JobIDLoggingITCase {
                 sourceCoordinatorLogging,
                 asList(
                         "Starting split enumerator.*",
-                        "Distributing maxAllowedWatermark.*",
-                        "Source .* registering reader for parallel task.*",
-                        "Closing SourceCoordinator for source .*"));
+                        "Source .* registering reader for parallel task.*"));
 
         assertJobIDPresent(
                 jobID,

Reply via email to