bytesandwich commented on a change in pull request #15898:
URL: https://github.com/apache/flink/pull/15898#discussion_r657416133



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -178,9 +192,13 @@ private void completeOperationAndGoToFinished(String 
savepoint) {
         
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
     }
 
-    private void handleAnyFailure(Throwable cause) {
+    private void handleAnyFailure(

Review comment:
       I agree that this copied code smells really bad. *I think for this 
change we should limit the scope since it's already been more than a month and 
it's a minor feature about exposing exceptions.*
   
   For architecting failure handling, I think if the specifics start to be 
challenging, then it's good to step back and take a long-term outlook of what 
the architecture should evolve into. We have at least three examples to 
consider. The [AdaptiveScheduler 
FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler#FLIP160:AdaptiveScheduler-Stablesetofresources)
 refers to [rebalancing in 
kafka](https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/).
 I believe beam (and Google Dataflow) use a 
[splitting](https://beam.apache.org/blog/splitatfraction-method/) abstraction 
to rebalance and tolerate live failures. And, the `DefaultScheduler` has 
fine-grained recovery as you know. So, how could we design to "plug/unplug" all 
or any of these into `AdaptiveScheduler` and keep the scheduler as simple/dumb 
as possible? That's a pretty tough question but a simple answer to that 
question would probably create a good architecture.
   
   Maybe we could make all these things simpler by first extracting _all_ the 
duplicated code that accesses and retrieves things from the execution graph 
into some `GraphUtil` or something like that. E.g. 
   ```
           Set<ExecutionVertexID> concurrentVertexIds =
                   
IterableUtils.toStream(getExecutionGraph().getSchedulingTopology().getVertices())
                           .map(SchedulingExecutionVertex::getId)
                           .filter(
                                   v ->
                                           failingExecutionVertexId != null
                                                   && 
!failingExecutionVertexId.equals(v))
                           .collect(Collectors.toSet());
   ```
   in `StateWithExecutionGraph` -> `archiveExecutionFailure` because this is so 
much deep coupling to the execution graph! 
   
   Maybe something like a declarative interface that sits on top of a visitor 
layer that sits on top of the actual graph data?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to