This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 94078e54224 Publish pipeline options to flink web ui (#26009) 94078e54224 is described below commit 94078e542241eaed0824e0153aa4c2917945ea55 Author: Kanishk Karanawat <kkd...@gmail.com> AuthorDate: Sat May 27 04:21:18 2023 -0400 Publish pipeline options to flink web ui (#26009) * propagate pipeline options to flink web ui * use serializablePipelineOptions to parse pipeline options * update unit test to compare maps * add optionId param to make test deterministic * resolve static analysis errors * resolve comments * update unit test --------- Co-authored-by: Kanishk Karanawat <kkarana...@twitter.com> --- .../runners/flink/FlinkExecutionEnvironments.java | 63 ++++++++++++++++++++++ .../flink/FlinkExecutionEnvironmentsTest.java | 58 ++++++++++++++++++++ .../shortcodes/flink_java_pipeline_options.html | 2 - .../shortcodes/flink_python_pipeline_options.html | 2 - 4 files changed, 121 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index d284cf2ca1f..fb4d90884d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -19,12 +19,21 @@ package org.apache.beam.runners.flink; import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionMode; @@ -59,6 +68,8 @@ public class FlinkExecutionEnvironments { private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + private static final ObjectMapper mapper = new ObjectMapper(); + /** * If the submitted job is a batch processing job, this method creates the adequate Flink {@link * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. 
@@ -140,6 +151,8 @@ public class FlinkExecutionEnvironments { applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options); + configureWebUIOptions(flinkBatchEnv.getConfig(), options.as(PipelineOptions.class)); + return flinkBatchEnv; } @@ -249,9 +262,59 @@ public class FlinkExecutionEnvironments { configureStateBackend(options, flinkStreamEnv); + configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + return flinkStreamEnv; } + private static void configureWebUIOptions( + ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions options) { + SerializablePipelineOptions serializablePipelineOptions = + new SerializablePipelineOptions(options); + String optionsAsString = serializablePipelineOptions.toString(); + + try { + JsonNode node = mapper.readTree(optionsAsString); + JsonNode optionsNode = node.get("options"); + Map<String, String> output = + Streams.stream(optionsNode.fields()) + .filter(entry -> !entry.getValue().isNull()) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asText())); + + config.setGlobalJobParameters(new GlobalJobParametersImpl(output)); + } catch (Exception e) { + LOG.warn("Unable to configure web ui options", e); + } + } + + private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters { + private final Map<String, String> jobOptions; + + private GlobalJobParametersImpl(Map<String, String> jobOptions) { + this.jobOptions = jobOptions; + } + + @Override + public Map<String, String> toMap() { + return jobOptions; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || this.getClass() != obj.getClass()) { + return false; + } + + ExecutionConfig.GlobalJobParameters jobParams = (ExecutionConfig.GlobalJobParameters) obj; + return Maps.difference(jobParams.toMap(), this.jobOptions).areEqual(); + } + + @Override + public int hashCode() { + return Objects.hashCode(jobOptions); + } + } + private static void configureCheckpointing( 
FlinkPipelineOptions options, StreamExecutionEnvironment flinkStreamEnv) { // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 7d527716108..49d317d46ce 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -29,6 +30,11 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.file.Files; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.RemoteEnvironment; @@ -475,6 +481,58 @@ public class FlinkExecutionEnvironmentsTest { assertThat(sev.getStateBackend(), instanceOf(RocksDBStateBackend.class)); } + /** Test interface. 
*/ + public interface TestOptions extends PipelineOptions { + String getKey1(); + + void setKey1(String value); + + Boolean getKey2(); + + void setKey2(Boolean value); + + String getKey3(); + + void setKey3(String value); + } + + @Test + public void shouldSetWebUIOptions() { + PipelineOptionsFactory.register(TestOptions.class); + PipelineOptionsFactory.register(FlinkPipelineOptions.class); + + FlinkPipelineOptions options = + PipelineOptionsFactory.fromArgs( + "--key1=value1", + "--key2", + "--key3=", + "--parallelism=10", + "--checkpointTimeoutMillis=500") + .as(FlinkPipelineOptions.class); + + StreamExecutionEnvironment sev = + FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); + + Map<String, String> actualMap = sev.getConfig().getGlobalJobParameters().toMap(); + + Map<String, String> expectedMap = new HashMap<>(); + expectedMap.put("key1", "value1"); + expectedMap.put("key2", "true"); + expectedMap.put("key3", ""); + expectedMap.put("checkpointTimeoutMillis", "500"); + expectedMap.put("parallelism", "10"); + + Map<String, String> filteredMap = + expectedMap.entrySet().stream() + .filter( + kv -> + actualMap.containsKey(kv.getKey()) + && kv.getValue().equals(actualMap.get(kv.getKey()))) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + + assertTrue(expectedMap.size() == filteredMap.size()); + } + private void checkHostAndPort(Object env, String expectedHost, int expectedPort) { String host = ((Configuration) Whitebox.getInternalState(env, "configuration")) diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index c8e79da128e..8314d5b6879 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -16,7 +16,6 @@ This is an auto-generated file. 
Use generatePipelineOptionsTableJava and generatePipelineOptionsTablePython respectively. Should be called before running the tests. --> -<div class="table-container-wrapper"> <table class="table table-bordered"> <tr> <td><code>allowNonRestoredState</code></td> @@ -179,4 +178,3 @@ Should be called before running the tests. <td></td> </tr> </table> -</div> diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index 127ca9ea005..010d3c94282 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -16,7 +16,6 @@ This is an auto-generated file. Use generatePipelineOptionsTableJava and generatePipelineOptionsTablePython respectively. Should be called before running the tests. --> -<div class="table-container-wrapper"> <table class="table table-bordered"> <tr> <td><code>allow_non_restored_state</code></td> @@ -179,4 +178,3 @@ Should be called before running the tests. <td></td> </tr> </table> -</div>