[ 
https://issues.apache.org/jira/browse/BEAM-6058?focusedWorklogId=170070&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170070
 ]

ASF GitHub Bot logged work on BEAM-6058:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Nov/18 00:55
            Start Date: 28/Nov/18 00:55
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #7031: [BEAM-6058] Adding 
option for flink configuration directory and setting config in execution 
environment
URL: https://github.com/apache/beam/pull/7031
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/job-server/build.gradle 
b/runners/flink/job-server/build.gradle
index 93ab802493f6..6a438ff8bb9f 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -65,6 +65,8 @@ runShadow {
     args += ["--clean-artifacts-per-job"]
   if (project.hasProperty('flinkMasterUrl'))
     args += ["--flink-master-url=${project.property('flinkMasterUrl')}"]
+  if (project.hasProperty('flinkConfDir'))
+    args += ["--flink-conf-dir=${project.property('flinkConfDir')}"]
 
   // Enable remote debugging.
   jvmArgs = ["-Xdebug", 
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 3f26d5e2d991..bc1ffd36b4c2 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -100,7 +101,7 @@
  *   FlinkBatchPortablePipelineTranslator translator =
  *       FlinkBatchPortablePipelineTranslator.createTranslator();
  *   BatchTranslationContext context =
- *       
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+ *       
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo, confDir, 
filesToStage);
  *   translator.translate(context, pipeline);
  *   ExecutionEnvironment executionEnvironment = 
context.getExecutionEnvironment();
  *   // Do something with executionEnvironment...
@@ -118,9 +119,13 @@
    * {@link ExecutionEnvironment}.
    */
   public static BatchTranslationContext createTranslationContext(
-      JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> 
filesToStage) {
+      JobInfo jobInfo,
+      FlinkPipelineOptions pipelineOptions,
+      @Nullable String confDir,
+      List<String> filesToStage) {
     ExecutionEnvironment executionEnvironment =
-        
FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions, 
filesToStage);
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            pipelineOptions, filesToStage, confDir);
     return new BatchTranslationContext(jobInfo, pipelineOptions, 
executionEnvironment);
   }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 8141e6c148bb..9802defeb1a4 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -49,18 +49,18 @@ public static ExecutionEnvironment 
createBatchExecutionEnvironment(
     return createBatchExecutionEnvironment(options, filesToStage, null);
   }
 
