[ 
https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=400179&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-400179
 ]

ASF GitHub Bot logged work on BEAM-9295:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Mar/20 15:26
            Start Date: 09/Mar/20 15:26
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #10945: [BEAM-9295] Add 
Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
URL: https://github.com/apache/beam/pull/10945#discussion_r389761717
 
 

 ##########
 File path: 
runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
 ##########
 @@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.apache.beam.sdk.testing.RegexMatcher.matches;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Every.everyItem;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.RemoteEnvironment;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Tests for {@link FlinkPipelineExecutionEnvironment}.
+ *
+ * <p>This test is copied to 1.10 because the field jarFiles has been removed from
+ * RemoteEnvironment in Flink 1.10; please refer to
+ * https://github.com/apache/flink/commit/057c036784242c674ea6091549cdbc98688827a6 for more details.
+ */
+@RunWith(JUnit4.class)
+public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
+
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void shouldRecognizeAndTranslateStreamingPipeline() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("[auto]");
+
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+    Pipeline pipeline = Pipeline.create();
+
+    pipeline
+        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
+        .apply(
+            ParDo.of(
+                new DoFn<Long, String>() {
+
+                  @ProcessElement
+                  public void processElement(ProcessContext c) throws Exception {
+                    c.output(Long.toString(c.element()));
+                  }
+                }))
+        .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
+        .apply(TextIO.write().withNumShards(1).withWindowedWrites().to("/dummy/path"));
+
+    flinkEnv.translate(pipeline);
+
+    // no exception should be thrown
+  }
+
+  @Test
+  public void shouldPrepareFilesToStageWhenFlinkMasterIsSetExplicitly() throws IOException {
+    FlinkPipelineOptions options = testPreparingResourcesToStage("localhost:8081", true, false);
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage().get(0), matches(".*\\.jar"));
+  }
+
+  @Test
+  public void shouldFailWhenFileDoesNotExistAndFlinkMasterIsSetExplicitly() {
+    assertThrows(
+        "To-be-staged file does not exist: ",
+        IllegalStateException.class,
+        () -> testPreparingResourcesToStage("localhost:8081", true, true));
+  }
+
+  @Test
+  public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOException {
+    FlinkPipelineOptions options = testPreparingResourcesToStage("[auto]");
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+  }
+
+  @Test
+  public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToCollection() throws IOException {
+    FlinkPipelineOptions options = testPreparingResourcesToStage("[collection]");
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+  }
+
+  @Test
+  public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOException {
+    FlinkPipelineOptions options = testPreparingResourcesToStage("[local]");
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+  }
+
+  @Test
+  public void shouldUseDefaultTempLocationIfNoneSet() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("clusterAddress");
+
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    flinkEnv.translate(pipeline);
+
+    String defaultTmpDir = System.getProperty("java.io.tmpdir");
+
+    assertThat(options.getFilesToStage(), hasItem(startsWith(defaultTmpDir)));
+  }
+
+  @Test
+  public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("clusterAddress");
+
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    flinkEnv.translate(pipeline);
+
+    ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment();
+    assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class));
+
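+    // The jarFiles field was removed from RemoteEnvironment in Flink 1.10, so the
+    // staged jars are read from the environment's Configuration instead.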
+    ExecutionConfigAccessor accessor =
+        ExecutionConfigAccessor.fromConfiguration(
+            (Configuration) Whitebox.getInternalState(executionEnvironment, "configuration"));
+    List<URL> jarFiles = accessor.getJars();
 
 Review comment:
   We could avoid duplicating this file if we had a Flink 1.10-dependent branch here,
   for example along the lines of the sketch below.
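 
   A rough sketch of what such version-dependent branching could look like (the
   isFlink110OrAbove helper is hypothetical and not an existing Beam or Flink API;
   the pre-1.10 branch assumes the removed jarFiles field held the staged jar URLs):
 
   private static List<URL> getStagedJars(ExecutionEnvironment env) throws Exception {
     if (isFlink110OrAbove()) {
       // Flink 1.10+: the staged jar list is carried in the environment's Configuration.
       Configuration config = (Configuration) Whitebox.getInternalState(env, "configuration");
       return ExecutionConfigAccessor.fromConfiguration(config).getJars();
     } else {
       // Flink 1.9 and earlier: RemoteEnvironment still exposed a jarFiles field
       // (its exact type is assumed here).
       return Whitebox.getInternalState(env, "jarFiles");
     }
   }
 
   The shared test could then call such a helper instead of reading the field directly,
   keeping a single copy of this file across the supported Flink versions.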
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 400179)
    Time Spent: 3h 40m  (was: 3.5h)

> Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-9295
>                 URL: https://issues.apache.org/jira/browse/BEAM-9295
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Apache Flink 1.10 has completed the final release vote, see [1]. So, I would 
> like to add a Flink 1.10 build target and make the Flink Runner compatible with 
> Flink 1.10.
> I would appreciate it if you could leave your suggestions or comments!
> [1] 
> https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
