[ 
https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=400180&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-400180
 ]

ASF GitHub Bot logged work on BEAM-9295:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Mar/20 15:26
            Start Date: 09/Mar/20 15:26
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #10945: [BEAM-9295] Add 
Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
URL: https://github.com/apache/beam/pull/10945#discussion_r389761038
 
 

 ##########
 File path: 
runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
 ##########
 @@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.RemoteEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Tests for {@link FlinkExecutionEnvironments}.
+ *
+ * <p>This test is copied to 1.10 because the fields host, port, etc. have 
been removed from
+ * RemoteEnvironment in Flink 1.10; please refer to
+ * 
https://github.com/apache/flink/commit/057c036784242c674ea6091549cdbc98688827a6 
for more details.
+ */
+public class FlinkExecutionEnvironmentsTest {
+
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void shouldSetParallelismBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setParallelism(42);
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(42));
+    assertThat(bev.getParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldSetParallelismStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setParallelism(42);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(42));
+    assertThat(sev.getParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldSetMaxParallelismStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setMaxParallelism(42);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getMaxParallelism(), is(42));
+    assertThat(sev.getMaxParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
+    String flinkConfDir = extractFlinkConfig();
+
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList(), flinkConfDir);
+
+    assertThat(options.getParallelism(), is(23));
+    assertThat(bev.getParallelism(), is(23));
+  }
+
+  @Test
+  public void shouldInferParallelismFromEnvironmentStreaming() throws 
IOException {
+    String confDir = extractFlinkConfig();
+
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList(), confDir);
+
+    assertThat(options.getParallelism(), is(23));
+    assertThat(sev.getParallelism(), is(23));
+  }
+
+  @Test
+  public void shouldFallbackToDefaultParallelismBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(1));
+    assertThat(bev.getParallelism(), is(1));
+  }
+
+  @Test
+  public void shouldFallbackToDefaultParallelismStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(1));
+    assertThat(sev.getParallelism(), is(1));
+  }
+
+  @Test
+  public void useDefaultParallelismFromContextBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(bev, instanceOf(LocalEnvironment.class));
+    assertThat(options.getParallelism(), 
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+    assertThat(bev.getParallelism(), 
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+  }
+
+  @Test
+  public void useDefaultParallelismFromContextStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(sev, instanceOf(LocalStreamEnvironment.class));
+    assertThat(options.getParallelism(), 
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+    assertThat(sev.getParallelism(), 
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+  }
+
+  @Test
+  public void shouldParsePortForRemoteEnvironmentBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setFlinkMaster("host:1234");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(bev, instanceOf(RemoteEnvironment.class));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("host"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(1234));
+  }
+
+  @Test
+  public void shouldParsePortForRemoteEnvironmentStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setFlinkMaster("host:1234");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(sev, instanceOf(RemoteStreamEnvironment.class));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("host"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(1234));
+  }
+
+  @Test
+  public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setFlinkMaster("host");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(bev, instanceOf(RemoteEnvironment.class));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("host"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setFlinkMaster("host");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(sev, instanceOf(RemoteStreamEnvironment.class));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("host"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldTreatAutoAndEmptyHostTheSameBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    ExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    options.setFlinkMaster("[auto]");
+
+    ExecutionEnvironment sev2 =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertEquals(sev.getClass(), sev2.getClass());
+  }
+
+  @Test
+  public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    options.setFlinkMaster("[auto]");
+
+    StreamExecutionEnvironment sev2 =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertEquals(sev.getClass(), sev2.getClass());
+  }
+
+  @Test
+  public void shouldDetectMalformedPortBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setFlinkMaster("host:p0rt");
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Unparseable port number");
+
+    FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, 
Collections.emptyList());
+  }
+
+  @Test
+  public void shouldDetectMalformedPortStreaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setFlinkMaster("host:p0rt");
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Unparseable port number");
+
+    FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
+  }
+
+  @Test
+  public void shouldSupportIPv4Batch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    options.setFlinkMaster("192.168.1.1:1234");
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("192.168.1.1"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(1234));
+
+    options.setFlinkMaster("192.168.1.1");
+    bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("192.168.1.1"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldSupportIPv4Streaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    options.setFlinkMaster("192.168.1.1:1234");
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("192.168.1.1"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(1234));
+
+    options.setFlinkMaster("192.168.1.1");
+    bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("192.168.1.1"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldSupportIPv6Batch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("fe80:cd00:0:cde:1257:0:211e:729c"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(1234));
+
+    options.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
+    bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("fe80:cd00:0:cde:1257:0:211e:729c"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(bev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldSupportIPv6Streaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("fe80:cd00:0:cde:1257:0:211e:729c"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(1234));
+
+    options.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
+    sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getString(RestOptions.ADDRESS),
+        is("fe80:cd00:0:cde:1257:0:211e:729c"));
+    assertThat(
+        ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+            .getInteger(RestOptions.PORT),
+        is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldRemoveHttpProtocolFromHostBatch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    for (String flinkMaster :
+        new String[] {
+          "http://host:1234", " http://host:1234", "https://host:1234", " 
https://host:1234"
+        }) {
+      options.setFlinkMaster(flinkMaster);
+      ExecutionEnvironment sev =
+          FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+              options, Collections.emptyList());
+      assertThat(
+          ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+              .getString(RestOptions.ADDRESS),
+          is("host"));
+      assertThat(
+          ((Configuration) Whitebox.getInternalState(sev, "configuration"))
+              .getInteger(RestOptions.PORT),
+          is(1234));
 
 Review comment:
   We could avoid duplication of this file and just have a check method which 
branches depending on whether we have 1.10 or a version below.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 400180)
    Time Spent: 3h 50m  (was: 3h 40m)

> Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-9295
>                 URL: https://issues.apache.org/jira/browse/BEAM-9295
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Apache Flink 1.10 has completed the final release vote, see [1]. So, I would 
> like to add Flink 1.10 build target and make Flink Runner compatible with 
> Flink 1.10.
> And I appreciate it if you can leave your suggestions or comments!
> [1] 
> https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to