This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 94078e54224 Publish pipeline options to flink web ui (#26009)
94078e54224 is described below

commit 94078e542241eaed0824e0153aa4c2917945ea55
Author: Kanishk Karanawat <kkd...@gmail.com>
AuthorDate: Sat May 27 04:21:18 2023 -0400

    Publish pipeline options to flink web ui (#26009)
    
    * propagate pipeline options to flink web ui
    
    * use serializablePipelineOptions to parse pipeline options
    
    * update unit test to compare maps
    
    * add optionId param to make test deterministic
    
    * resolve static analysis errors
    
    * resolve comments
    
    * update unit test
    
    ---------
    
    Co-authored-by: Kanishk Karanawat <kkarana...@twitter.com>
---
 .../runners/flink/FlinkExecutionEnvironments.java  | 63 ++++++++++++++++++++++
 .../flink/FlinkExecutionEnvironmentsTest.java      | 58 ++++++++++++++++++++
 .../shortcodes/flink_java_pipeline_options.html    |  2 -
 .../shortcodes/flink_python_pipeline_options.html  |  2 -
 4 files changed, 121 insertions(+), 4 deletions(-)
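
For illustration only (not part of this commit), a minimal sketch of how the published
options can be read back from the ExecutionConfig, mirroring the new unit test below.
It assumes the org.apache.beam.runners.flink package, as the test does, since the
factory method's visibility is not shown in this diff; the class name is illustrative.

    package org.apache.beam.runners.flink;

    import java.util.Map;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WebUiOptionsInspection {  // illustrative name only
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs("--parallelism=10", "--checkpointTimeoutMillis=500")
                .as(FlinkPipelineOptions.class);

        StreamExecutionEnvironment sev =
            FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);

        // Non-null options are now published as global job parameters, which the
        // Flink web UI lists under the job's configuration.
        Map<String, String> published = sev.getConfig().getGlobalJobParameters().toMap();
        System.out.println(published);
      }
    }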

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index d284cf2ca1f..fb4d90884d2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -19,12 +19,21 @@ package org.apache.beam.runners.flink;
 
 import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionMode;
@@ -59,6 +68,8 @@ public class FlinkExecutionEnvironments {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
 
+  private static final ObjectMapper mapper = new ObjectMapper();
+
   /**
    * If the submitted job is a batch processing job, this method creates the adequate Flink {@link
    * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options.
@@ -140,6 +151,8 @@ public class FlinkExecutionEnvironments {
 
     applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options);
 
+    configureWebUIOptions(flinkBatchEnv.getConfig(), options.as(PipelineOptions.class));
+
     return flinkBatchEnv;
   }
 
@@ -249,9 +262,59 @@ public class FlinkExecutionEnvironments {
 
     configureStateBackend(options, flinkStreamEnv);
 
+    configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class));
+
     return flinkStreamEnv;
   }
 
+  private static void configureWebUIOptions(
+      ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions options) {
+    SerializablePipelineOptions serializablePipelineOptions =
+        new SerializablePipelineOptions(options);
+    String optionsAsString = serializablePipelineOptions.toString();
+
+    try {
+      JsonNode node = mapper.readTree(optionsAsString);
+      JsonNode optionsNode = node.get("options");
+      Map<String, String> output =
+          Streams.stream(optionsNode.fields())
+              .filter(entry -> !entry.getValue().isNull())
+              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asText()));
+
+      config.setGlobalJobParameters(new GlobalJobParametersImpl(output));
+    } catch (Exception e) {
+      LOG.warn("Unable to configure web ui options", e);
+    }
+  }
+
+  private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters {
+    private final Map<String, String> jobOptions;
+
+    private GlobalJobParametersImpl(Map<String, String> jobOptions) {
+      this.jobOptions = jobOptions;
+    }
+
+    @Override
+    public Map<String, String> toMap() {
+      return jobOptions;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || this.getClass() != obj.getClass()) {
+        return false;
+      }
+
+      ExecutionConfig.GlobalJobParameters jobParams = (ExecutionConfig.GlobalJobParameters) obj;
+      return Maps.difference(jobParams.toMap(), this.jobOptions).areEqual();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(jobOptions);
+    }
+  }
+
   private static void configureCheckpointing(
       FlinkPipelineOptions options, StreamExecutionEnvironment flinkStreamEnv) {
     // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index 7d527716108..49d317d46ce 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,6 +30,11 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.RemoteEnvironment;
@@ -475,6 +481,58 @@ public class FlinkExecutionEnvironmentsTest {
     assertThat(sev.getStateBackend(), instanceOf(RocksDBStateBackend.class));
   }
 
+  /** Test interface. */
+  public interface TestOptions extends PipelineOptions {
+    String getKey1();
+
+    void setKey1(String value);
+
+    Boolean getKey2();
+
+    void setKey2(Boolean value);
+
+    String getKey3();
+
+    void setKey3(String value);
+  }
+
+  @Test
+  public void shouldSetWebUIOptions() {
+    PipelineOptionsFactory.register(TestOptions.class);
+    PipelineOptionsFactory.register(FlinkPipelineOptions.class);
+
+    FlinkPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(
+                "--key1=value1",
+                "--key2",
+                "--key3=",
+                "--parallelism=10",
+                "--checkpointTimeoutMillis=500")
+            .as(FlinkPipelineOptions.class);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
+
+    Map<String, String> actualMap = sev.getConfig().getGlobalJobParameters().toMap();
+
+    Map<String, String> expectedMap = new HashMap<>();
+    expectedMap.put("key1", "value1");
+    expectedMap.put("key2", "true");
+    expectedMap.put("key3", "");
+    expectedMap.put("checkpointTimeoutMillis", "500");
+    expectedMap.put("parallelism", "10");
+
+    Map<String, String> filteredMap =
+        expectedMap.entrySet().stream()
+            .filter(
+                kv ->
+                    actualMap.containsKey(kv.getKey())
+                        && kv.getValue().equals(actualMap.get(kv.getKey())))
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+
+    assertTrue(expectedMap.size() == filteredMap.size());
+  }
+
   private void checkHostAndPort(Object env, String expectedHost, int expectedPort) {
     String host =
         ((Configuration) Whitebox.getInternalState(env, "configuration"))
diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
index c8e79da128e..8314d5b6879 100644
--- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
@@ -16,7 +16,6 @@ This is an auto-generated file.
 Use generatePipelineOptionsTableJava and generatePipelineOptionsTablePython respectively.
 Should be called before running the tests.
 -->
-<div class="table-container-wrapper">
 <table class="table table-bordered">
 <tr>
   <td><code>allowNonRestoredState</code></td>
@@ -179,4 +178,3 @@ Should be called before running the tests.
   <td></td>
 </tr>
 </table>
-</div>
diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
index 127ca9ea005..010d3c94282 100644
--- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
@@ -16,7 +16,6 @@ This is an auto-generated file.
 Use generatePipelineOptionsTableJava and generatePipelineOptionsTablePython respectively.
 Should be called before running the tests.
 -->
-<div class="table-container-wrapper">
 <table class="table table-bordered">
 <tr>
   <td><code>allow_non_restored_state</code></td>
@@ -179,4 +178,3 @@ Should be called before running the tests.
   <td></td>
 </tr>
 </table>
-</div>
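
For context (again not part of this commit), a minimal sketch of submitting a pipeline
on the Flink runner so that its options appear in the web UI; the class name and the
Create transform are illustrative only.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class PublishOptionsToWebUiExample {  // illustrative name only
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("a", "b", "c"));

        // After submission, the Flink web UI shows the pipeline's non-null options
        // (e.g. parallelism) on the job's configuration page.
        p.run().waitUntilFinish();
      }
    }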
