Hi Angelo,

What Svend has written is very good advice. Additionally, you could give us
a bit more context by posting the exact stack trace and the exact
configuration you use to deploy the Flink cluster. To me this looks like a
configuration/setup problem in combination with AWS.
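
As a starting point for gathering that context, here is a minimal sketch that reports which credential-related environment variables are visible to a JVM. It assumes the AWS SDK for Java v1 providers named in the error quoted below, and the variable names are the ones those providers and WebIdentityTokenCredentialsProvider read. The class and method names are hypothetical. It prints only whether each variable is set, never its value, and it has to be run in the same environment as the Flink processes to be meaningful.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink or the AWS SDK.
public class AwsEnvCheck {

    // Environment variables read by the AWS SDK v1 credential providers
    // (environment-variable and web-identity based).
    static final List<String> NAMES = Arrays.asList(
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_SESSION_TOKEN",
            "AWS_WEB_IDENTITY_TOKEN_FILE",
            "AWS_ROLE_ARN");

    // Returns, for each known variable, whether it is present and non-empty.
    static Map<String, Boolean> presence(Map<String, String> env) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : NAMES) {
            String value = env.get(name);
            result.put(name, value != null && !value.isEmpty());
        }
        return result;
    }

    public static void main(String[] args) {
        // Print "set" / "not set" only, so no secret ends up in a mail or log.
        presence(System.getenv()).forEach((name, isSet) ->
                System.out.println(name + ": " + (isSet ? "set" : "not set")));
    }
}
```

If none of these are set and the job does not run on an EC2 instance with an instance profile, the provider chain in the error message has nothing to pick up, which would be consistent with the exception.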

Cheers,
Till

On Mon, May 31, 2021 at 10:49 PM Svend <stream...@svend.xyz> wrote:

> Hi Angelo,
>
> I've not worked exactly in the situation you described, although I've had
> to configure S3 access from a Flink application recently and here are a
> couple of things I learnt along the way:
>
> * You should normally not need to include flink-s3-fs-hadoop or
> hadoop-mapreduce-client-core in your classpath; instead, make
> flink-s3-fs-hadoop available to Flink by putting it into the plugins
> folder. The reason is that this jar is a fat jar containing a
> lot of Hadoop and AWS classes, so including it in your classpath quickly
> leads to conflicts. The plugins folder is associated with a separate
> classpath, which helps avoid those conflicts.
>
> * Under the hood, Flink uses the hadoop-aws library to connect to S3, so
> the documentation on how to configure it, especially its security and
> access settings, is available in [1].
>
> * Ideally, when running on AWS, your code should not use
> BasicAWSCredentialsProvider; instead, the application should assume a
> role that you associate with some IAM permissions. If that's your case,
> the specific documentation for that situation is in [2]. If you're running
> some tests locally on your laptop, BasicAWSCredentialsProvider with a
> key id and secret pointing to a dev account may be appropriate.
>
> * As I understand it, any configuration entry in flink-conf.yaml that starts
> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
> [3]). By reading the documentation in [1] and [2] you might be able to figure
> out which parameters are relevant to your case, which you can then set with
> the mechanism just mentioned. For example, in my case, I simply add this to
> flink-conf.yaml:
>
> fs.s3a.aws.credentials.provider:
> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>
> * You can debug the various operations that are attempted on S3 by setting
> this logger to DEBUG level:  org.apache.hadoop.fs.s3a
>
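
The forwarding described in the fourth bullet above can be pictured with a minimal sketch. This is not Flink's actual implementation; it only models the behaviour as Svend describes it (entries whose key starts with "fs.s3a." are passed on unchanged, everything else is ignored). The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: models the described prefix-based forwarding,
// not the code Flink really runs.
public class S3aForwardingSketch {

    static final String PREFIX = "fs.s3a.";

    // Keeps only the entries whose key starts with "fs.s3a.".
    static Map<String, String> forwardS3aEntries(Map<String, String> flinkConf) {
        Map<String, String> forwarded = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                forwarded.put(entry.getKey(), entry.getValue());
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("taskmanager.numberOfTaskSlots", "2");
        flinkConf.put("fs.s3a.aws.credentials.provider",
                "com.amazonaws.auth.WebIdentityTokenCredentialsProvider");

        // Only the fs.s3a.* entry would reach hadoop-aws in this model.
        System.out.println(forwardS3aEntries(flinkConf));
    }
}
```

In this model a misspelled prefix is dropped silently, so if a setting seems to have no effect, the key prefix is worth checking first.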
>
> Good luck :)
>
> Svend
>
>
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
> [2]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
> [3]
> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>
>
> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>
> Hello,
>
> Trying to read a parquet file located in S3 leads to an AWS credentials
> exception. Switching to another format (raw, for example) works fine as far
> as file access is concerned.
>
> This is a snippet of code to reproduce the issue:
>
> static void parquetS3Error() {
>
>     EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>
>     TableEnvironment t_env = TableEnvironment.create(settings);
>
>     // parquet format gives error:
>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa:
>     // com.amazonaws.AmazonClientException: No AWS Credentials provided by
>     // BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>     // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>     t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) "
>         + "WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");
>
>     // other formats (i.e. raw) work properly:
>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>     //                +--------------------------------+
>     //                |                            url |
>     //                +--------------------------------+
>     //                | [80, 65, 82, 49, 21, 0, 21,... |
>     //                | [0, 0, 0, 50, 48, 50, 49, 4... |
>     t_env.executeSql("CREATE TABLE backup (  `url` BINARY) "
>         + "WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>
>     Table t1 = t_env.from("backup");
>
>     t1.execute().print();
>
> }
>
> Flink version is 1.12.2.
>
> Please find attached the pom with dependencies and version numbers.
>
> What would be a suitable workaround for this?
>
> Thank you very much.
>
> Angelo.
>
>
>
>
> *Attachments:*
>
>    - pom.xml
>
>
>
