This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1ffb481111f [FLINK-34097] Remove JobMasterGateway#requestJobDetails 1ffb481111f is described below commit 1ffb481111f658b699702357921a48e914d13caf Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Mon Jan 15 16:20:55 2024 +0100 [FLINK-34097] Remove JobMasterGateway#requestJobDetails --- .../java/org/apache/flink/runtime/jobmaster/JobMaster.java | 6 ------ .../apache/flink/runtime/jobmaster/JobMasterGateway.java | 9 --------- .../org/apache/flink/runtime/scheduler/SchedulerBase.java | 7 ------- .../org/apache/flink/runtime/scheduler/SchedulerNG.java | 3 --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 6 ------ .../runtime/jobmaster/utils/TestingJobMasterGateway.java | 14 ++++---------- .../jobmaster/utils/TestingJobMasterGatewayBuilder.java | 12 ++++++------ .../apache/flink/runtime/scheduler/TestingSchedulerNG.java | 7 ------- 8 files changed, 10 insertions(+), 54 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index bfea710db66..59455b787a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -71,7 +71,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -861,11 +860,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } - @Override - public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { - return CompletableFuture.completedFuture(schedulerNG.requestJobDetails()); - } - @Override public CompletableFuture<JobStatus> requestJobStatus(Time timeout) { return CompletableFuture.completedFuture(schedulerNG.requestJobStatus()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 6c1b79568a8..02c3c7d501a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -183,14 +182,6 @@ public interface JobMasterGateway */ CompletableFuture<Void> heartbeatFromResourceManager(final ResourceID resourceID); - /** - * Request the details of the executed job. - * - * @param timeout for the rpc call - * @return Future details of the executed job - */ - CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout); - /** * Requests the current job status. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index c11d7b2ca86..7f4ba383e43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -78,7 +78,6 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; @@ -819,12 +818,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling return executionGraph.getState(); } - @Override - public JobDetails requestJobDetails() { - mainThreadExecutor.assertRunningInMainThread(); - return JobDetails.createDetailsForJob(executionGraph); - } - @Override public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index 1643dbee282..b22e6204789 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; @@ -106,8 +105,6 @@ public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync { JobStatus requestJobStatus(); - JobDetails requestJobDetails(); - // ------------------------------------------------------------------------------------ // Methods below do not belong to Scheduler but are included due to historical reasons // ------------------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 760f9e969b1..101766a9780 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -83,7 +83,6 @@ import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -613,11 +612,6 @@ public class AdaptiveScheduler return state.getJobStatus(); } - @Override - public JobDetails requestJobDetails() { - return JobDetails.createDetailsForJob(state.getJob()); - } - @Override public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index c519c21f1c7..ecdc78f45a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -48,7 +48,6 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -129,7 +128,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull private final Function<ResourceID, CompletableFuture<Void>> resourceManagerHeartbeatFunction; - @Nonnull private final Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier; + @Nonnull private final Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier; @Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier; @@ -244,7 +243,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { CompletableFuture<Void>> taskManagerHeartbeatFunction, @Nonnull Function<ResourceID, CompletableFuture<Void>> resourceManagerHeartbeatFunction, - @Nonnull Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier, + @Nonnull Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier, @Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier, @Nonnull Supplier<CompletableFuture<CheckpointStatsSnapshot>> @@ -327,7 +326,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { this.registerTaskManagerFunction = registerTaskManagerFunction; this.taskManagerHeartbeatFunction = taskManagerHeartbeatFunction; this.resourceManagerHeartbeatFunction = resourceManagerHeartbeatFunction; - this.requestJobDetailsSupplier = requestJobDetailsSupplier; + this.requestJobStatusSupplier = requestJobStatusSupplier; this.requestJobSupplier = requestJobSupplier; this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier; this.triggerSavepointFunction = triggerSavepointFunction; @@ -412,14 +411,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { return resourceManagerHeartbeatFunction.apply(resourceID); } - @Override - public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { - return requestJobDetailsSupplier.get(); - } - @Override public CompletableFuture<JobStatus> requestJobStatus(Time timeout) { - return requestJobDetailsSupplier.get().thenApply(JobDetails::getStatus); + return requestJobStatusSupplier.get(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index e462a0d23c5..6fbc367f800 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.utils; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple6; @@ -45,7 +46,6 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -118,7 +118,7 @@ public class TestingJobMasterGatewayBuilder { (ignoredA, ignoredB) -> FutureUtils.completedVoidFuture(); private Function<ResourceID, CompletableFuture<Void>> resourceManagerHeartbeatFunction = ignored -> FutureUtils.completedVoidFuture(); - private Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier = + private Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier = () -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier = () -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); @@ -280,9 +280,9 @@ public class TestingJobMasterGatewayBuilder { return this; } - public TestingJobMasterGatewayBuilder setRequestJobDetailsSupplier( - Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier) { - this.requestJobDetailsSupplier = requestJobDetailsSupplier; + public TestingJobMasterGatewayBuilder setRequestJobStatusSupplier( + Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier) { + this.requestJobStatusSupplier = requestJobStatusSupplier; return this; } @@ -446,7 +446,7 @@ public class TestingJobMasterGatewayBuilder { registerTaskManagerFunction, taskManagerHeartbeatFunction, resourceManagerHeartbeatFunction, - requestJobDetailsSupplier, + requestJobStatusSupplier, requestJobSupplier, checkpointStatsSnapshotSupplier, triggerSavepointFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java index 9820f94db81..ad4d33edf96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -157,12 +156,6 @@ public class TestingSchedulerNG implements SchedulerNG { return JobStatus.CREATED; } - @Override - public JobDetails requestJobDetails() { - failOperation(); - return null; - } - @Override public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) { failOperation();