-  @VisibleForTesting
   static ExecutionEnvironment createBatchExecutionEnvironment(
       FlinkPipelineOptions options, List<String> filesToStage, @Nullable 
String confDir) {
 
     LOG.info("Creating a Batch Execution Environment.");
 
     String masterUrl = options.getFlinkMaster();
+    Configuration flinkConfiguration = getFlinkConfiguration(confDir);
     ExecutionEnvironment flinkBatchEnv;
 
     // depending on the master, create the right environment.
     if ("[local]".equals(masterUrl)) {
-      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+      flinkBatchEnv = 
ExecutionEnvironment.createLocalEnvironment(flinkConfiguration);
     } else if ("[collection]".equals(masterUrl)) {
       flinkBatchEnv = new CollectionEnvironment();
     } else if ("[auto]".equals(masterUrl)) {
@@ -71,6 +71,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
           ExecutionEnvironment.createRemoteEnvironment(
               parts.get(0),
               Integer.parseInt(parts.get(1)),
+              flinkConfiguration,
               filesToStage.toArray(new String[filesToStage.size()]));
     } else {
       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
@@ -90,7 +91,8 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
       parallelism = 1;
     } else {
       parallelism =
-          determineParallelism(options.getParallelism(), 
flinkBatchEnv.getParallelism(), confDir);
+          determineParallelism(
+              options.getParallelism(), flinkBatchEnv.getParallelism(), 
flinkConfiguration);
     }
 
     flinkBatchEnv.setParallelism(parallelism);
@@ -120,11 +122,12 @@ public static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
 
   @VisibleForTesting
   static StreamExecutionEnvironment createStreamExecutionEnvironment(
-      FlinkPipelineOptions options, List<String> filesToStage, @Nullable 
String flinkConfigDir) {
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable 
String confDir) {
 
     LOG.info("Creating a Streaming Environment.");
 
     String masterUrl = options.getFlinkMaster();
+    Configuration flinkConfig = getFlinkConfiguration(confDir);
     StreamExecutionEnvironment flinkStreamEnv = null;
 
     // depending on the master, create the right environment.
@@ -134,13 +137,12 @@ static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     } else if (masterUrl.matches(".*:\\d*")) {
       List<String> parts = Splitter.on(':').splitToList(masterUrl);
-      Configuration clientConfig = new Configuration();
-      clientConfig.setInteger(RestOptions.PORT, 
Integer.parseInt(parts.get(1)));
+      flinkConfig.setInteger(RestOptions.PORT, Integer.parseInt(parts.get(1)));
       flinkStreamEnv =
           StreamExecutionEnvironment.createRemoteEnvironment(
               parts.get(0),
               Integer.parseInt(parts.get(1)),
-              clientConfig,
+              flinkConfig,
               filesToStage.toArray(new String[filesToStage.size()]));
     } else {
       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
@@ -150,7 +152,7 @@ static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
     // Set the parallelism, required by UnboundedSourceWrapper to generate 
consistent splits.
     final int parallelism =
         determineParallelism(
-            options.getParallelism(), flinkStreamEnv.getParallelism(), 
flinkConfigDir);
+            options.getParallelism(), flinkStreamEnv.getParallelism(), 
flinkConfig);
     flinkStreamEnv.setParallelism(parallelism);
     // set parallelism in the options (required by some execution code)
     options.setParallelism(parallelism);
@@ -230,7 +232,7 @@ static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
   private static int determineParallelism(
       final int pipelineOptionsParallelism,
       final int envParallelism,
-      @Nullable String flinkConfDir) {
+      final Configuration configuration) {
     if (pipelineOptionsParallelism > 0) {
       return pipelineOptionsParallelism;
     }
@@ -239,12 +241,6 @@ private static int determineParallelism(
       return envParallelism;
     }
 
-    final Configuration configuration;
-    if (flinkConfDir == null) {
-      configuration = GlobalConfiguration.loadConfiguration();
-    } else {
-      configuration = GlobalConfiguration.loadConfiguration(flinkConfDir);
-    }
     final int flinkConfigParallelism =
         configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
     if (flinkConfigParallelism > 0) {
@@ -256,6 +252,12 @@ private static int determineParallelism(
     return 1;
   }
 
+  private static Configuration getFlinkConfiguration(@Nullable String 
flinkConfDir) {
+    return flinkConfDir == null
+        ? GlobalConfiguration.loadConfiguration()
+        : GlobalConfiguration.loadConfiguration(flinkConfDir);
+  }
+
   private static void applyLatencyTrackingInterval(
       ExecutionConfig config, FlinkPipelineOptions options) {
     long latencyTrackingInterval = options.getLatencyTrackingInterval();
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 4a7216be4630..b849770fa5ef 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -55,9 +55,10 @@ public static FlinkJobInvocation create(
       ListeningExecutorService executorService,
       Pipeline pipeline,
       FlinkPipelineOptions pipelineOptions,
+      @Nullable String confDir,
       List<String> filesToStage) {
     return new FlinkJobInvocation(
-        id, retrievalToken, executorService, pipeline, pipelineOptions, 
filesToStage);
+        id, retrievalToken, executorService, pipeline, pipelineOptions, 
confDir, filesToStage);
   }
 
   private final String id;
@@ -65,6 +66,7 @@ public static FlinkJobInvocation create(
   private final ListeningExecutorService executorService;
   private final RunnerApi.Pipeline pipeline;
   private final FlinkPipelineOptions pipelineOptions;
+  private final String confDir;
   private final List<String> filesToStage;
   private JobState.Enum jobState;
   private List<Consumer<JobState.Enum>> stateObservers;
@@ -78,12 +80,14 @@ private FlinkJobInvocation(
       ListeningExecutorService executorService,
       Pipeline pipeline,
       FlinkPipelineOptions pipelineOptions,
+      @Nullable String confDir,
       List<String> filesToStage) {
     this.id = id;
     this.retrievalToken = retrievalToken;
     this.executorService = executorService;
     this.pipeline = pipeline;
     this.pipelineOptions = pipelineOptions;
+    this.confDir = confDir;
     this.filesToStage = filesToStage;
     this.invocationFuture = null;
     this.jobState = JobState.Enum.STOPPED;
@@ -112,7 +116,7 @@ private PipelineResult runPipeline() throws Exception {
           FlinkBatchPortablePipelineTranslator.createTranslator();
       FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
           FlinkBatchPortablePipelineTranslator.createTranslationContext(
-              jobInfo, pipelineOptions, filesToStage);
+              jobInfo, pipelineOptions, confDir, filesToStage);
       translator.translate(context, fusedPipeline);
       result = 
context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     } else {
@@ -121,7 +125,7 @@ private PipelineResult runPipeline() throws Exception {
           new FlinkStreamingPortablePipelineTranslator();
       FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext 
context =
           FlinkStreamingPortablePipelineTranslator.createTranslationContext(
-              jobInfo, pipelineOptions, filesToStage);
+              jobInfo, pipelineOptions, confDir, filesToStage);
       translator.translate(context, fusedPipeline);
       result = 
context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index ecefed1dc25f..8c8b4e197926 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -82,6 +82,7 @@ public JobInvocation invoke(
         executorService,
         pipeline,
         flinkOptions,
+        serverConfig.flinkConfDir,
         
detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader()));
   }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 024eee269324..de30eda2a8bc 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -25,6 +25,7 @@
 import java.nio.file.Paths;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
@@ -94,6 +95,20 @@ String getFlinkMasterUrl() {
     Long getSdkWorkerParallelism() {
       return this.sdkWorkerParallelism;
     }
+
+    @Option(
+      name = "--flink-conf-dir",
+      usage =
+          "Directory containing Flink YAML configuration files. "
+              + "These properties will be set to all jobs submitted to Flink 
and take precedence "
+              + "over configurations in FLINK_CONF_DIR."
+    )
+    String flinkConfDir = null;
+
+    @Nullable
+    String getFlinkConfDir() {
+      return flinkConfDir;
+    }
   }
 
   public static void main(String[] args) throws Exception {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index a8adc97edcfd..83b8908a11f3 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -105,9 +105,13 @@
    * {@link StreamExecutionEnvironment}.
    */
   public static StreamingTranslationContext createTranslationContext(
-      JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> 
filesToStage) {
+      JobInfo jobInfo,
+      FlinkPipelineOptions pipelineOptions,
+      String confDir,
+      List<String> filesToStage) {
     StreamExecutionEnvironment executionEnvironment =
-        
FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions, 
filesToStage);
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            pipelineOptions, filesToStage, confDir);
     return new StreamingTranslationContext(jobInfo, pipelineOptions, 
executionEnvironment);
   }
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index 74c595a9465d..e3198dc1bfb6 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -84,7 +84,7 @@ public void shouldInferParallelismFromEnvironmentBatch() 
throws IOException {
 
   @Test
   public void shouldInferParallelismFromEnvironmentStreaming() throws 
IOException {
-    String flinkConfDir = extractFlinkConfig();
+    String confDir = extractFlinkConfig();
 
     FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
     options.setRunner(TestFlinkRunner.class);
@@ -92,7 +92,7 @@ public void shouldInferParallelismFromEnvironmentStreaming() 
throws IOException
 
     StreamExecutionEnvironment sev =
         FlinkExecutionEnvironments.createStreamExecutionEnvironment(
-            options, Collections.emptyList(), flinkConfDir);
+            options, Collections.emptyList(), confDir);
 
     assertThat(options.getParallelism(), is(23));
     assertThat(sev.getParallelism(), is(23));
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 0cef18d636c3..34985d75e568 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -139,6 +139,7 @@ public void process(ProcessContext ctx) {
             flinkJobExecutor,
             pipelineProto,
             options.as(FlinkPipelineOptions.class),
+            null,
             Collections.EMPTY_LIST);
     jobInvocation.start();
     long timeout = System.currentTimeMillis() + 60 * 1000;
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index e94b2678ff90..05194d1c979b 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -181,6 +181,7 @@ private void performStateUpdates(
             flinkJobExecutor,
             pipelineProto,
             options.as(FlinkPipelineOptions.class),
+            null,
             Collections.emptyList());
 
     jobInvocation.start();
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index f3f04b0b3405..8fbb68652881 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -181,6 +181,7 @@ public void processElement(ProcessContext context) {
             flinkJobExecutor,
             pipelineProto,
             options.as(FlinkPipelineOptions.class),
+            null,
             Collections.emptyList());
 
     jobInvocation.start();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 170070)
    Time Spent: 8h 40m  (was: 8.5h)

> Support flink config directory for flink runner.
> ------------------------------------------------
>
>                 Key: BEAM-6058
>                 URL: https://issues.apache.org/jira/browse/BEAM-6058
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> Give the user the option to use the Flink configuration from a config directory.
> Earlier we only used jars to submit Flink jobs, which by default pick up the 
> cluster's Flink properties.
> Portability uses a remote environment to submit the Flink job. In remote 
> execution, Flink uses the client config instead of the cluster config. This 
> requires us to submit the custom Flink properties via the Flink client.
> The suggestion is to read the Flink config from a Flink configuration folder 
> and set those properties on the environment.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to