Hi Will,

I'm afraid that will simply run your program in Flink local mode, where your configuration settings are ignored because an in-process Flink cluster is started instead.
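With the `hadoop jar` invocation quoted below, the FlinkRunner receives no `--flinkMaster`, so it uses its default (`[auto]`). Outside Flink's own `bin/flink` client, that resolves to a local, in-process environment, which matches the FlinkMiniCluster lines in the log. The sketch below contrasts that with pointing the runner at an already running JobManager. It assumes placeholder values for the jar, the input directory and the `host:port` address. `run`, `submit_tfidf` and `DRY_RUN` are hypothetical helpers that only print the command unless `DRY_RUN=0`. Passing the fat jar via `--filesToStage` follows the Beam quickstart.

```shell
#!/usr/bin/env bash
# Sketch only: jar path, input directory and JobManager address are placeholders;
# run / submit_tfidf / DRY_RUN are hypothetical helpers for illustration.

# Print the command (default) or execute it when DRY_RUN=0.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# $1: Beam jar, $2: input directory, $3: optional JobManager address (host:port).
# Without $3 the FlinkRunner keeps its default --flinkMaster ([auto]), which
# falls back to a local in-process cluster when launched via `hadoop jar`.
submit_tfidf() {
  local jar="$1" input="$2" master="${3:-}"
  local args=(--runner=FlinkRunner "--input=${input}" --output=tfIdf)
  if [ -n "$master" ]; then
    # Target an existing JobManager (e.g. one started by a YARN session)
    # and ship the fat jar to it.
    args+=("--flinkMaster=${master}" "--filesToStage=${jar}")
  fi
  run hadoop jar "$jar" org.apache.beam.examples.complete.TfIdf "${args[@]}"
}
```

Calling `DRY_RUN=0 submit_tfidf app.jar /data/in <host>:<port>` would actually execute the command. Without the third argument, it reproduces the local-mode behavior described above.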
For running a Beam pipeline on YARN, you have two options right now:

1. Start a Flink YARN session as described here:
   https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session
   Once the YARN session is up and has printed the JobManager details, submit your Beam job to that JobManager as described here:
   https://beam.apache.org/get-started/quickstart-java/#run-wordcount

2. Package your program as a fat jar. The quickstart on the Beam website should have the right settings in the POM for doing that. Then use regular Flink submission to submit that jar to a YARN cluster as described here:
   https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#run-a-single-flink-job-on-yarn

Best,
Aljoscha

> On 6. Jun 2017, at 19:05, Will Walters <wwalt...@yahoo-inc.com> wrote:
>
> Aljoscha,
>
> You're correct that I'm using Beam. Here's the shell script I'm using to
> start the job:
>
> hadoop jar **path to jar file** org.apache.beam.examples.complete.TfIdf \
> --runner=FlinkRunner \
> --input= **path to directory** \
> --output=tfIdf
>
> There are two lines before that to set the Flink config path, as well as the
> Hadoop classpath.
>
> Thank you,
> Will.
>
> On Tuesday, June 6, 2017 5:45 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Will,
>
> How are you starting your cluster/executing your program? From what I can
> gather you are using Beam, is that right? The line about the FlinkMiniCluster
> seems strange because it hints that the runner is trying to execute in
> local mode.
>
> Best,
> Aljoscha
>
>> On 6. Jun 2017, at 02:13, Will Walters <wwalt...@yahoo-inc.com> wrote:
>>
>> Hello,
>>
>> I'm having issues editing the default Flink memory settings. I'm attempting
>> to run a Flink task on a cluster at scale. The log shows my edited config
>> settings being read into the program, but they have no effect.
>> Here's the trace:
>>
>> 17/06/05 23:45:41 INFO flink.FlinkRunner: Starting execution of Flink program.
>> 17/06/05 23:45:41 INFO java.ExecutionEnvironment: The job has 0 registered types and 0 default Kryo serializers
>> 17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.size, 100
>> 17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.heap.mb, 256
>> 17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.debug.memory.startLogThread, true
>> 17/06/05 23:45:41 INFO minicluster.FlinkMiniCluster: Disabled queryable state server
>> 17/06/05 23:45:41 INFO minicluster.FlinkMiniCluster: Starting FlinkMiniCluster.
>> .
>> .
>> .
>> 17/06/05 23:45:41 INFO network.NetworkEnvironment: Starting the network environment and its components.
>> 17/06/05 23:45:41 INFO taskmanager.TaskManager: Limiting managed memory to 17592186044406 MB, memory will be allocated lazily.
>> 17/06/05 23:45:41 ERROR flink.FlinkRunner: Pipeline execution failed
>> java.lang.IllegalArgumentException: Size of total memory must be positive.
>>
>> And here's the config file I'm using:
>>
>> taskmanager.memory.size: 100
>> taskmanager.heap.mb: 256
>> taskmanager.debug.memory.startLogThread: true
>>
>> In the shell script I'm using to run the task, I've set FLINK_CONF_DIR
>> to point to the config file that I created.
>>
>> If anyone has any advice or input, it would be much appreciated.
>>
>> Thanks!
>> Will Walters.
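The two submission paths from the reply above can be sketched as shell commands. This assumes the Flink 1.3 `yarn-session.sh` and `flink run -m yarn-cluster` flags from the YARN setup page linked above. `FLINK_HOME`, the container counts, the memory sizes (in MB) and the helper names `run`, `start_yarn_session`, `run_fat_jar_on_yarn` and `DRY_RUN` are hypothetical placeholders. On YARN, TaskManager container memory is requested through `-tm` / `-ytm`, so that is one place to size memory once the job no longer runs in local mode.

```shell
#!/usr/bin/env bash
# Sketch only: FLINK_HOME, container counts and memory sizes (MB) are placeholders;
# run / start_yarn_session / run_fat_jar_on_yarn / DRY_RUN are hypothetical helpers.

FLINK_HOME="${FLINK_HOME:-/opt/flink}"

# Print the command (default) or execute it when DRY_RUN=0.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Option 1: long-running YARN session with 4 TaskManager containers of 4096 MB
# and a 1024 MB JobManager. The session client prints the JobManager address,
# which the Beam job then needs as --flinkMaster.
start_yarn_session() {
  run "${FLINK_HOME}/bin/yarn-session.sh" -n 4 -jm 1024 -tm 4096
}

# Option 2: submit the fat jar as a single Flink job on YARN. Because it is
# launched through bin/flink, the FlinkRunner's default master should pick up
# the YARN cluster instead of starting a local one.
run_fat_jar_on_yarn() {
  local jar="$1" input="$2"
  run "${FLINK_HOME}/bin/flink" run -m yarn-cluster -yn 2 -ytm 4096 \
    -c org.apache.beam.examples.complete.TfIdf "$jar" \
    --runner=FlinkRunner "--input=${input}" --output=tfIdf
}
```

For example, `DRY_RUN=0 run_fat_jar_on_yarn target/app-bundled.jar hdfs:///path/to/input` would submit the job for real. Keeping `DRY_RUN=1` only prints the command for inspection.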