Efrat19 commented on code in PR #26320:
URL: https://github.com/apache/flink/pull/26320#discussion_r2051997572
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -102,74 +95,56 @@ public static String generatePlan(
operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr);
operatorDescr = operatorDescr.replace("\n", "<br/>");
- gen.writeStartObject();
-
- // write the core properties
JobVertexID vertexID = vertex.getID();
- gen.writeStringField("id", vertexID.toString());
- gen.writeNumberField(
- "parallelism",
+ long parallelism =
vertexParallelism
.getParallelismOptional(vertexID)
- .orElse(vertex.getParallelism()));
- gen.writeStringField("operator", operator);
- gen.writeStringField("operator_strategy", operatorDescr);
- gen.writeStringField("description", description);
+ .orElse(vertex.getParallelism());
+ Collection<JobPlanInfo.Plan.Node.Input> inputs = new
ArrayList<>();
if (!vertex.isInputVertex()) {
- // write the input edge properties
- gen.writeArrayFieldStart("inputs");
-
- List<JobEdge> inputs = vertex.getInputs();
- for (int inputNum = 0; inputNum < inputs.size();
inputNum++) {
- JobEdge edge = inputs.get(inputNum);
+ for (int inputNum = 0; inputNum <
vertex.getInputs().size(); inputNum++) {
+ JobEdge edge = vertex.getInputs().get(inputNum);
if (edge.getSource() == null) {
continue;
}
-
JobVertex predecessor = edge.getSource().getProducer();
+ if (predecessor == null || predecessor.getID() ==
null) {
+ continue;
+ }
+ String inputId = predecessor.getID().toString();
+ if (edge.getSource().getResultType() == null
+ || edge.getSource().getResultType().name() ==
null) {
+ continue;
+ }
+ String exchange =
edge.getSource().getResultType().name().toLowerCase();
String shipStrategy = edge.getShipStrategyName();
String preProcessingOperation =
edge.getPreProcessingOperationName();
String operatorLevelCaching =
edge.getOperatorLevelCachingDescription();
- gen.writeStartObject();
- gen.writeNumberField("num", inputNum);
- gen.writeStringField("id",
predecessor.getID().toString());
-
- if (shipStrategy != null) {
- gen.writeStringField("ship_strategy",
shipStrategy);
- }
- if (preProcessingOperation != null) {
- gen.writeStringField("local_strategy",
preProcessingOperation);
- }
- if (operatorLevelCaching != null) {
- gen.writeStringField("caching",
operatorLevelCaching);
- }
-
- gen.writeStringField(
- "exchange",
edge.getSource().getResultType().name().toLowerCase());
-
- gen.writeEndObject();
+ inputs.add(
+ new JobPlanInfo.Plan.Node.Input(
+ inputId,
+ inputNum,
+ exchange,
+ shipStrategy,
+ preProcessingOperation,
+ operatorLevelCaching));
}
-
- gen.writeEndArray();
}
-
- // write the optimizer properties
- gen.writeFieldName("optimizer_properties");
- gen.writeRawValue(optimizerProps);
Review Comment:
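Noticed that `optimizerProps` is a string, but the old code wrote it as raw
JSON (`gen.writeRawValue(optimizerProps)`) rather than as a quoted string
field, so I should probably replicate that behavior in the new code.
This will be addressed in a follow-up PR.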
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -102,74 +95,56 @@ public static String generatePlan(
operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr);
operatorDescr = operatorDescr.replace("\n", "<br/>");
- gen.writeStartObject();
-
- // write the core properties
JobVertexID vertexID = vertex.getID();
- gen.writeStringField("id", vertexID.toString());
- gen.writeNumberField(
- "parallelism",
+ long parallelism =
vertexParallelism
.getParallelismOptional(vertexID)
- .orElse(vertex.getParallelism()));
- gen.writeStringField("operator", operator);
- gen.writeStringField("operator_strategy", operatorDescr);
- gen.writeStringField("description", description);
+ .orElse(vertex.getParallelism());
+ Collection<JobPlanInfo.Plan.Node.Input> inputs = new
ArrayList<>();
if (!vertex.isInputVertex()) {
- // write the input edge properties
- gen.writeArrayFieldStart("inputs");
-
- List<JobEdge> inputs = vertex.getInputs();
- for (int inputNum = 0; inputNum < inputs.size();
inputNum++) {
- JobEdge edge = inputs.get(inputNum);
+ for (int inputNum = 0; inputNum <
vertex.getInputs().size(); inputNum++) {
+ JobEdge edge = vertex.getInputs().get(inputNum);
if (edge.getSource() == null) {
continue;
}
-
JobVertex predecessor = edge.getSource().getProducer();
+ if (predecessor == null || predecessor.getID() ==
null) {
+ continue;
+ }
+ String inputId = predecessor.getID().toString();
+ if (edge.getSource().getResultType() == null
+ || edge.getSource().getResultType().name() ==
null) {
+ continue;
+ }
+ String exchange =
edge.getSource().getResultType().name().toLowerCase();
String shipStrategy = edge.getShipStrategyName();
String preProcessingOperation =
edge.getPreProcessingOperationName();
String operatorLevelCaching =
edge.getOperatorLevelCachingDescription();
- gen.writeStartObject();
- gen.writeNumberField("num", inputNum);
- gen.writeStringField("id",
predecessor.getID().toString());
-
- if (shipStrategy != null) {
- gen.writeStringField("ship_strategy",
shipStrategy);
- }
- if (preProcessingOperation != null) {
- gen.writeStringField("local_strategy",
preProcessingOperation);
- }
- if (operatorLevelCaching != null) {
- gen.writeStringField("caching",
operatorLevelCaching);
- }
-
- gen.writeStringField(
- "exchange",
edge.getSource().getResultType().name().toLowerCase());
-
- gen.writeEndObject();
+ inputs.add(
+ new JobPlanInfo.Plan.Node.Input(
+ inputId,
+ inputNum,
+ exchange,
+ shipStrategy,
+ preProcessingOperation,
+ operatorLevelCaching));
}
-
- gen.writeEndArray();
}
-
- // write the optimizer properties
- gen.writeFieldName("optimizer_properties");
- gen.writeRawValue(optimizerProps);
Review Comment:
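`optimizerProps` is a string, but the old code wrote it as raw JSON
(`gen.writeRawValue(optimizerProps)`) rather than as a quoted string field,
so I should probably replicate that behavior in the new code.
This will be addressed in a follow-up PR.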
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]