Hi,

Sorry I missed that. But yes, it looks like you are running two JobManagers :) 
You can always check the yarn logs for more information what is being executed.

Piotrek

> On 1 Apr 2020, at 16:44, Antonio Martínez Carratalá 
> <amarti...@alto-analytics.com> wrote:
> 
> Hi Piotr,
> 
> I don't have 2 task managers, just one with 2 slots. That would be ok 
> according to my calculations, but as Craig said I need one more instance for 
> the cluster master. I was guessing the job manager was running in the master 
> and the task manager in the slave, but both job manager and task manager run 
> on slaves so I need 3 instances instead of 2 as I guessed.
> 
> Regards
> 
> On Wed, Apr 1, 2020 at 1:31 PM Piotr Nowojski <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hey,
> 
> Isn’t explanation of the problem in the logs that you posted? Not enough 
> memory? You have 2 EMR nodes, 8GB memory each, while trying to allocate 2 
> TaskManagers AND 1 JobManager with 6GB heap size each?
> 
> Piotrek
> 
> > On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá 
> > <amarti...@alto-analytics.com <mailto:amarti...@alto-analytics.com>> wrote:
> > 
> > Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from java code 
> > but I'm having some problems
> > 
> > This is how I create the cluster:
> > ------------------------------------------------------------------------------------------------------------
> > StepConfig copyJarStep = new StepConfig()
> >     .withName("copy-jar-step")
> >     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> >     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> >         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + 
> > "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
> > 
> > List<StepConfig> stepConfigs = new ArrayList<>();
> > stepConfigs.add(copyJarStep);
> > 
> > Application flink = new Application().withName("Flink");
> > 
> > Configuration flinkConfiguration = new Configuration()
> >     .withClassification("flink-conf")
> >     .addPropertiesEntry("jobmanager.heap.size", "6g")
> >     .addPropertiesEntry("taskmanager.heap.size", "6g")
> >     .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
> > 
> > RunJobFlowRequest request = new RunJobFlowRequest()
> >     .withName("cluster-" + executionKey)
> >     .withReleaseLabel("emr-5.26.0")
> >     .withApplications(flink)
> >     .withConfigurations(flinkConfiguration)
> >     .withServiceRole("EMR_DefaultRole")
> >     .withJobFlowRole("EMR_EC2_DefaultRole")
> >     .withLogUri(getWorkPath() + "logs")
> >     .withInstances(new JobFlowInstancesConfig()
> >         .withEc2SubnetId("mysubnetid")
> >         .withInstanceCount(2)
> >         .withKeepJobFlowAliveWhenNoSteps(true)
> >         .withMasterInstanceType("m4.large")
> >         .withSlaveInstanceType("m4.large"))
> >     .withSteps(stepConfigs);
> > 
> > RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> > ---------------------------------------------------------------------------------------------------------
> > 
> > And this is how I add the jobwhen the cluster is ready:
> > ------------------------------------------------------------------------------------------
> > StepConfig runJobStep = new StepConfig()
> >     .withName("run-job-step")
> >     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> >     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> >         .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 
> > --class es.trendit.flink.job.centrality.CentralityJob 
> > /home/hadoop/trendit-flink-jobs.jar <args...>"));
> > 
> > AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
> > .withJobFlowId(clusterId)
> > .withSteps(runJobStep);
> > 
> > AddJobFlowStepsResult result = 
> > amazonClient.getEmrClient().addJobFlowSteps(request);
> > -----------------------------------------------------------------------------------------------
> > 
> > As summary:
> > - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> > - jobmanager.heap.size and taskmanager.heap.size: 6g
> > - taskmanager.numberOfTaskSlots: 2
> > - run flink with --parallelism 2
> > - so 1 EMR instance should be running the jobmanager and the other the 
> > taskmanager with 2 slots available
> > 
> > But it fails after some time and I see this warning in the step stdout file:
> > ----------------------------------------------------------------------------------------------------------------------
> > 2020-03-31 14:37:47,288 WARN  
> > org.apache.flink.yarn.AbstractYarnClusterDescriptor           - This YARN 
> > session requires 12288MB of memory in the cluster. There are currently only 
> > 6144MB available.
> > The Flink YARN client will try to allocate the YARN session, but maybe not 
> > all TaskManagers are connecting from the beginning because the resources 
> > are currently not available in the cluster. The allocation might take more 
> > time than usual because the Flink YARN client needs to wait until the 
> > resources become available.
> > 2020-03-31 14:37:47,294 WARN  
> > org.apache.flink.yarn.AbstractYarnClusterDescriptor           - There is 
> > not enough memory available in the YARN cluster. The TaskManager(s) require 
> > 6144MB each. NodeManagers available: [6144]
> > After allocating the JobManager (6144MB) and (0/1) TaskManagers, the 
> > following NodeManagers are available: [0]
> > The Flink YARN client will try to allocate the YARN session, but maybe not 
> > all TaskManagers are connecting from the beginning because the resources 
> > are currently not available in the cluster. The allocation might take more 
> > time than usual because the Flink YARN client needs to wait until the 
> > resources become available.
> > 2020-03-31 14:37:47,296 INFO  
> > org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
> > specification: ClusterSpecification{masterMemoryMB=6144, 
> > taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> > ----------------------------------------------------------------------------------------------------------------------
> >  
> > 
> > And this error in the step stderr file:
> > ----------------------------------------------------------------------------------------------------------------------
> >  
> > org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> > (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
> > at 
> > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> > ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> > execution failed.
> > at 
> > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> > at 
> > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> > ... 23 more
> > Caused by: 
> > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> > Could not allocate enough slots within timeout of 300000 ms to run the job. 
> > Please make sure that the cluster has enough resources.
> > at 
> > org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
> > at 
> > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> > ...
> > ----------------------------------------------------------------------------------------------------------------------
> > 
> > It looks to me like the TaskManager is not created at the beginning, any 
> > idea why is this happening and how to solve it? I could not find any 
> > relevant information in Flink docs
> > 
> > Thanks
> > 
> > 
> 
> 
> 

Reply via email to