[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-03-02 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r585421476



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
##
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Mocked ExecutionGraph which (partially) tracks the job status, and provides 
some basic mocks to
+ * create an {@link 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph} from this
+ * ExecutionGraph.
+ */
+class StateTrackingMockExecutionGraph implements ExecutionGraph {
+private JobStatus state = JobStatus.INITIALIZING;
+private final CompletableFuture terminationFuture = new 
CompletableFuture<>();
+private final JobID jobId = new JobID();
+private final ArchivedExecutionConfig archivedExecutionConfig = new 
ExecutionConfig().archive();

Review comment:
   this one could be static

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
##
@@ -148,8 +134,8 @@ public Restarting createRestartingState(
 
 public Restarting createRestartingState(MockRestartingContext ctx)
 throws JobException, JobExecutionException {
-ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder.newBuilder().build();
-return createRestartingState(ctx, executionGraph);
+  

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-24 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r581861367



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig 
getArchivedExecutionConfig() {
  *
  * @return Termination future of this {@link ExecutionGraph}.
  */
-public CompletableFuture getTerminationFuture() {
-return terminationFuture;
-}
+CompletableFuture getTerminationFuture();
 
 @VisibleForTesting
-public JobStatus waitUntilTerminal() throws InterruptedException {
-try {
-return terminationFuture.get();
-} catch (ExecutionException e) {
-// this should never happen
-// it would be a bug, so we  don't expect this to be handled and 
throw
-// an unchecked exception here
-throw new RuntimeException(e);
-}
-}
-
-// 
-//  State Transitions
-// 
-
-public boolean transitionState(JobStatus current, JobStatus newState) {
-return transitionState(current, newState, null);
-}
-
-private void transitionState(JobStatus newState, Throwable error) {
-transitionState(state, newState, error);
-}
+JobStatus waitUntilTerminal() throws InterruptedException;
 
-private boolean transitionState(JobStatus current, JobStatus newState, 
Throwable error) {
-assertRunningInJobMasterMainThread();
-// consistency check
-if (current.isTerminalState()) {
-String message = "Job is trying to leave terminal state " + 
current;
-LOG.error(message);
-throw new IllegalStateException(message);
-}
+boolean transitionState(JobStatus current, JobStatus newState);
 
-// now do the actual state transition
-if (state == current) {
-state = newState;
-LOG.info(
-"Job {} ({}) switched from state {} to {}.",
-getJobName(),
-getJobID(),
-current,
-newState,
-error);
+void incrementRestarts();
 
-stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-notifyJobStatusChange(newState, error);
-return true;
-} else {
-return false;
-}
-}
+void initFailureCause(Throwable t);
 
-public void incrementRestarts() {
-numberOfRestartsCounter.inc();
-}
+void vertexFinished();

Review comment:
   To add on to that, in the long-term interfaces for all Execution* 
classes would be pretty sick because we could re-implement the EG as we see 
fit, whereas an interface between the EG and vertices is probably not that 
beneficial.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-23 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r580963986



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig 
getArchivedExecutionConfig() {
  *
  * @return Termination future of this {@link ExecutionGraph}.
  */
-public CompletableFuture getTerminationFuture() {
-return terminationFuture;
-}
+CompletableFuture getTerminationFuture();
 
 @VisibleForTesting
-public JobStatus waitUntilTerminal() throws InterruptedException {
-try {
-return terminationFuture.get();
-} catch (ExecutionException e) {
-// this should never happen
-// it would be a bug, so we  don't expect this to be handled and 
throw
-// an unchecked exception here
-throw new RuntimeException(e);
-}
-}
-
-// 
-//  State Transitions
-// 
-
-public boolean transitionState(JobStatus current, JobStatus newState) {
-return transitionState(current, newState, null);
-}
-
-private void transitionState(JobStatus newState, Throwable error) {
-transitionState(state, newState, error);
-}
+JobStatus waitUntilTerminal() throws InterruptedException;
 
-private boolean transitionState(JobStatus current, JobStatus newState, 
Throwable error) {
-assertRunningInJobMasterMainThread();
-// consistency check
-if (current.isTerminalState()) {
-String message = "Job is trying to leave terminal state " + 
current;
-LOG.error(message);
-throw new IllegalStateException(message);
-}
+boolean transitionState(JobStatus current, JobStatus newState);
 
-// now do the actual state transition
-if (state == current) {
-state = newState;
-LOG.info(
-"Job {} ({}) switched from state {} to {}.",
-getJobName(),
-getJobID(),
-current,
-newState,
-error);
+void incrementRestarts();
 
-stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-notifyJobStatusChange(newState, error);
-return true;
-} else {
-return false;
-}
-}
+void initFailureCause(Throwable t);
 
-public void incrementRestarts() {
-numberOfRestartsCounter.inc();
-}
+void vertexFinished();

Review comment:
   Yes I didn't explain it very well, and I don't think my suggestion 
(according to the phrasing) as is would work particularly well.
   
   The original issue we wanted to solve with this PR is that mocking an EG is 
annoying and error-prone, so we introduced an interface to remedy that.
   However, because all the Execution* classes are pretty much one intermingled 
mess, and the Executing state works against the Execution(Job)Vertex, you ended 
up still having a dependency on the package-private APIs, which you also 
exposed via the interface.
   
   You then introduced an interface for the EG containing the internal APIs; 
this allows you to mock the internal API of the EG that the vertex requires. 
While it works it doesn't really address the core issue of not having a clear 
API. Despite only wanting to assert that the state deploys/cancels some 
vertices we end up having to mock EG-internal APIs and all that jazz.
   
   Alternatively, and this is what I was more going for, we could take an 
approach for the remaining Execution* classes similar to what you did for the 
EG. In that model you'd have to mock far fewer methods, and can entirely 
ignore how these classes internally communicate.
   I would think that this wouldn't be so much overhead, in particular 
because we need far fewer methods from these classes; so we can probably get 
by with a tiny extension of the Access* interfaces.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-19 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r579000369



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig 
getArchivedExecutionConfig() {
  *
  * @return Termination future of this {@link ExecutionGraph}.
  */
-public CompletableFuture getTerminationFuture() {
-return terminationFuture;
-}
+CompletableFuture getTerminationFuture();
 
 @VisibleForTesting
-public JobStatus waitUntilTerminal() throws InterruptedException {
-try {
-return terminationFuture.get();
-} catch (ExecutionException e) {
-// this should never happen
-// it would be a bug, so we  don't expect this to be handled and 
throw
-// an unchecked exception here
-throw new RuntimeException(e);
-}
-}
-
-// 
-//  State Transitions
-// 
-
-public boolean transitionState(JobStatus current, JobStatus newState) {
-return transitionState(current, newState, null);
-}
-
-private void transitionState(JobStatus newState, Throwable error) {
-transitionState(state, newState, error);
-}
+JobStatus waitUntilTerminal() throws InterruptedException;
 
-private boolean transitionState(JobStatus current, JobStatus newState, 
Throwable error) {
-assertRunningInJobMasterMainThread();
-// consistency check
-if (current.isTerminalState()) {
-String message = "Job is trying to leave terminal state " + 
current;
-LOG.error(message);
-throw new IllegalStateException(message);
-}
+boolean transitionState(JobStatus current, JobStatus newState);
 
-// now do the actual state transition
-if (state == current) {
-state = newState;
-LOG.info(
-"Job {} ({}) switched from state {} to {}.",
-getJobName(),
-getJobID(),
-current,
-newState,
-error);
+void incrementRestarts();
 
-stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-notifyJobStatusChange(newState, error);
-return true;
-} else {
-return false;
-}
-}
+void initFailureCause(Throwable t);
 
-public void incrementRestarts() {
-numberOfRestartsCounter.inc();
-}
+void vertexFinished();

Review comment:
   That is only necessary because Executing works directly against 
ExecutionJobVertices/ExecutionVertices.
   If there were 2 small interfaces, one exposing the vertices for the 
JobVertex and one exposing deploy/markFailed for the ExecutionVertex, then you 
wouldn't need to mock these vertices in the test and would no longer require 
package-private APIs.
   
   As it stands you're exposing a whole bunch of package-private methods to the 
outside which is just not acceptable.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-19 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578996744



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateTrackingMockExecutionGraph.java
##
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Mocked ExecutionGraph which (partially) tracks the job status, and provides 
some basic mocks to
+ * create an {@link 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph} from this
+ * ExecutionGraph.
+ */
+class StateTrackingMockExecutionGraph implements ExecutionGraph {
+private JobStatus state = JobStatus.INITIALIZING;
+private final 

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578978899



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig 
getArchivedExecutionConfig() {
  *
  * @return Termination future of this {@link ExecutionGraph}.
  */
-public CompletableFuture getTerminationFuture() {
-return terminationFuture;
-}
+CompletableFuture getTerminationFuture();
 
 @VisibleForTesting
-public JobStatus waitUntilTerminal() throws InterruptedException {
-try {
-return terminationFuture.get();
-} catch (ExecutionException e) {
-// this should never happen
-// it would be a bug, so we  don't expect this to be handled and 
throw
-// an unchecked exception here
-throw new RuntimeException(e);
-}
-}
-
-// 
-//  State Transitions
-// 
-
-public boolean transitionState(JobStatus current, JobStatus newState) {
-return transitionState(current, newState, null);
-}
-
-private void transitionState(JobStatus newState, Throwable error) {
-transitionState(state, newState, error);
-}
+JobStatus waitUntilTerminal() throws InterruptedException;
 
-private boolean transitionState(JobStatus current, JobStatus newState, 
Throwable error) {
-assertRunningInJobMasterMainThread();
-// consistency check
-if (current.isTerminalState()) {
-String message = "Job is trying to leave terminal state " + 
current;
-LOG.error(message);
-throw new IllegalStateException(message);
-}
+boolean transitionState(JobStatus current, JobStatus newState);
 
-// now do the actual state transition
-if (state == current) {
-state = newState;
-LOG.info(
-"Job {} ({}) switched from state {} to {}.",
-getJobName(),
-getJobID(),
-current,
-newState,
-error);
+void incrementRestarts();
 
-stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-notifyJobStatusChange(newState, error);
-return true;
-} else {
-return false;
-}
-}
+void initFailureCause(Throwable t);
 
-public void incrementRestarts() {
-numberOfRestartsCounter.inc();
-}
+void vertexFinished();

Review comment:
   If we made sure that the Execution* classes all work against the 
DefaultExecutionGraph then we should be able to remove this and other 
package-private methods.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578977899



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateTrackingMockExecutionGraph.java
##
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Mocked ExecutionGraph which (partially) tracks the job status, and provides 
some basic mocks to
+ * create an {@link 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph} from this
+ * ExecutionGraph.
+ */
+class StateTrackingMockExecutionGraph implements ExecutionGraph {
+private JobStatus state = JobStatus.INITIALIZING;
+private final 

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578976880



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateTrackingMockExecutionGraph.java
##
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Mocked ExecutionGraph which (partially) tracks the job status, and provides 
some basic mocks to
+ * create an {@link 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph} from this
+ * ExecutionGraph.
+ */
+class StateTrackingMockExecutionGraph implements ExecutionGraph {
+private JobStatus state = JobStatus.INITIALIZING;
+private final 

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578976098



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateTrackingMockExecutionGraph.java
##
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Mocked ExecutionGraph which (partially) tracks the job status, and provides some basic mocks to
+ * create an {@link org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph} from this
+ * ExecutionGraph.
+ */
+class StateTrackingMockExecutionGraph implements ExecutionGraph {
+private JobStatus state = JobStatus.INITIALIZING;
+private final 

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578975860



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateTrackingMockExecutionGraph.java
##
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Mocked ExecutionGraph which (partially) tracks the job status, and provides some basic mocks to
+ * create an {@link org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph} from this
+ * ExecutionGraph.
+ */
+class StateTrackingMockExecutionGraph implements ExecutionGraph {
+private JobStatus state = JobStatus.INITIALIZING;
+private final 

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-18 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r578794106



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/MockExecutionGraphBase.java
##
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** Base class for mocking the ExecutionGraph. */
+public class MockExecutionGraphBase implements ExecutionGraph {
+
+@Override
+public String getJsonPlan() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public JobID getJobID() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public String getJobName() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public JobStatus getState() {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public ErrorInfo getFailureInfo() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public long getStatusTimestamp(JobStatus status) {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public CheckpointCoordinatorConfiguration 
getCheckpointCoordinatorConfiguration() {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+throw new UnsupportedOperationException();
+}
+
+@Nullable
+@Override
+public ArchivedExecutionConfig getArchivedExecutionConfig() {
+   

[GitHub] [flink] zentol commented on a change in pull request #14950: [FLINK-21347][coordination] Extract interface from ExecutionGraph

2021-02-17 Thread GitBox


zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r577520012



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
##
@@ -466,49 +455,48 @@ public Throwable getFailureCause() {
 }
 }
 
-private static class MockExecutionGraph extends ExecutionGraph {
+private static class MockExecutionGraph extends MockExecutionGraphBase {
 private final boolean updateStateReturnValue;
 private final Supplier> 
getVerticesTopologicallySupplier;
+private JobStatus state = JobStatus.INITIALIZING;
+private CompletableFuture terminationFuture = new 
CompletableFuture<>();
 
-MockExecutionGraph(Supplier> 
getVerticesTopologicallySupplier)
-throws IOException {
+MockExecutionGraph(
+Supplier> 
getVerticesTopologicallySupplier) {
 this(false, getVerticesTopologicallySupplier);
 }
 
-MockExecutionGraph(boolean updateStateReturnValue) throws IOException {
+MockExecutionGraph(boolean updateStateReturnValue) {
 this(updateStateReturnValue, null);
 }
 
 private MockExecutionGraph(
 boolean updateStateReturnValue,
-Supplier> 
getVerticesTopologicallySupplier)
-throws IOException {
-super(
-new JobInformation(
-new JobID(),
-"Test Job",
-new SerializedValue<>(new ExecutionConfig()),
-new Configuration(),
-Collections.emptyList(),
-Collections.emptyList()),
-TestingUtils.defaultExecutor(),
-TestingUtils.defaultExecutor(),
-AkkaUtils.getDefaultTimeout(),
-1,
-ExecutionGraph.class.getClassLoader(),
-VoidBlobWriter.getInstance(),
-
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
-new Configuration()),
-NettyShuffleMaster.INSTANCE,
-NoOpJobMasterPartitionTracker.INSTANCE,
-ScheduleMode.EAGER,
-NoOpExecutionDeploymentListener.get(),
-(execution, newState) -> {},
-0L);
+Supplier> 
getVerticesTopologicallySupplier) {
 this.updateStateReturnValue = updateStateReturnValue;
 this.getVerticesTopologicallySupplier = 
getVerticesTopologicallySupplier;
 }
 
+@Override
+public void transitionToRunning() {
+state = JobStatus.RUNNING;
+}
+
+@Override
+public JobStatus getState() {
+return state;
+}
+
+@Override
+public JobID getJobID() {
+return new JobID();

Review comment:
   This could lead to subtle issues if `getJobID` is called multiple times, because each call returns a new `JobID` instance instead of one stable ID.

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/MockExecutionGraphBase.java
##
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import