Hi, I'm trying to run a job on a Flink cluster in Amazon EMR from Java code,
but I'm running into some problems.

This is how I create the cluster:
------------------------------------------------------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
            "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "6g")
    .addPropertiesEntry("taskmanager.heap.size", "6g")
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("mysubnetid")
        .withInstanceCount(2)
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
---------------------------------------------------------------------------------------------------------

And this is how I add the job when the cluster is ready:
------------------------------------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 " +
            "--class es.trendit.flink.job.centrality.CentralityJob " +
            "/home/hadoop/trendit-flink-jobs.jar <args...>"));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result =
    amazonClient.getEmrClient().addJobFlowSteps(request);
-----------------------------------------------------------------------------------------------

In summary:
- I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
- jobmanager.heap.size and taskmanager.heap.size: 6g
- taskmanager.numberOfTaskSlots: 2
- run flink with --parallelism 2
- so 1 EMR instance should be running the jobmanager and the other the
taskmanager with 2 slots available

But it fails after some time and I see this warning in the step stdout file:
----------------------------------------------------------------------------------------------------------------------
2020-03-31 14:37:47,288 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available.
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,294 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,296 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
----------------------------------------------------------------------------------------------------------------------
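To double-check the numbers in that warning, here is the arithmetic as I understand it, written as a small Java sketch. The class and method names are made up for illustration, and the assumption that the JobManager container is placed on the only NodeManager that YARN reports (the single entry in "NodeManagers available: [6144]") is my reading of the log, not something I have confirmed.

```java
public class YarnMemoryCheck {

    // Total memory the Flink YARN session asks for:
    // one JobManager container plus one container per TaskManager.
    static int requiredMb(int jobManagerMb, int taskManagerMb, int taskManagers) {
        return jobManagerMb + taskManagerMb * taskManagers;
    }

    // Memory left on a NodeManager once the JobManager container is placed on it
    // (assumption: the JobManager lands on the only NodeManager reported in the log).
    static int remainingAfterJobManagerMb(int nodeManagerMb, int jobManagerMb) {
        return Math.max(0, nodeManagerMb - jobManagerMb);
    }

    public static void main(String[] args) {
        int jobManagerMb = 6144;   // jobmanager.heap.size = 6g
        int taskManagerMb = 6144;  // taskmanager.heap.size = 6g
        int taskManagers = 1;      // numberTaskManagers=1 in the cluster specification
        int nodeManagerMb = 6144;  // "NodeManagers available: [6144]"

        int required = requiredMb(jobManagerMb, taskManagerMb, taskManagers);
        int remaining = remainingAfterJobManagerMb(nodeManagerMb, jobManagerMb);

        System.out.println("Requested by the session: " + required + " MB");
        System.out.println("Left after the JobManager: " + remaining + " MB");
        System.out.println("Room for a TaskManager: " + (remaining >= taskManagerMb));
    }
}
```

If this reading is right, the JobManager alone uses all the memory YARN has on offer, which would match the "(0/1) TaskManagers" part of the warning.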


And this error in the step stderr file:
----------------------------------------------------------------------------------------------------------------------

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
    ...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 23 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
    at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    ...
----------------------------------------------------------------------------------------------------------------------


It looks to me like the TaskManager is not created at the beginning. Any
idea why this is happening and how to solve it? I could not find any
relevant information in the Flink docs.

Thanks
