bytesandwich commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r657416133
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String savepoint) {
         context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }

-    private void handleAnyFailure(Throwable cause) {
+    private void handleAnyFailure(
Review comment:
I agree that this copied code smells really bad. *I think we should limit the
scope of this change, since it has already been open for more than a month and
it's a minor feature about exposing exceptions.*
For architecting failure handling, I think if the specifics start to get
challenging, it's good to step back and take a long-term view of what the
architecture should evolve into. We have at least three examples to
consider. The [AdaptiveScheduler
FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler#FLIP160:AdaptiveScheduler-Stablesetofresources)
refers to [rebalancing in
Kafka](https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/).
I believe Beam (and Google Dataflow) use a
[splitting](https://beam.apache.org/blog/splitatfraction-method/) abstraction
to rebalance and tolerate live failures. And, as you know, the
`DefaultScheduler` has fine-grained recovery. So, how could we design things so
that all or any of these can be plugged into (and unplugged from)
`AdaptiveScheduler` while keeping the scheduler as simple/dumb as possible?
That's a pretty tough question, but a simple answer to it would probably yield
a good architecture.
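To make the "plug/unplug" idea concrete, it might look something like this.
Purely a sketch: `FailureHandlingStrategy` and its method are made-up names,
not existing Flink API; `SchedulingTopology` and `ExecutionVertexID` are the
real types.
```
// Purely a sketch -- FailureHandlingStrategy is a made-up name, not existing
// Flink API. The scheduler stays simple: it asks the strategy which vertices
// to restart and just executes that plan.
import java.util.Set;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

/**
 * One pluggable reaction to a failure. Cooperative rebalancing (the Kafka
 * approach), split-based work redistribution (the Beam/Dataflow approach),
 * and fine-grained recovery (the DefaultScheduler approach) could each be
 * one implementation.
 */
interface FailureHandlingStrategy {
    Set<ExecutionVertexID> verticesToRestart(Throwable cause, SchedulingTopology topology);
}
```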
Maybe we could make all of this simpler by first extracting _all_ the
duplicated code that reads from the execution graph into some `GraphUtil` or
something like that. E.g.
```
Set<ExecutionVertexID> concurrentVertexIds =
        IterableUtils.toStream(getExecutionGraph().getSchedulingTopology().getVertices())
                .map(SchedulingExecutionVertex::getId)
                .filter(v -> failingExecutionVertexId != null
                        && !failingExecutionVertexId.equals(v))
                .collect(Collectors.toSet());
```
in `StateWithExecutionGraph` -> `archiveExecutionFailure`, because there is so
much deep coupling to the execution graph!
Maybe something like a declarative interface that sits on top of a visitor
layer that sits on top of the actual graph data? (A rough sketch of the
extraction follows below.)
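A minimal sketch of that extraction, preserving the null-check semantics of the
duplicated code above (`GraphUtil` and `getConcurrentVertexIds` are placeholder
names):
```
// Placeholder sketch -- GraphUtil and getConcurrentVertexIds are made-up
// names. The point is that states stop walking the execution graph themselves.
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;

final class GraphUtil {
    private GraphUtil() {}

    /**
     * Returns the ids of all vertices that were running concurrently with the
     * failing vertex, i.e. every vertex except the failing one. Returns an
     * empty set when no failing vertex is known, matching the null check in
     * the duplicated code above.
     */
    static Set<ExecutionVertexID> getConcurrentVertexIds(
            SchedulingTopology topology,
            @Nullable ExecutionVertexID failingExecutionVertexId) {
        return IterableUtils.toStream(topology.getVertices())
                .map(SchedulingExecutionVertex::getId)
                .filter(id -> failingExecutionVertexId != null
                        && !failingExecutionVertexId.equals(id))
                .collect(Collectors.toSet());
    }
}
```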
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]