[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145752=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145752 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 19/Sep/18 18:16
Start Date: 19/Sep/18 18:16
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#issuecomment-422906216

@tweise As far as I see we have all the checkpointing related options already exposed. The problem is indeed that there might be more options that users want to configure. It could make sense to expose an interface to configure all Flink options.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 145752)
Time Spent: 3h (was: 2h 50m)

> Issue with setting the parallelism at client level using Flink runner
> ----------------------------------------------------------------------
>
> Key: BEAM-3089
> URL: https://issues.apache.org/jira/browse/BEAM-3089
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.0.0
> Environment: I am using Flink 1.2.1 running on Docker, with Task Managers distributed across different VMs as part of a Docker Swarm.
> Reporter: Thalita Vergilio
> Assignee: Grzegorz Kołakowski
> Priority: Major
> Labels: docker, flink, parallel-deployment
> Fix For: 2.8.0
>
> Attachments: flink-ui-parallelism.png
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
> When uploading an Apache Beam application using the Flink Web UI, the parallelism set at job submission doesn't get picked up. The same happens when submitting a job using the Flink CLI.
> In both cases, the parallelism ends up defaulting to 1.
> When I set the parallelism programmatically within the Apache Beam code, it works: {{flinkPipelineOptions.setParallelism(4);}}
> I suspect the root of the problem may be in the org.apache.beam.runners.flink.DefaultParallelismFactory class, as it checks for Flink's GlobalConfiguration, which may not pick up runtime values passed to Flink, then defaults to 1 if it doesn't find anything.
> Any ideas on how this could be fixed or worked around? I need to be able to change the parallelism dynamically, so the programmatic approach won't really work for me, nor will setting the Flink configuration at system level.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145731=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145731 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 19/Sep/18 17:19
Start Date: 19/Sep/18 17:19
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index d448bed2333..000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option on {@link
- * FlinkPipelineOptions}.
- *
- * This will return either the default value from {@link GlobalConfiguration} or {@code 1}. A
- * valid {@link GlobalConfiguration} is only available if the program is executed by the Flink run
- * scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory {
-  @Override
-  public Integer create(PipelineOptions options) {
-    return GlobalConfiguration.loadConfiguration()
-        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
-  }
-}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 4ace1eccc37..40a8d51ee40 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,12 +17,16 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -42,6 +46,12 @@
    */
   public static ExecutionEnvironment createBatchExecutionEnvironment(
       FlinkPipelineOptions options, List filesToStage) {
+    return createBatchExecutionEnvironment(options, filesToStage, null);
+  }
+
+  @VisibleForTesting
+  static ExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options, List filesToStage, @Nullable String confDir) {
 
     LOG.info("Creating a Batch Execution Environment.");
 
@@ -71,9 +81,18 @@ public static ExecutionEnvironment createBatchExecutionEnvironment(
     if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
       flinkBatchEnv.setParallelism(options.getParallelism());
     }
+    // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    final int parallelism;
+    if (flinkBatchEnv instanceof CollectionEnvironment) {
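The diff shows the shape of the fix: the deleted `DefaultParallelismFactory` always resolved the option to a concrete number (the `GlobalConfiguration` value or 1), so the pre-existing `!= -1` guard in `createBatchExecutionEnvironment` could never skip `setParallelism`, while a `-1` default lets the environment keep the parallelism it was created with. The following is a simplified Java model of that difference, not runner code: `SubmissionEnv`, `ParallelismDefaults`, `oldDefault` and `apply` are hypothetical names, and the environment is assumed to already hold the parallelism requested at submission (for example through the CLI or Web UI).

```java
import java.util.Map;

/**
 * Hypothetical stand-in for an execution environment that already carries the
 * parallelism requested at submission time (for example via the CLI).
 */
class SubmissionEnv {
  private int parallelism;

  SubmissionEnv(int submittedParallelism) {
    this.parallelism = submittedParallelism;
  }

  void setParallelism(int parallelism) {
    this.parallelism = parallelism;
  }

  int getParallelism() {
    return parallelism;
  }
}

/** Simplified model contrasting the removed default factory with the -1 sentinel default. */
class ParallelismDefaults {
  /** Sentinel meaning "not set by the user", mirroring @Default.Integer(-1). */
  static final int NOT_SET = -1;

  /**
   * Old behaviour, modelled on the deleted DefaultParallelismFactory: read parallelism.default
   * from the configuration visible to the client, otherwise fall back to 1.
   */
  static int oldDefault(Map<String, String> clientConfig) {
    String value = clientConfig.get("parallelism.default");
    return value == null ? 1 : Integer.parseInt(value.trim());
  }

  /** Same guard as the context lines in the diff: override only when the option is not -1. */
  static void apply(SubmissionEnv env, int parallelismOption) {
    if (parallelismOption != NOT_SET) {
      env.setParallelism(parallelismOption);
    }
  }

  public static void main(String[] args) {
    // Old default: nothing configured on the client, so the option resolves to 1
    // and replaces the parallelism of 4 requested at submission.
    SubmissionEnv oldEnv = new SubmissionEnv(4);
    apply(oldEnv, oldDefault(Map.of()));
    System.out.println("old default: " + oldEnv.getParallelism()); // prints "old default: 1"

    // New default: the option stays at -1, so the submitted parallelism is kept.
    SubmissionEnv newEnv = new SubmissionEnv(4);
    apply(newEnv, NOT_SET);
    System.out.println("new default: " + newEnv.getParallelism()); // prints "new default: 4"
  }
}
```

An explicitly set option (for example 8) still wins in both models; only the unset case changes.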
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145658=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145658 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 19/Sep/18 13:41
Start Date: 19/Sep/18 13:41
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#issuecomment-422808148

Thanks for the review @angoenka.

Issue Time Tracking
---
Worklog Id: (was: 145658)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145631=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145631 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 19/Sep/18 11:47
Start Date: 19/Sep/18 11:47
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#discussion_r218771491

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##
@@ -156,9 +181,11 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
-      flinkStreamEnv
-          .getCheckpointConfig()
-          .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      if (options.getCheckpointTimeoutMillis() != -1) {

Review comment:
-1 in Flink means disabled. Anything else should be applied, even other negative numbers which will raise an exception.

Issue Time Tracking
---
Worklog Id: (was: 145631)
Time Spent: 2.5h (was: 2h 20m)
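mxm's reply explains why the new guard compares against exactly -1 rather than using `< 0`, as suggested in review: -1 is the sentinel for "leave Flink's setting alone", and any other value, including a different negative number, should reach Flink so that its own validation rejects it instead of the runner silently ignoring it. The sketch below models that choice in plain Java. `CheckpointConfigStub` and `CheckpointTimeoutGuard` are hypothetical stand-ins; the stub assumes, as the comment implies, that the real setter throws for non-positive timeouts, and its default value is a placeholder rather than a claim about Flink's actual default.

```java
/** Hypothetical stand-in for Flink's checkpoint config; rejects non-positive timeouts. */
class CheckpointConfigStub {
  static final long DEFAULT_TIMEOUT_MILLIS = 600_000L; // placeholder for the runtime's own default
  private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;

  void setCheckpointTimeout(long timeoutMillis) {
    if (timeoutMillis <= 0) {
      throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
    }
    this.timeoutMillis = timeoutMillis;
  }

  long getCheckpointTimeout() {
    return timeoutMillis;
  }
}

class CheckpointTimeoutGuard {
  /** Sentinel for "use the Flink default", matching the option's -1 default. */
  static final long USE_FLINK_DEFAULT = -1L;

  /** Apply the value unless it is exactly the sentinel; other negatives reach validation. */
  static void apply(CheckpointConfigStub config, long optionValue) {
    if (optionValue != USE_FLINK_DEFAULT) {
      config.setCheckpointTimeout(optionValue);
    }
  }

  public static void main(String[] args) {
    CheckpointConfigStub config = new CheckpointConfigStub();
    apply(config, USE_FLINK_DEFAULT); // untouched: keeps the stub's default
    System.out.println(config.getCheckpointTimeout()); // prints 600000
    apply(config, 30_000L);
    System.out.println(config.getCheckpointTimeout()); // prints 30000
    try {
      apply(config, -5L); // with a "< 0" guard this would be silently ignored
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With `< 0` as the guard, a mistyped value such as -5 would quietly fall back to the default; with `!= -1` it surfaces as an error.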
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145630 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 19/Sep/18 11:47
Start Date: 19/Sep/18 11:47
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#discussion_r218771485

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
##
@@ -56,12 +56,13 @@
       "Address of the Flink Master where the Pipeline should be executed. Can"
           + " either be of the form \"host:port\" or one of the special values [local], "
           + "[collection] or [auto].")
+  @Default.String("[auto]")
   String getFlinkMaster();
 
   void setFlinkMaster(String value);
 
   @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.InstanceFactory(DefaultParallelismFactory.class)
+  @Default.Integer(-1)

Review comment:
Updated.

Issue Time Tracking
---
Worklog Id: (was: 145630)
Time Spent: 2h 20m (was: 2h 10m)
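The `flinkMaster` description in this hunk lists the accepted forms: `host:port`, or one of the special values `[local]`, `[collection]` or `[auto]`, with `[auto]` now the default. The sketch below only classifies such a string; `MasterKind` and `FlinkMasterValue` are hypothetical names, and it makes no attempt to reproduce how the runner actually acts on each value.

```java
/** The four forms named in the flinkMaster option description. */
enum MasterKind {
  AUTO,
  LOCAL,
  COLLECTION,
  REMOTE
}

/** Hypothetical parser for flinkMaster values; not part of Beam. */
final class FlinkMasterValue {
  final MasterKind kind;
  final String host; // null unless kind == REMOTE
  final int port; // -1 unless kind == REMOTE

  private FlinkMasterValue(MasterKind kind, String host, int port) {
    this.kind = kind;
    this.host = host;
    this.port = port;
  }

  static FlinkMasterValue parse(String value) {
    String v = value == null ? "" : value.trim();
    if (v.isEmpty()) {
      v = "[auto]"; // unset falls back to "[auto]", mirroring @Default.String("[auto]")
    }
    switch (v) {
      case "[auto]":
        return new FlinkMasterValue(MasterKind.AUTO, null, -1);
      case "[local]":
        return new FlinkMasterValue(MasterKind.LOCAL, null, -1);
      case "[collection]":
        return new FlinkMasterValue(MasterKind.COLLECTION, null, -1);
      default:
        int colon = v.lastIndexOf(':');
        if (colon <= 0 || colon == v.length() - 1) {
          throw new IllegalArgumentException(
              "Expected host:port, [local], [collection] or [auto], got: " + value);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for bad ports.
        int port = Integer.parseInt(v.substring(colon + 1));
        return new FlinkMasterValue(MasterKind.REMOTE, v.substring(0, colon), port);
    }
  }

  public static void main(String[] args) {
    for (String s : new String[] {null, "[collection]", "jobmanager:8081"}) {
      FlinkMasterValue m = parse(s);
      String target = m.kind == MasterKind.REMOTE ? " " + m.host + ":" + m.port : "";
      System.out.println(s + " -> " + m.kind + target);
    }
  }
}
```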
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145589=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145589 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 19/Sep/18 08:10
Start Date: 19/Sep/18 08:10
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#issuecomment-422701950

Retest this please

Issue Time Tracking
---
Worklog Id: (was: 145589)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145459=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145459 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 18/Sep/18 20:03
Start Date: 18/Sep/18 20:03
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#discussion_r218573541

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
##
@@ -56,12 +56,13 @@
       "Address of the Flink Master where the Pipeline should be executed. Can"
           + " either be of the form \"host:port\" or one of the special values [local], "
           + "[collection] or [auto].")
+  @Default.String("[auto]")
   String getFlinkMaster();
 
   void setFlinkMaster(String value);
 
   @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.InstanceFactory(DefaultParallelismFactory.class)
+  @Default.Integer(-1)

Review comment:
Nit: update the description to signify <= 0 meaning

Issue Time Tracking
---
Worklog Id: (was: 145459)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145457=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145457 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 18/Sep/18 20:03
Start Date: 18/Sep/18 20:03
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#discussion_r218574954

## File path: runners/flink/src/test/resources/flink-conf.yaml
##
@@ -0,0 +1 @@
+parallelism.default: 23

Review comment:
nit: new line

Issue Time Tracking
---
Worklog Id: (was: 145457)
Time Spent: 1h 50m (was: 1h 40m)
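This test resource, together with the new `@Nullable String confDir` parameter in the diff, suggests the test points the runner at a directory containing a `flink-conf.yaml` with `parallelism.default: 23` and checks that this value is picked up. The sketch below reads that key from such a directory using only `java.nio`; `ConfDirParallelism` is a hypothetical helper that understands flat `key: value` lines only, and the runner itself presumably relies on Flink's `GlobalConfiguration` (imported in the diff) rather than anything like this.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: reads parallelism.default from confDir/flink-conf.yaml. */
final class ConfDirParallelism {
  static final String KEY = "parallelism.default";

  static int read(Path confDir, int fallback) throws IOException {
    Path file = confDir.resolve("flink-conf.yaml");
    if (!Files.isRegularFile(file)) {
      return fallback; // no config file: keep the caller's fallback
    }
    for (String raw : Files.readAllLines(file, StandardCharsets.UTF_8)) {
      String line = raw.trim();
      if (line.isEmpty() || line.startsWith("#")) {
        continue; // skip blank lines and comments
      }
      int sep = line.indexOf(':');
      if (sep > 0 && line.substring(0, sep).trim().equals(KEY)) {
        return Integer.parseInt(line.substring(sep + 1).trim());
      }
    }
    return fallback; // key not present
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("flink-conf-dir");
    // Same content as the test resource, deliberately without a trailing newline.
    Files.write(dir.resolve("flink-conf.yaml"), "parallelism.default: 23".getBytes(StandardCharsets.UTF_8));
    System.out.println(read(dir, 1)); // prints 23
  }
}
```

The "new line" nit concerns file style only: `Files.readAllLines` returns the last line whether or not the file ends with a line terminator, so a reader like this parses the value either way.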
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145458=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145458 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 18/Sep/18 20:03
Start Date: 18/Sep/18 20:03
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#discussion_r218574267

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##
@@ -156,9 +181,11 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
-      flinkStreamEnv
-          .getCheckpointConfig()
-          .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      if (options.getCheckpointTimeoutMillis() != -1) {

Review comment:
nit: < 0

Issue Time Tracking
---
Worklog Id: (was: 145458)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145432=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145432 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 18/Sep/18 18:46
Start Date: 18/Sep/18 18:46
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426#issuecomment-422504818

CC @tweise @angoenka There are a couple of commits. The first commit fixes the parallelism issue that users reported with BEAM-3089. While fixing this I discovered that we don't have proper tests for the pipeline options and some of them are not set ideally.

Issue Time Tracking
---
Worklog Id: (was: 145432)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145308=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145308 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 18/Sep/18 14:42
Start Date: 18/Sep/18 14:42
Worklog Time Spent: 10m
Work Description: mxm opened a new pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426

- [BEAM-3089] Use Flink cluster parallelism if no parallelism provided
  The runner always defaulted to 1, even if the Flink cluster had a default parallelism set. With this patch, when no parallelism has been provided, the cluster default will be used. This used to work; it is a regression introduced by cdd2544.
- [BEAM-3089] Add test for FlinkExecutionEnvironments
- Revert default checkpointing mode to EXACTLY_ONCE
- Use default for checkpoint timeout
- [BEAM-3089] Test default values of FlinkPipelineOptions
- Set default master url to [auto]

Issue Time Tracking
---
Worklog Id: (was: 145308)
Time Spent: 1.5h (was: 1h 20m)
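The resolution order described in the first commit of PR #6426 (use the provided parallelism, otherwise the cluster default) can be sketched as follows. This is a hypothetical model, not the code from the pull request: the `NOT_SET` sentinel of -1, the `environmentParallelism` parameter standing in for whatever default the Flink environment already carries, and the final fallback to 1 are all assumptions of the sketch.

```java
// Hypothetical model of "use the provided parallelism, else the cluster default";
// not the code from PR #6426.
final class ParallelismResolution {
  // Assumed sentinel for "the user did not provide a parallelism".
  static final int NOT_SET = -1;

  // requested: value from the pipeline options (NOT_SET if absent).
  // environmentParallelism: default the Flink environment already carries
  // (assumed to be non-positive when it has none).
  static int resolve(int requested, int environmentParallelism) {
    if (requested != NOT_SET) {
      if (requested < 1) {
        throw new IllegalArgumentException("parallelism must be >= 1, got " + requested);
      }
      return requested; // an explicit option always wins
    }
    // Nothing requested: keep the environment's default, or fall back to 1.
    return environmentParallelism > 0 ? environmentParallelism : 1;
  }

  public static void main(String[] args) {
    System.out.println(resolve(NOT_SET, 4)); // 4
    System.out.println(resolve(2, 4)); // 2
    System.out.println(resolve(NOT_SET, NOT_SET)); // 1
  }
}
```

Keeping "not set" distinct from every real value is what lets the cluster default win in this model; a concrete default such as 1 on the option itself would always override the cluster setting, which is the behavior the pull request describes as a regression.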
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=111771=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111771 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 14/Jun/18 04:55
Start Date: 14/Jun/18 04:55
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #4766: [BEAM-3089] Fix job parallelism resolution
URL: https://github.com/apache/beam/pull/4766

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index d6673ae246d..1ed3ac32ad8 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -93,7 +93,11 @@ def createValidatesRunnerTask(Map m) {
     group = "Verification"
     def runnerType = config.streaming ? "streaming" : "batch"
     description = "Validates the ${runnerType} runner"
-    def pipelineOptions = JsonOutput.toJson(["--runner=TestFlinkRunner", "--streaming=${config.streaming}"])
+    def pipelineOptions = JsonOutput.toJson([
+        "--runner=TestFlinkRunner",
+        "--streaming=${config.streaming}",
+        "--parallelism=1"
+    ])
     systemProperty "beamTestPipelineOptions", pipelineOptions
     classpath = configurations.validatesRunner
     testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 610bc9d200d..dac092ee44c 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -72,7 +72,8 @@
 [
 "--runner=TestFlinkRunner",
- "--streaming=false"
+ "--streaming=false",
+ "--parallelism=1"
 ]
@@ -104,7 +105,8 @@
 [
 "--runner=TestFlinkRunner",
- "--streaming=true"
+ "--streaming=true",
+ "--parallelism=1"
 ]
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index b745f0bd441..000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option
- * on {@link FlinkPipelineOptions}.
- *
- * This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
- * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
- * run scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
-  @Override
-  public Integer create(PipelineOptions options) {
-    return GlobalConfiguration.loadConfiguration()
-        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
-  }
-}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b2cbefbc5b0..51c81650ba9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -60,7 +60,7 @@ void setFlinkMaster(String
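The build.gradle hunk in the PR #4766 diff above serializes the ValidatesRunner arguments, now including `--parallelism=1`, into a JSON array and passes it to the tests through the `beamTestPipelineOptions` system property. A dependency-free Java sketch of that hand-off follows. `toJsonArray` is a hypothetical helper with only minimal string escaping, standing in for Groovy's `JsonOutput.toJson`; it is not how Beam builds the property.

```java
import java.util.List;

// Hypothetical helper, not part of Beam or Gradle: serializes pipeline
// arguments into a compact JSON array of strings.
final class TestPipelineOptionsJson {
  static String toJsonArray(List<String> args) {
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < args.size(); i++) {
      if (i > 0) {
        sb.append(',');
      }
      sb.append('"');
      for (char c : args.get(i).toCharArray()) {
        switch (c) {
          case '"':
            sb.append("\\\""); // escape quotes
            break;
          case '\\':
            sb.append("\\\\"); // escape backslashes
            break;
          default:
            if (c < 0x20) {
              sb.append(String.format("\\u%04x", (int) c)); // escape control characters
            } else {
              sb.append(c);
            }
        }
      }
      sb.append('"');
    }
    return sb.append(']').toString();
  }

  public static void main(String[] args) {
    boolean streaming = false;
    String json = toJsonArray(List.of(
        "--runner=TestFlinkRunner",
        "--streaming=" + streaming,
        "--parallelism=1"));
    // Mirrors: systemProperty "beamTestPipelineOptions", pipelineOptions
    System.setProperty("beamTestPipelineOptions", json);
    System.out.println(System.getProperty("beamTestPipelineOptions"));
    // ["--runner=TestFlinkRunner","--streaming=false","--parallelism=1"]
  }
}
```

Pinning the parallelism in the test options presumably keeps the ValidatesRunner runs independent of whatever default the executing Flink environment would otherwise supply once DefaultParallelismFactory is gone.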
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=109612=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109612 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 07/Jun/18 04:01
Start Date: 07/Jun/18 04:01
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #4766: [BEAM-3089] Fix job parallelism resolution
URL: https://github.com/apache/beam/pull/4766#issuecomment-395285650

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 109612)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=83148=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83148 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 22/Mar/18 12:02
Start Date: 22/Mar/18 12:02
Worklog Time Spent: 10m
Work Description: grzegorz8 commented on issue #4766: [BEAM-3089] Fix job parallelism resolution
URL: https://github.com/apache/beam/pull/4766#issuecomment-375280490

retest this please

Issue Time Tracking
---
Worklog Id: (was: 83148)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=80847=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80847 ]

ASF GitHub Bot logged work on BEAM-3089:
Author: ASF GitHub Bot
Created on: 15/Mar/18 14:10
Start Date: 15/Mar/18 14:10
Worklog Time Spent: 10m
Work Description: grzegorz8 commented on issue #4766: [BEAM-3089] Fix job parallelism resolution
URL: https://github.com/apache/beam/pull/4766#issuecomment-373388497

retest this please

Issue Time Tracking
---
Worklog Id: (was: 80847)
Time Spent: 50m (was: 40m)