This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 27569da4d36 [FLINK-36710] Log job ID in
RecreateOnResetOperatorCoordinator
27569da4d36 is described below
commit 27569da4d36d9b0fab8de99f0121ada2780d4b13
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Nov 8 17:39:57 2024 +0100
[FLINK-36710] Log job ID in RecreateOnResetOperatorCoordinator
---
.../coordination/RecreateOnResetOperatorCoordinator.java | 14 ++++++++++----
.../org/apache/flink/test/misc/JobIDLoggingITCase.java | 4 +---
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 8e3a921e2c6..98718fe3ba8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
@@ -58,7 +60,7 @@ public class RecreateOnResetOperatorCoordinator implements
OperatorCoordinator {
throws Exception {
this.context = context;
this.provider = provider;
- this.coordinator = new DeferrableCoordinator(context.getOperatorId());
+ this.coordinator = new DeferrableCoordinator(context.getOperatorId(),
context.getJobID());
this.coordinator.createNewInternalCoordinator(context, provider);
this.coordinator.processPendingCalls();
this.closingTimeoutMs = closingTimeoutMs;
@@ -132,7 +134,7 @@ public class RecreateOnResetOperatorCoordinator implements
OperatorCoordinator {
// After this point all the subsequent calls will be made to the new
coordinator.
final DeferrableCoordinator oldCoordinator = coordinator;
final DeferrableCoordinator newCoordinator =
- new DeferrableCoordinator(context.getOperatorId());
+ new DeferrableCoordinator(context.getOperatorId(),
context.getJobID());
coordinator = newCoordinator;
// Close the old coordinator asynchronously in a separate closing
thread.
// The future will be completed when the old coordinator closes.
@@ -327,18 +329,20 @@ public class RecreateOnResetOperatorCoordinator
implements OperatorCoordinator {
private static class DeferrableCoordinator {
private final OperatorID operatorId;
private final BlockingQueue<NamedCall> pendingCalls;
+ private final Map<String, String> mdc;
private QuiesceableContext internalQuiesceableContext;
private OperatorCoordinator internalCoordinator;
private boolean hasCaughtUp;
private boolean closed;
private volatile boolean failed;
- private DeferrableCoordinator(OperatorID operatorId) {
+ private DeferrableCoordinator(OperatorID operatorId, JobID jobID) {
this.operatorId = operatorId;
this.pendingCalls = new LinkedBlockingQueue<>();
this.hasCaughtUp = false;
this.closed = false;
this.failed = false;
+ this.mdc = MdcUtils.asContextData(jobID);
}
synchronized <T extends Exception> void applyCall(
@@ -346,7 +350,9 @@ public class RecreateOnResetOperatorCoordinator implements
OperatorCoordinator {
synchronized (this) {
if (hasCaughtUp) {
// The new coordinator has caught up.
- call.accept(internalCoordinator);
+ try (MdcUtils.MdcCloseable ignored =
MdcUtils.withContext(mdc)) {
+ call.accept(internalCoordinator);
+ }
} else {
pendingCalls.add(new NamedCall(name, call));
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
index 2e8ac6a3ff0..206191195dd 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -149,9 +149,7 @@ class JobIDLoggingITCase {
sourceCoordinatorLogging,
asList(
"Starting split enumerator.*",
- "Distributing maxAllowedWatermark.*",
- "Source .* registering reader for parallel task.*",
- "Closing SourceCoordinator for source .*"));
+ "Source .* registering reader for parallel task.*"));
assertJobIDPresent(
jobID,