JunRuiLee commented on code in PR #25798:
URL: https://github.com/apache/flink/pull/25798#discussion_r1887810799


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java:
##########
@@ -118,6 +119,14 @@ public int getPendingOperatorCount() {
         return adaptiveGraphManager.getPendingOperatorsCount();
     }
 
+    @Override
+    public String getJsonStreamGraph() {
+        return JsonPlanGenerator.generateJsonStreamGraph(
+                adaptiveGraphManager.getStreamGraphContext().getStreamGraph(),
+                adaptiveGraphManager.getStreamNodeIdsToJobVertexMap(),

Review Comment:
   How about adding `getJsonStreamGraph` and `generateJsonStreamGraph` methods 
to the `AdaptiveGraphManager`? 
   We can call `generateJsonStreamGraph` to trigger an update when the stream 
graph is updated or new job vertices are created. For example, in the 
`DefaultStreamGraphContext#modifyStreamEdge`, we could call 
`generateJsonStreamGraph` if the update result is true. Similarly, in the 
`AdaptiveGraphManager#onJobVertexFinished`, we should also call 
`generateJsonStreamGraph`.
   
   `AdaptiveExecutionPlanSchedulingContext#getJsonStreamGraph` then only needs 
to call `AdaptiveGraphManager#getJsonStreamGraph`.
   
   This change would give us two benefits:
   
   1. We do not need to update the `immutable stream graph` because the 
`AdaptiveGraphManager` can handle the stream graph.
   2. We avoid frequently generating the JSON stream graph, as we only update 
it when the graph is updated.
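
To make the suggestion concrete, here is a rough, self-contained sketch of the caching idea. It does not use Flink classes: `GraphManagerSketch`, `modifyEdge`, `regenerateJson`, and `getGenerationCount` are hypothetical stand-ins (for `AdaptiveGraphManager`, `DefaultStreamGraphContext#modifyStreamEdge`, and `generateJsonStreamGraph`), and the JSON layout is invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for AdaptiveGraphManager; NOT Flink API. */
class GraphManagerSketch {
    // Stand-in for the mutable stream graph: just a list of edge labels.
    private final List<String> edges = new ArrayList<>();
    // Cached JSON, rebuilt only when the graph changes.
    private String cachedJson;
    // Counts rebuilds so the caching behavior can be observed.
    private int generationCount;

    GraphManagerSketch() {
        regenerateJson();
    }

    /** Analogous to modifyStreamEdge: regenerate only if the update succeeded. */
    boolean modifyEdge(String edge) {
        if (edges.contains(edge)) {
            return false; // nothing changed, keep the cached JSON
        }
        edges.add(edge);
        regenerateJson();
        return true;
    }

    /** Analogous to onJobVertexFinished: new job vertices may exist, so refresh. */
    void onJobVertexFinished() {
        regenerateJson();
    }

    /** Cheap read; the scheduling context would simply delegate to this. */
    String getJsonStreamGraph() {
        return cachedJson;
    }

    int getGenerationCount() {
        return generationCount;
    }

    /** Stand-in for generateJsonStreamGraph; the JSON shape is made up. */
    private void regenerateJson() {
        generationCount++;
        StringBuilder sb = new StringBuilder("{\"edges\":[");
        for (int i = 0; i < edges.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(edges.get(i)).append('"');
        }
        cachedJson = sb.append("]}").toString();
    }
}
```

With this shape, repeated calls to `getJsonStreamGraph` return the cached string without any regeneration, and the JSON is rebuilt only on the two update paths.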



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -170,4 +174,58 @@ public static String generatePlan(
             throw new RuntimeException("Failed to generate plan", e);
         }
     }
+
+    public static String generateJsonStreamGraph(
+            ImmutableStreamGraph sg,

Review Comment:
   I think using `StreamGraph` is sufficient after we update it and ensure 
that this method is only called by the `AdaptiveGraphManager`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java:
##########
@@ -70,6 +70,8 @@ public enum ExecutionState {
 
     RECONCILING,
 
+    PENDING,

Review Comment:
   I would prefer to put the `pending-operators` field in `JobDetailsInfo`, 
because it relates more to job execution concepts and cannot be fully 
represented by the stream graph plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

