[
https://issues.apache.org/jira/browse/BEAM-6058?focusedWorklogId=170070&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170070
]
ASF GitHub Bot logged work on BEAM-6058:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Nov/18 00:55
Start Date: 28/Nov/18 00:55
Worklog Time Spent: 10m
Work Description: tweise closed pull request #7031: [BEAM-6058] Adding
option for flink configuration directory and setting config in execution
environment
URL: https://github.com/apache/beam/pull/7031
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/flink/job-server/build.gradle
b/runners/flink/job-server/build.gradle
index 93ab802493f6..6a438ff8bb9f 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -65,6 +65,8 @@ runShadow {
args += ["--clean-artifacts-per-job"]
if (project.hasProperty('flinkMasterUrl'))
args += ["--flink-master-url=${project.property('flinkMasterUrl')}"]
+ if (project.hasProperty('flinkConfDir'))
+ args += ["--flink-conf-dir=${project.property('flinkConfDir')}"]
// Enable remote debugging.
jvmArgs = ["-Xdebug",
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 3f26d5e2d991..bc1ffd36b4c2 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -39,6 +39,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -100,7 +101,7 @@
* FlinkBatchPortablePipelineTranslator translator =
* FlinkBatchPortablePipelineTranslator.createTranslator();
* BatchTranslationContext context =
- *
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+ *
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo, confDir,
filesToStage);
* translator.translate(context, pipeline);
* ExecutionEnvironment executionEnvironment =
context.getExecutionEnvironment();
* // Do something with executionEnvironment...
@@ -118,9 +119,13 @@
* {@link ExecutionEnvironment}.
*/
public static BatchTranslationContext createTranslationContext(
- JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String>
filesToStage) {
+ JobInfo jobInfo,
+ FlinkPipelineOptions pipelineOptions,
+ @Nullable String confDir,
+ List<String> filesToStage) {
ExecutionEnvironment executionEnvironment =
-
FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions,
filesToStage);
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ pipelineOptions, filesToStage, confDir);
return new BatchTranslationContext(jobInfo, pipelineOptions,
executionEnvironment);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 8141e6c148bb..9802defeb1a4 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -49,18 +49,18 @@ public static ExecutionEnvironment
createBatchExecutionEnvironment(
return createBatchExecutionEnvironment(options, filesToStage, null);
}
- @VisibleForTesting
static ExecutionEnvironment createBatchExecutionEnvironment(
FlinkPipelineOptions options, List<String> filesToStage, @Nullable
String confDir) {
LOG.info("Creating a Batch Execution Environment.");
String masterUrl = options.getFlinkMaster();
+ Configuration flinkConfiguration = getFlinkConfiguration(confDir);
ExecutionEnvironment flinkBatchEnv;
// depending on the master, create the right environment.
if ("[local]".equals(masterUrl)) {
- flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+ flinkBatchEnv =
ExecutionEnvironment.createLocalEnvironment(flinkConfiguration);
} else if ("[collection]".equals(masterUrl)) {
flinkBatchEnv = new CollectionEnvironment();
} else if ("[auto]".equals(masterUrl)) {
@@ -71,6 +71,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
ExecutionEnvironment.createRemoteEnvironment(
parts.get(0),
Integer.parseInt(parts.get(1)),
+ flinkConfiguration,
filesToStage.toArray(new String[filesToStage.size()]));
} else {
LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
@@ -90,7 +91,8 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
parallelism = 1;
} else {
parallelism =
- determineParallelism(options.getParallelism(),
flinkBatchEnv.getParallelism(), confDir);
+ determineParallelism(
+ options.getParallelism(), flinkBatchEnv.getParallelism(),
flinkConfiguration);
}
flinkBatchEnv.setParallelism(parallelism);
@@ -120,11 +122,12 @@ public static StreamExecutionEnvironment
createStreamExecutionEnvironment(
@VisibleForTesting
static StreamExecutionEnvironment createStreamExecutionEnvironment(
- FlinkPipelineOptions options, List<String> filesToStage, @Nullable
String flinkConfigDir) {
+ FlinkPipelineOptions options, List<String> filesToStage, @Nullable
String confDir) {
LOG.info("Creating a Streaming Environment.");
String masterUrl = options.getFlinkMaster();
+ Configuration flinkConfig = getFlinkConfiguration(confDir);
StreamExecutionEnvironment flinkStreamEnv = null;
// depending on the master, create the right environment.
@@ -134,13 +137,12 @@ static StreamExecutionEnvironment
createStreamExecutionEnvironment(
flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
} else if (masterUrl.matches(".*:\\d*")) {
List<String> parts = Splitter.on(':').splitToList(masterUrl);
- Configuration clientConfig = new Configuration();
- clientConfig.setInteger(RestOptions.PORT,
Integer.parseInt(parts.get(1)));
+ flinkConfig.setInteger(RestOptions.PORT, Integer.parseInt(parts.get(1)));
flinkStreamEnv =
StreamExecutionEnvironment.createRemoteEnvironment(
parts.get(0),
Integer.parseInt(parts.get(1)),
- clientConfig,
+ flinkConfig,
filesToStage.toArray(new String[filesToStage.size()]));
} else {
LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
@@ -150,7 +152,7 @@ static StreamExecutionEnvironment
createStreamExecutionEnvironment(
// Set the parallelism, required by UnboundedSourceWrapper to generate
consistent splits.
final int parallelism =
determineParallelism(
- options.getParallelism(), flinkStreamEnv.getParallelism(),
flinkConfigDir);
+ options.getParallelism(), flinkStreamEnv.getParallelism(),
flinkConfig);
flinkStreamEnv.setParallelism(parallelism);
// set parallelism in the options (required by some execution code)
options.setParallelism(parallelism);
@@ -230,7 +232,7 @@ static StreamExecutionEnvironment
createStreamExecutionEnvironment(
private static int determineParallelism(
final int pipelineOptionsParallelism,
final int envParallelism,
- @Nullable String flinkConfDir) {
+ final Configuration configuration) {
if (pipelineOptionsParallelism > 0) {
return pipelineOptionsParallelism;
}
@@ -239,12 +241,6 @@ private static int determineParallelism(
return envParallelism;
}
- final Configuration configuration;
- if (flinkConfDir == null) {
- configuration = GlobalConfiguration.loadConfiguration();
- } else {
- configuration = GlobalConfiguration.loadConfiguration(flinkConfDir);
- }
final int flinkConfigParallelism =
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
if (flinkConfigParallelism > 0) {
@@ -256,6 +252,12 @@ private static int determineParallelism(
return 1;
}
+ private static Configuration getFlinkConfiguration(@Nullable String
flinkConfDir) {
+ return flinkConfDir == null
+ ? GlobalConfiguration.loadConfiguration()
+ : GlobalConfiguration.loadConfiguration(flinkConfDir);
+ }
+
private static void applyLatencyTrackingInterval(
ExecutionConfig config, FlinkPipelineOptions options) {
long latencyTrackingInterval = options.getLatencyTrackingInterval();
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 4a7216be4630..b849770fa5ef 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -55,9 +55,10 @@ public static FlinkJobInvocation create(
ListeningExecutorService executorService,
Pipeline pipeline,
FlinkPipelineOptions pipelineOptions,
+ @Nullable String confDir,
List<String> filesToStage) {
return new FlinkJobInvocation(
- id, retrievalToken, executorService, pipeline, pipelineOptions,
filesToStage);
+ id, retrievalToken, executorService, pipeline, pipelineOptions,
confDir, filesToStage);
}
private final String id;
@@ -65,6 +66,7 @@ public static FlinkJobInvocation create(
private final ListeningExecutorService executorService;
private final RunnerApi.Pipeline pipeline;
private final FlinkPipelineOptions pipelineOptions;
+ private final String confDir;
private final List<String> filesToStage;
private JobState.Enum jobState;
private List<Consumer<JobState.Enum>> stateObservers;
@@ -78,12 +80,14 @@ private FlinkJobInvocation(
ListeningExecutorService executorService,
Pipeline pipeline,
FlinkPipelineOptions pipelineOptions,
+ @Nullable String confDir,
List<String> filesToStage) {
this.id = id;
this.retrievalToken = retrievalToken;
this.executorService = executorService;
this.pipeline = pipeline;
this.pipelineOptions = pipelineOptions;
+ this.confDir = confDir;
this.filesToStage = filesToStage;
this.invocationFuture = null;
this.jobState = JobState.Enum.STOPPED;
@@ -112,7 +116,7 @@ private PipelineResult runPipeline() throws Exception {
FlinkBatchPortablePipelineTranslator.createTranslator();
FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
FlinkBatchPortablePipelineTranslator.createTranslationContext(
- jobInfo, pipelineOptions, filesToStage);
+ jobInfo, pipelineOptions, confDir, filesToStage);
translator.translate(context, fusedPipeline);
result =
context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
} else {
@@ -121,7 +125,7 @@ private PipelineResult runPipeline() throws Exception {
new FlinkStreamingPortablePipelineTranslator();
FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext
context =
FlinkStreamingPortablePipelineTranslator.createTranslationContext(
- jobInfo, pipelineOptions, filesToStage);
+ jobInfo, pipelineOptions, confDir, filesToStage);
translator.translate(context, fusedPipeline);
result =
context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index ecefed1dc25f..8c8b4e197926 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -82,6 +82,7 @@ public JobInvocation invoke(
executorService,
pipeline,
flinkOptions,
+ serverConfig.flinkConfDir,
detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader()));
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 024eee269324..de30eda2a8bc 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -25,6 +25,7 @@
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
@@ -94,6 +95,20 @@ String getFlinkMasterUrl() {
Long getSdkWorkerParallelism() {
return this.sdkWorkerParallelism;
}
+
+ @Option(
+ name = "--flink-conf-dir",
+ usage =
+ "Directory containing Flink YAML configuration files. "
+ + "These properties will be set to all jobs submitted to Flink
and take precedence "
+ + "over configurations in FLINK_CONF_DIR."
+ )
+ String flinkConfDir = null;
+
+ @Nullable
+ String getFlinkConfDir() {
+ return flinkConfDir;
+ }
}
public static void main(String[] args) throws Exception {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index a8adc97edcfd..83b8908a11f3 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -105,9 +105,13 @@
* {@link StreamExecutionEnvironment}.
*/
public static StreamingTranslationContext createTranslationContext(
- JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String>
filesToStage) {
+ JobInfo jobInfo,
+ FlinkPipelineOptions pipelineOptions,
+ String confDir,
+ List<String> filesToStage) {
StreamExecutionEnvironment executionEnvironment =
-
FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions,
filesToStage);
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ pipelineOptions, filesToStage, confDir);
return new StreamingTranslationContext(jobInfo, pipelineOptions,
executionEnvironment);
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index 74c595a9465d..e3198dc1bfb6 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -84,7 +84,7 @@ public void shouldInferParallelismFromEnvironmentBatch()
throws IOException {
@Test
public void shouldInferParallelismFromEnvironmentStreaming() throws
IOException {
- String flinkConfDir = extractFlinkConfig();
+ String confDir = extractFlinkConfig();
FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(TestFlinkRunner.class);
@@ -92,7 +92,7 @@ public void shouldInferParallelismFromEnvironmentStreaming()
throws IOException
StreamExecutionEnvironment sev =
FlinkExecutionEnvironments.createStreamExecutionEnvironment(
- options, Collections.emptyList(), flinkConfDir);
+ options, Collections.emptyList(), confDir);
assertThat(options.getParallelism(), is(23));
assertThat(sev.getParallelism(), is(23));
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 0cef18d636c3..34985d75e568 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -139,6 +139,7 @@ public void process(ProcessContext ctx) {
flinkJobExecutor,
pipelineProto,
options.as(FlinkPipelineOptions.class),
+ null,
Collections.EMPTY_LIST);
jobInvocation.start();
long timeout = System.currentTimeMillis() + 60 * 1000;
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index e94b2678ff90..05194d1c979b 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -181,6 +181,7 @@ private void performStateUpdates(
flinkJobExecutor,
pipelineProto,
options.as(FlinkPipelineOptions.class),
+ null,
Collections.emptyList());
jobInvocation.start();
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index f3f04b0b3405..8fbb68652881 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -181,6 +181,7 @@ public void processElement(ProcessContext context) {
flinkJobExecutor,
pipelineProto,
options.as(FlinkPipelineOptions.class),
+ null,
Collections.emptyList());
jobInvocation.start();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 170070)
Time Spent: 8h 40m (was: 8.5h)
> Support flink config directory for flink runner.
> ------------------------------------------------
>
> Key: BEAM-6058
> URL: https://issues.apache.org/jira/browse/BEAM-6058
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ankur Goenka
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 8h 40m
> Remaining Estimate: 0h
>
> Give user option to use flink configuration from a config directory.
> Earlier we only used jars to submit flink jobs, which by default pick the
> cluster flink properties.
> Portability uses a remote environment to submit the flink job. In remote
> execution, flink uses the client config instead of the cluster config. This
> requires us to submit the custom flink properties via the flink client.
> Suggestion is to read the flink config from a flink configuration folder and
> set them to the environment.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)