yuchen-ecnu commented on code in PR #25798:
URL: https://github.com/apache/flink/pull/25798#discussion_r1893709324


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java:
##########
@@ -79,6 +79,10 @@ public class JobDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_JSON_PLAN = "plan";
 
+    public static final String FIELD_NAME_STREAM_GRAPH_JSON_PLAN = 
"stream-graph-plan";
+
+    public static final String FIELD_NAME_PENDING_OPERATOR_COUNT = 
"pending-operator-count";

Review Comment:
   Updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java:
##########
@@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends 
JobStatusProvider {
      */
     String getJsonPlan();
 
+    /**
+     * Returns the stream graph as a JSON string.
+     *
+     * @return stream graph as a JSON string, or empty string if the job 
submitted with JobGraph.

Review Comment:
   Updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java:
##########
@@ -141,6 +142,11 @@ private static JobDetailsInfo createJobDetailsInfo(
                     executionState, 
jobVerticesPerState[executionState.ordinal()]);
         }
 
+        JobPlanInfo.RawJson streamGraphJson = null;
+        if 
(!StringUtils.isNullOrWhitespaceOnly(executionGraph.getStreamGraphJson())) {

Review Comment:
   That's true, but it's safe to check non-null at the same time by 
`StringUtils.isNullOrWhitespaceOnly`? WDYT?



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java:
##########
@@ -62,4 +62,13 @@ public interface StreamGraphContext {
      * @return true if all modifications were successful and applied 
atomically, false otherwise.
      */
     boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos);
+
+    interface StreamGraphUpdateListener {

Review Comment:
   Updated.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java:
##########
@@ -62,4 +62,13 @@ public interface StreamGraphContext {
      * @return true if all modifications were successful and applied 
atomically, false otherwise.
      */
     boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos);
+
+    interface StreamGraphUpdateListener {
+        /**
+         * This method is called whenever the StreamGraph is updated.
+         *
+         * @param streamGraph the updated StreamGraph
+         */
+        void onStreamGraphUpdated(StreamGraph streamGraph);

Review Comment:
   It's true that `AdaptiveGraphManager` hold a stream graph, but the 
`StreamGraphContext` should notify it's listener with the updated stream graph 
(since the stream graph in the `AdaptiveGraphManager` maybe outdated?).
   Just like `JobStatusListener` should notify listener with `newJobStatus`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to