mxm commented on code in PR #26009:
URL: https://github.com/apache/beam/pull/26009#discussion_r1168622299
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java:
##########
@@ -245,9 +258,63 @@ public static StreamExecutionEnvironment
createStreamExecutionEnvironment(
configureStateBackend(options, flinkStreamEnv);
+ configureWebUIOptions(
+ flinkStreamEnv.getConfig(),
options.as(org.apache.beam.sdk.options.PipelineOptions.class));
+
return flinkStreamEnv;
}
+ private static void configureWebUIOptions(
+ ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions
options) {
+ SerializablePipelineOptions serializablePipelineOptions =
+ new SerializablePipelineOptions(options);
+ String optionsAsString = serializablePipelineOptions.toString();
+
+ try {
+ JsonNode node = mapper.readTree(optionsAsString);
+ JsonNode optionsNode = node.get("options");
+ Map<String, String> output =
+ Streams.stream(optionsNode.fields())
+ .filter(
+ entry ->
+ !entry.getValue().asText().isEmpty()
Review Comment:
Empty could be a valid configuration which we might want to display to the
user. Do we not have a direct way to check if the field exists in the options?
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java:
##########
@@ -140,6 +150,9 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options);
+ configureWebUIOptions(
+ flinkBatchEnv.getConfig(),
options.as(org.apache.beam.sdk.options.PipelineOptions.class));
Review Comment:
Any reason for not importing PipelineOptions?
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java:
##########
@@ -245,9 +258,63 @@ public static StreamExecutionEnvironment
createStreamExecutionEnvironment(
configureStateBackend(options, flinkStreamEnv);
+ configureWebUIOptions(
+ flinkStreamEnv.getConfig(),
options.as(org.apache.beam.sdk.options.PipelineOptions.class));
Review Comment:
Same here.
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java:
##########
@@ -245,9 +258,63 @@ public static StreamExecutionEnvironment
createStreamExecutionEnvironment(
configureStateBackend(options, flinkStreamEnv);
+ configureWebUIOptions(
+ flinkStreamEnv.getConfig(),
options.as(org.apache.beam.sdk.options.PipelineOptions.class));
+
return flinkStreamEnv;
}
+ private static void configureWebUIOptions(
+ ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions
options) {
+ SerializablePipelineOptions serializablePipelineOptions =
+ new SerializablePipelineOptions(options);
+ String optionsAsString = serializablePipelineOptions.toString();
+
+ try {
+ JsonNode node = mapper.readTree(optionsAsString);
+ JsonNode optionsNode = node.get("options");
+ Map<String, String> output =
+ Streams.stream(optionsNode.fields())
+ .filter(
+ entry ->
+ !entry.getValue().asText().isEmpty()
+ && !"null".equals(entry.getValue().asText()))
Review Comment:
Can we do a direct null check here instead of converting to a string first?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]