bytesandwich commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r657416133
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########

```diff
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String savepoint) {
         context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }

-    private void handleAnyFailure(Throwable cause) {
+    private void handleAnyFailure(
```

Review comment:
I agree that this copied code smells really bad. *That said, I think we should limit the scope of this change, since it has already been open for more than a month and it is a minor feature about exposing exceptions.*

For architecting failure handling: when the specifics start to become challenging, it is good to step back and take a long-term view of what the architecture should evolve into. We have at least three examples to consider:

* The [AdaptiveScheduler FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler#FLIP160:AdaptiveScheduler-Stablesetofresources) refers to [rebalancing in Kafka](https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/).
* I believe Beam (and Google Dataflow) use a [splitting](https://beam.apache.org/blog/splitatfraction-method/) abstraction to rebalance and to tolerate live failures.
* The `DefaultScheduler` has fine-grained recovery, as you know.

So, how could we design things so that all (or any) of these can be plugged into and unplugged from the `AdaptiveScheduler`, while keeping the scheduler itself as simple/dumb as possible? That is a pretty tough question, but a simple answer to it would probably produce a good architecture. (There is a rough sketch of this idea at the end of this comment.)

Maybe we could make all of these things simpler by first extracting _all_ of the duplicated code that accesses and retrieves things from the execution graph into some `GraphUtil` or the like. E.g. this snippet from `StateWithExecutionGraph#archiveExecutionFailure`:

```java
Set<ExecutionVertexID> concurrentVertexIds =
        IterableUtils.toStream(getExecutionGraph().getSchedulingTopology().getVertices())
                .map(SchedulingExecutionVertex::getId)
                .filter(v -> failingExecutionVertexId != null && !failingExecutionVertexId.equals(v))
                .collect(Collectors.toSet());
```

This is so much deep coupling to the execution graph! Maybe something like a declarative interface that sits on top of a visitor layer, which in turn sits on top of the actual graph data?
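To make the extraction idea a bit more concrete, here is a minimal sketch. `GraphUtil` and `getConcurrentVertexIds` are invented names and this is just one possible shape, not a concrete proposal; the body is the same logic as the snippet above:

```java
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;

/** Hypothetical home for the duplicated execution-graph access code. */
public final class GraphUtil {

    private GraphUtil() {}

    /**
     * Collects the IDs of all vertices in the topology other than the failing one.
     * Extracted so that states like StopWithSavepoint no longer have to reach
     * through the execution graph themselves.
     */
    public static Set<ExecutionVertexID> getConcurrentVertexIds(
            SchedulingTopology topology,
            @Nullable ExecutionVertexID failingExecutionVertexId) {
        return IterableUtils.toStream(topology.getVertices())
                .map(SchedulingExecutionVertex::getId)
                // Same semantics as the original: an empty set if no vertex failed.
                .filter(id -> failingExecutionVertexId != null
                        && !failingExecutionVertexId.equals(id))
                .collect(Collectors.toSet());
    }
}
```

Then `archiveExecutionFailure` (and any copies of it) would shrink to a one-liner, and the coupling to the graph would live in one place.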
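And a rough shape for the declarative-interface-over-visitor layering. All of these types are invented for illustration; the point is only the layering, with the states programming against the top interface and never touching the graph:

```java
import java.util.HashSet;
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;

/** Bottom layer: a visitor over the actual graph data. */
interface GraphVisitor {
    void visitVertex(SchedulingExecutionVertex vertex);
}

interface VisitableGraph {
    void accept(GraphVisitor visitor);
}

/** Top layer: the declarative queries that the states actually need. */
interface GraphQueries {
    Set<ExecutionVertexID> verticesConcurrentWith(@Nullable ExecutionVertexID failingVertexId);
}

/** The facade implemented in terms of the visitor, so callers never see the graph. */
final class VisitorBackedGraphQueries implements GraphQueries {

    private final VisitableGraph graph;

    VisitorBackedGraphQueries(VisitableGraph graph) {
        this.graph = graph;
    }

    @Override
    public Set<ExecutionVertexID> verticesConcurrentWith(
            @Nullable ExecutionVertexID failingVertexId) {
        final Set<ExecutionVertexID> result = new HashSet<>();
        graph.accept(
                vertex -> {
                    // Same filtering as the snippet above, but the traversal is
                    // owned by the graph, not by the caller.
                    if (failingVertexId != null && !failingVertexId.equals(vertex.getId())) {
                        result.add(vertex.getId());
                    }
                });
        return result;
    }
}
```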
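Finally, going back to the plug/unplug question above: one naive way to keep the scheduler dumb would be to hide each recovery/rebalancing approach behind a single strategy seam. Again, every name here is invented; this is a sketch of the seam, not of any real Flink API:

```java
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

/** Invented: what the scheduler is told to do after a failure. */
final class RecoveryDecision {
    enum Action { RESTART_ALL, RESTART_SUBSET, RESCALE }

    final Action action;
    final Set<ExecutionVertexID> verticesToRestart;

    RecoveryDecision(Action action, Set<ExecutionVertexID> verticesToRestart) {
        this.action = action;
        this.verticesToRestart = verticesToRestart;
    }
}

/**
 * Invented seam: cooperative rebalancing, Beam-style splitting, and fine-grained
 * recovery would each be one implementation, and the AdaptiveScheduler would only
 * ever talk to this interface.
 */
interface RecoveryStrategy {
    RecoveryDecision onFailure(Throwable cause, @Nullable ExecutionVertexID failedVertex);
}
```

The scheduler then stays a dumb driver: it reports failures to the strategy and executes whatever decision comes back, and plugging/unplugging an approach is just swapping the implementation.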