Hi Angelo,

What Svend has written is very good advice. Additionally, you could give us a bit more context by posting the exact stack trace and the exact configuration you use to deploy the Flink cluster. To me this looks like a configuration/setup problem in combination with AWS.
Cheers,
Till

On Mon, May 31, 2021 at 10:49 PM Svend <stream...@svend.xyz> wrote:

Hi Angelo,

I've not worked in exactly the situation you describe, although I've had to configure S3 access from a Flink application recently, and here are a couple of things I learnt along the way:

* You should normally not need to include flink-s3-fs-hadoop nor hadoop-mapreduce-client-core in your classpath; instead, make flink-s3-fs-hadoop available to Flink by putting it into the plugins folder (a short sketch of this setup follows below). The motivation is that this jar is a fat jar containing a lot of Hadoop and AWS classes, so that including it in your classpath quickly leads to conflicts. The plugins folder is associated with a separate classpath, which helps avoid those conflicts.

* Under the hood, Flink uses the hadoop-aws library to connect to S3, so the documentation on how to configure it, and especially how to set up security access, is available in [1].

* Ideally, when running on AWS, your code should not be using BasicAWSCredentialsProvider; instead the application should assume a role, which you associate with some IAM permission. If that's your case, the specific documentation for that situation is in [2]. If you're running some test locally on your laptop, BasicAWSCredentialsProvider with a key id and secret pointing to a dev account may be appropriate.

* As I understand it, any configuration entry in flink-conf.yaml that starts with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]), so by reading the documentation in [1] and [2] you should be able to figure out which parameters are relevant to your case and then set them through that mechanism. For example, in my case I simply add this to flink-conf.yaml:

    fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"

* You can debug the various operations that are attempted on S3 by setting this logger to DEBUG level: org.apache.hadoop.fs.s3a

Good luck :)

Svend

[1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
[2] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
[3] https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
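For reference, a minimal sketch of the plugin setup from the first point above, assuming the standard Flink 1.12.2 binary distribution (where the fat jar ships in the opt/ folder) and a standalone deployment; adapt the paths to your own setup:

    # run from the Flink distribution root, on every machine of the cluster
    mkdir -p plugins/s3-fs-hadoop
    cp opt/flink-s3-fs-hadoop-1.12.2.jar plugins/s3-fs-hadoop/

With the jar under plugins/, it is loaded through a dedicated plugin classloader, so it no longer needs to appear among the dependencies of the application jar.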
On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:

Hello,

Trying to read a Parquet file located in S3 leads to an AWS credentials exception. Switching to another format (raw, for example) works fine as far as file access is concerned.

This is a snippet of code to reproduce the issue:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class ParquetS3Error {

        static void parquetS3Error() {

            EnvironmentSettings settings =
                    EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

            TableEnvironment t_env = TableEnvironment.create(settings);

            // The parquet format gives an error:
            // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa:
            // com.amazonaws.AmazonClientException: No AWS Credentials provided by
            // BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
            // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
            t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");

            // Other formats (i.e. raw) work properly:
            // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
            // +--------------------------------+
            // |                            url |
            // +--------------------------------+
            // | [80, 65, 82, 49, 21, 0, 21,... |
            // | [0, 0, 0, 50, 48, 50, 49, 4... |
            // (both statements create a table named 'backup', so comment out one of the
            // two CREATE TABLE statements depending on which format is being tested)
            t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");

            Table t1 = t_env.from("backup");

            t1.execute().print();
        }

        public static void main(String[] args) {
            parquetS3Error();
        }
    }

Flink version is 1.12.2.

Please find attached the pom with the dependencies and version numbers.

What would be a suitable workaround for this?

Thank you very much.

Angelo.

Attachments:
- pom.xml
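As a rough illustration of the fs.s3a.* forwarding mechanism described above (the key names come from the hadoop-aws documentation in [1]; the values are placeholders and only make sense for a local test against a dev account, not for a production deployment on AWS), the corresponding entries in flink-conf.yaml could look like:

    fs.s3a.access.key: <dev-account-access-key-id>
    fs.s3a.secret.key: <dev-account-secret-access-key>

When running on AWS with an assumed role, as suggested in [2], static keys like these should not be used; the role-based configuration described there applies instead.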