Hi, I'm trying to run a job on a Flink cluster in Amazon EMR from Java code, but I'm having some problems.
This is how I create the cluster:
------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
        .withName("copy-jar-step")
        .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
        .withClassification("flink-conf")
        .addPropertiesEntry("jobmanager.heap.size", "6g")
        .addPropertiesEntry("taskmanager.heap.size", "6g")
        .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");

RunJobFlowRequest request = new RunJobFlowRequest()
        .withName("cluster-" + executionKey)
        .withReleaseLabel("emr-5.26.0")
        .withApplications(flink)
        .withConfigurations(flinkConfiguration)
        .withServiceRole("EMR_DefaultRole")
        .withJobFlowRole("EMR_EC2_DefaultRole")
        .withLogUri(getWorkPath() + "logs")
        .withInstances(new JobFlowInstancesConfig()
                .withEc2SubnetId("mysubnetid")
                .withInstanceCount(2)
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withMasterInstanceType("m4.large")
                .withSlaveInstanceType("m4.large"))
        .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
------------------------------------------------------------

And this is how I add the job when the cluster is ready:
------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
        .withName("run-job-step")
        .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 --class es.trendit.flink.job.centrality.CentralityJob /home/hadoop/trendit-flink-jobs.jar <args...>"));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
        .withJobFlowId(clusterId)
        .withSteps(runJobStep);

AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
------------------------------------------------------------

In summary:
- I'm using 2 EMR instances of type m4.large (2 vCPU, 8 GB each)
- jobmanager.heap.size and taskmanager.heap.size: 6g
- taskmanager.numberOfTaskSlots: 2
- I run flink with --parallelism 2
- so one EMR instance should run the JobManager and the other the TaskManager, with 2 slots available

But the job fails after some time, and I see this warning in the step's stdout file:
------------------------------------------------------------
2020-03-31 14:37:47,288 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available.
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,294 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,296 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
------------------------------------------------------------

And this error in the step's stderr file:
------------------------------------------------------------
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
    ...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 23 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
    at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    ...
------------------------------------------------------------

It looks to me like the TaskManager is not created at the beginning. Any idea why this is happening and how to solve it? I could not find any relevant information in the Flink docs.

Thanks
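The numbers in the warning can be reproduced with a little arithmetic. The sketch below is a simplified, hypothetical model (the class and method names are made up for illustration, and it is not Flink's own code): it places the JobManager container first and then each TaskManager on the first NodeManager with enough free memory, using the values from the log, i.e. a single NodeManager reporting 6144MB and JobManager/TaskManager containers of 6144MB each. It only models memory, not vcores or YARN's minimum/maximum allocation settings.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of the memory check behind the Flink YARN client warning.
 * Containers are placed first-fit onto NodeManagers; only memory is considered.
 */
public class YarnMemoryCheck {

    /** Total memory the session asks for: one JobManager plus N TaskManagers. */
    static int requiredMb(int jobManagerMb, int taskManagerMb, int numTaskManagers) {
        return jobManagerMb + taskManagerMb * numTaskManagers;
    }

    /** Places a container on the first NodeManager with enough free memory; returns false if none fits. */
    static boolean allocate(int[] freeMb, int containerMb) {
        for (int i = 0; i < freeMb.length; i++) {
            if (freeMb[i] >= containerMb) {
                freeMb[i] -= containerMb;
                return true;
            }
        }
        return false;
    }

    /**
     * Returns how many TaskManagers fit after the JobManager has been placed,
     * or -1 if the JobManager itself does not fit on any NodeManager.
     */
    static int placeableTaskManagers(int[] nodeManagersMb, int jobManagerMb,
                                     int taskManagerMb, int numTaskManagers) {
        int[] free = Arrays.copyOf(nodeManagersMb, nodeManagersMb.length); // don't mutate the caller's array
        if (!allocate(free, jobManagerMb)) {
            return -1;
        }
        int placed = 0;
        while (placed < numTaskManagers && allocate(free, taskManagerMb)) {
            placed++;
        }
        return placed;
    }

    public static void main(String[] args) {
        // Values taken from the log: "NodeManagers available: [6144]", JM and TM at 6144MB each.
        int[] nodeManagers = {6144};
        System.out.println("required MB: " + requiredMb(6144, 6144, 1));           // required MB: 12288
        System.out.println("available MB: " + Arrays.stream(nodeManagers).sum());  // available MB: 6144
        System.out.println("TaskManagers placed: "
                + placeableTaskManagers(nodeManagers, 6144, 6144, 1) + "/1");      // TaskManagers placed: 0/1
    }
}
```

In this model the JobManager alone uses up the only NodeManager listed in the log, which matches the "(0/1) TaskManagers" line; with two 6144MB NodeManagers the same placement would fit the TaskManager.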