Hi Flink Devs,

I am facing a weird issue with my Flink job, which uses the DataSet API to read Parquet files from S3. I have a Flink cluster deployed on AWS EMR, and my jar works like a charm when submitted through the JobManager UI.
However, I need a programmatic way to submit jars, so I use the flink-clients API to create a PackagedProgram with the right jar, entry class name, and arguments. My client code submits the jar to the Flink cluster, but job execution fails with the exception below.

Interestingly, the programmatic submission also doesn't respect the IAM role I have set up on EMR. It forces me to provide the access/secret key either in the Hadoop configs or in the S3 URL itself. I went with the latter, since I don't want to change the Hadoop config on AWS EMR. But please note that job submission via the UI does not require access keys in the S3 URL; it respects the IAM role.

Any help will be greatly appreciated.

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: /Affine/Favorite_destinations/Book_Search/2016/01/01/part-r-00000-28c0d7fe-f8c8-42c0-957b-d2239540c30f.gz.parquet doesn't exist
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 21 more
Caused by: java.io.IOException: /XXX/YYYY/2016/01/01/test.gz.parquet doesn't exist
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

~Ashish
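P.S. For reference, my submission client is roughly along these lines. This is a minimal sketch against the flink-clients API as of Flink 1.2; the jar path, entry class name, program arguments, and JobManager host are placeholders standing in for my actual setup:

```java
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

import java.io.File;

public class JobSubmitter {
    public static void main(String[] args) throws Exception {
        // Same jar that works when uploaded through the JobManager UI.
        // Entry class and program arguments here are placeholders.
        PackagedProgram program = new PackagedProgram(
                new File("/path/to/my-flink-job.jar"),
                "com.example.MyParquetJob",
                "s3://my-bucket/path/to/parquet/");

        // Point the client at the JobManager running on the EMR master.
        Configuration config = new Configuration();
        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "emr-master-host");
        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);

        // Submit the packaged program with parallelism 4 and wait for the result.
        ClusterClient client = new StandaloneClusterClient(config);
        client.run(program, 4);
    }
}
```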