zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r580963986
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() {
      *
      * @return Termination future of this {@link ExecutionGraph}.
      */
-    public CompletableFuture<JobStatus> getTerminationFuture() {
-        return terminationFuture;
-    }
+    CompletableFuture<JobStatus> getTerminationFuture();
 
     @VisibleForTesting
-    public JobStatus waitUntilTerminal() throws InterruptedException {
-        try {
-            return terminationFuture.get();
-        } catch (ExecutionException e) {
-            // this should never happen
-            // it would be a bug, so we don't expect this to be handled and throw
-            // an unchecked exception here
-            throw new RuntimeException(e);
-        }
-    }
-
-    // ------------------------------------------------------------------------
-    //  State Transitions
-    // ------------------------------------------------------------------------
-
-    public boolean transitionState(JobStatus current, JobStatus newState) {
-        return transitionState(current, newState, null);
-    }
-
-    private void transitionState(JobStatus newState, Throwable error) {
-        transitionState(state, newState, error);
-    }
+    JobStatus waitUntilTerminal() throws InterruptedException;
 
-    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
-        assertRunningInJobMasterMainThread();
-        // consistency check
-        if (current.isTerminalState()) {
-            String message = "Job is trying to leave terminal state " + current;
-            LOG.error(message);
-            throw new IllegalStateException(message);
-        }
+    boolean transitionState(JobStatus current, JobStatus newState);
 
-        // now do the actual state transition
-        if (state == current) {
-            state = newState;
-            LOG.info(
-                    "Job {} ({}) switched from state {} to {}.",
-                    getJobName(),
-                    getJobID(),
-                    current,
-                    newState,
-                    error);
+    void incrementRestarts();
 
-            stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-            notifyJobStatusChange(newState, error);
-            return true;
-        } else {
-            return false;
-        }
-    }
+    void initFailureCause(Throwable t);
 
-    public void incrementRestarts() {
-        numberOfRestartsCounter.inc();
-    }
+    void vertexFinished();

Review comment:
Yes, I didn't explain it very well, and I don't think my suggestion, as phrased, would work particularly well.

The original issue we wanted to solve with this PR is that mocking an EG is annoying and error-prone, so we introduced an interface to remedy that. However, because all the Execution* classes are pretty much one intermingled mess, and the Executing state works against the Execution(Job)Vertex, you still ended up with a dependency on the package-private APIs, which you also exposed via the interface.

You then introduced an interface for the EG containing the internal APIs; this allows you to mock the internal API of the EG that the vertex requires. While this works, it doesn't really address the core issue of not having a clear API. Despite only wanting to assert that the state deploys/cancels some vertices, we end up having to mock EG-internal APIs and all that jazz.

Alternatively, and this is what I was really going for, we could take an approach for the remaining Execution* classes similar to the one you took for the EG. In that model you'd have to mock far fewer methods, and could entirely ignore how these classes communicate internally. I don't think this would be much overhead, in particular because we need far fewer methods from these classes; we can probably get by with a tiny extension of the Access* interfaces.
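
To make the alternative concrete, here is a minimal, self-contained sketch of the idea. Everything in it is hypothetical and only for illustration: `DeployableVertex`, `VertexContainer`, `ExecutingState` and `RecordingVertex` are invented names, not Flink classes, and the real Access* interfaces may look quite different. The point is only that a state written against a narrow vertex interface can be tested with a trivial hand-written double, without touching any EG-internal API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NarrowVertexApiSketch {

    /** Hypothetical narrow view of a vertex: only what the state actually needs. */
    public interface DeployableVertex {
        String getName();

        void deploy();

        void cancel();
    }

    /** Hypothetical narrow view of the graph; deliberately exposes no internal APIs. */
    public interface VertexContainer {
        Iterable<? extends DeployableVertex> getVertices();
    }

    /** Stand-in for a state that deploys/cancels vertices via the narrow interfaces only. */
    public static final class ExecutingState {
        private final VertexContainer graph;

        public ExecutingState(VertexContainer graph) {
            this.graph = graph;
        }

        public void deployAll() {
            for (DeployableVertex vertex : graph.getVertices()) {
                vertex.deploy();
            }
        }

        public void cancelAll() {
            for (DeployableVertex vertex : graph.getVertices()) {
                vertex.cancel();
            }
        }
    }

    /** Test double that records calls; no mocking framework or internal wiring required. */
    public static final class RecordingVertex implements DeployableVertex {
        public final List<String> calls = new ArrayList<>();
        private final String name;

        public RecordingVertex(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public void deploy() {
            calls.add("deploy");
        }

        @Override
        public void cancel() {
            calls.add("cancel");
        }
    }

    public static void main(String[] args) {
        RecordingVertex a = new RecordingVertex("a");
        RecordingVertex b = new RecordingVertex("b");

        // The graph is just a lambda here, because the state needs a single method from it.
        ExecutingState state = new ExecutingState(() -> Arrays.asList(a, b));
        state.deployAll();
        state.cancelAll();

        System.out.println(a.getName() + ": " + a.calls);
        System.out.println(b.getName() + ": " + b.calls);
    }
}
```

With this shape, a test only asserts on the calls each vertex received; how the vertices talk to the graph internally never enters the picture.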