Thank you Svend  and Till for your help.

Sorry for the the late response.

I'll try to give more information about the issue:

I've not worked exactly in the situation you described, although I've had
> to configure S3 access from a Flink application recently and here are a
> couple of things I learnt along the way:
> * You should normally not need to include flink-s3-fs-hadoop nor
> hadoop-mapreduce-client-core in your classpath but should rather make
> flink-s3-fs-hadoop available to Flink by putting it into the plugins
> folder. The motivation for that is that this jar is a fat jar containing a
> lot of hadoop and aws classes, s.t. including it in your classpath quickly
> leads to conflicts. The plugins folder is associated with a separate
> classpath, with helps avoiding those conflicts.
>
*Following your advice I've leave these dependencies out from the pom.
Thank you for the explanation.*

> * Under the hood, Fink is using the hadoop-aws library to connect to s3 =>
> the documentation regarding how to configure it, and especially security
> accesses, is available in [1]
>
*In our case, connection to S3 should be made via access/secret key pair. *

> * Ideally, when running on AWS, your code should not be using
> BasicAWSCredentialsProvider, but instead the application should assume a
> role, which you associate with some IAM permission.  If that's your case,
> the specific documentation for that situation is in [2]. If you're running
> some test locally on your laptop, BasicAWSCredentialsProvider with some
> key id and secret pointing to a dev account may be appropriate.
>
*Yes, in the Flink documentation is noted that IAM is the recommended way
to access S3. But I am forced to use secret/access keys.  I'm not
indicating in the flink-conf.yaml what credentials provider to use, the
BasicAWSCredentialsProvider seems to be the default provider for Flink. But
as we will see, this message is shown only when trying to read Parquet
format. Other formats poses no problem.*

> * As I understand it, any configuration entry in flink.yaml that starts
> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
> [3]) => by reading documentation in [1] and [2] you might be able to figure
> out which parameters are relevant to your case, which you can then set with
> the mechanism just mentioned. For example, in my case, I simply add this to
> flink.yaml:

*My flink-yaml.conf is as follows:*

taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
fs.s3a.path-style: true
fs.s3a.region: eu-west-3
fs.s3a.bucket.testbucket.access.key: xxxx
fs.s3a.bucket.testbucket.secret.key: xxxx


> what Svend has written is very good advice. Additionally, you could give
> us a bit more context by posting the exact stack trace and the exact
> configuration you use to deploy the Flink cluster. To me this looks like a
> configuration/setup problem in combination with AWS.


The cluster setup for the tests is as follows:

flink-1.12.2-bin-scala_2.12.tgz
<https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz>
unizipped in home folder.

flink-1.12.2/opt/flink-s3-fs-hadoop-1.12.2.jar copied to
flink-1.12.2/plugins/flink-s3-fs-hadoop/

flink-yaml.conf with the above contents.


The job is being launched like this:

~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH    -c
org.apache.flink.s3.CompactDemo
/home/xxx/git/recovery/flink-s3/target/flink-s3-1.0-SNAPSHOT.jar


Please find attached the two type of traces, one when using 'raw'
format for the table (which is working ok) and the other when
'parquet' format is used (which fails).



Again, sorry for the delay of my response and thank you very much for your help.




On Tue, Jun 1, 2021 at 5:30 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Angelo,
>
> what Svend has written is very good advice. Additionally, you could give
> us a bit more context by posting the exact stack trace and the exact
> configuration you use to deploy the Flink cluster. To me this looks like a
> configuration/setup problem in combination with AWS.
>
> Cheers,
> Till
>
> On Mon, May 31, 2021 at 10:49 PM Svend <stream...@svend.xyz> wrote:
>
>> Hi Angelo,
>>
>> I've not worked exactly in the situation you described, although I've had
>> to configure S3 access from a Flink application recently and here are a
>> couple of things I learnt along the way:
>>
>> * You should normally not need to include flink-s3-fs-hadoop nor
>> hadoop-mapreduce-client-core in your classpath but should rather make
>> flink-s3-fs-hadoop available to Flink by putting it into the plugins
>> folder. The motivation for that is that this jar is a fat jar containing a
>> lot of hadoop and aws classes, s.t. including it in your classpath quickly
>> leads to conflicts. The plugins folder is associated with a separate
>> classpath, with helps avoiding those conflicts.
>>
>> * Under the hood, Fink is using the hadoop-aws library to connect to s3
>> => the documentation regarding how to configure it, and especially security
>> accesses, is available in [1]
>>
>> * Ideally, when running on AWS, your code should not be using
>> BasicAWSCredentialsProvider, but instead the application should assume a
>> role, which you associate with some IAM permission.  If that's your case,
>> the specific documentation for that situation is in [2]. If you're running
>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>> key id and secret pointing to a dev account may be appropriate.
>>
>> * As I understand it, any configuration entry in flink.yaml that starts
>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
>> [3]) => by reading documentation in [1] and [2] you might be able to figure
>> out which parameters are relevant to your case, which you can then set with
>> the mechanism just mentioned. For example, in my case, I simply add this to
>> flink.yaml:
>>
>> fs.s3a.aws.credentials.provider:
>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>>
>> * You can debug the various operations that are attempted on S3 by
>> setting this logger to DEBUG level:  org.apache.hadoop.fs.s3a
>>
>>
>> Good luck :)
>>
>> Svend
>>
>>
>>
>> [1]
>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
>> [2]
>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
>> [3]
>> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>>
>>
>> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>>
>> Hello,
>>
>> Trying to read a parquet file located in S3 leads to a AWS credentials
>> exception. Switching to other format (raw, for example) works ok regarding
>> to file access.
>>
>> This is a snippet of code to reproduce the issue:
>>
>> static void parquetS3Error() {
>>
>>     EnvironmentSettings settings = 
>> EnvironmentSettings.*newInstance*().inBatchMode().useBlinkPlanner().build();
>>
>>     TableEnvironment t_env = TableEnvironment.*create*(settings);
>>
>>     // parquet format gives error:
>>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on 
>> bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>>     // No AWS Credentials provided by BasicAWSCredentialsProvider 
>> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>>     // com.amazonaws.SdkClientException: Failed to connect to service 
>> endpoint:
>>     t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) 
>> WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 
>> 'parquet')");
>>
>>     // other formats (i.e. raw) work properly:
>>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>>     //                +--------------------------------+
>>     //                |                            url |
>>     //                +--------------------------------+
>>     //                | [80, 65, 82, 49, 21, 0, 21,... |
>>     //                | [0, 0, 0, 50, 48, 50, 49, 4... |
>>     t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 
>> 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>>
>>     Table t1 = t_env.from("backup");
>>
>>     t1.execute().print();
>>
>> }
>>
>> Flink version is 1.12.2.
>>
>> Please find attached the pom with dependencies and version numbers.
>>
>> What would be a suitable workaround for this?
>>
>> Thank you very much.
>>
>> Angelo.
>>
>>
>>
>>
>> *Attachments:*
>>
>>    - pom.xml
>>
>>
>>

<<attachment: traces.zip>>

Reply via email to