Re: S3 + Parquet credentials issue

2021-06-20 Thread Angelo G.
Hello, Robert.

I've been manually changing the bucket names in the logs, along with other
potentially sensitive data. The bucket names themselves are fine, since changing
the format from 'parquet' to 'raw' allows the data to be retrieved. Sorry for
the confusion.

Does your env allow access to all AWS resources?


Yes, I have full access to the AWS objects.

Interesting fact: I have verified that exporting the access/secret keys as OS
environment variables and instructing Flink to use
EnvironmentVariableCredentialsProvider in flink-conf.yaml works for both
Parquet and Raw. The problem is that I won't be allowed to use environment
variables in the production environment.
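
For reference, this is roughly the setup that worked (a sketch; the key
values are placeholders):

export AWS_ACCESS_KEY_ID=<access-key>
export AWS_SECRET_ACCESS_KEY=<secret-key>

# flink-conf.yaml
fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider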

Thank you very much.




On Wed, Jun 16, 2021 at 1:37 PM Robert Metzger  wrote:

> Thanks for the logs.
>
> The OK job seems to read from "s3a://test-bucket/", while the KO job reads
> from "s3a://bucket-test/". Could it be that you are just trying to access
> the wrong bucket?
>
> What I also found interesting from the KO Job TaskManager is this log
> message:
>
> Caused by: java.net.NoRouteToHostException: No route to host (Host
> unreachable)
> at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171]
> at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> ~[?:1.8.0_171]
> at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> ~[?:1.8.0_171]
> at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> ~[?:1.8.0_171]
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> ~[?:1.8.0_171]
> at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
> at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171]
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
> ~[?:1.8.0_171]
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
> ~[?:1.8.0_171]
> at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171]
> at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171]
> at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171]
> at
> sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
> ~[?:1.8.0_171]
> at
> sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
> ~[?:1.8.0_171]
> at
> sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
> ~[?:1.8.0_171]
> at
> sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
> ~[?:1.8.0_171]
> at
> com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52)
> ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
> at
> com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80)
> ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
>
> Does your env allow access to all AWS resources?
>
> On Tue, Jun 15, 2021 at 7:12 PM Angelo G. 
> wrote:
>
>> Thank you Svend and Till for your help.
>>
>> Sorry for the late response.
>>
>> I'll try to give more information about the issue:
>>
>> I've not worked exactly in the situation you described, although I've had
>>> to configure S3 access from a Flink application recently and here are a
>>> couple of things I learnt along the way:
>>> * You should normally not need to include flink-s3-fs-hadoop nor
>>> hadoop-mapreduce-client-core in your classpath but should rather make
>>> flink-s3-fs-hadoop available to Flink by putting it into the plugins
>>> folder. The motivation for that is that this jar is a fat jar containing a
>>> lot of hadoop and aws classes, s.t. including it in your classpath quickly
>>> leads to conflicts. The plugins folder is associated with a separate
>>> classpath, which helps avoid those conflicts.
>>>
>> *Following your advice I've left these dependencies out of the pom.
>> Thank you for the explanation.*
>>
>>> * Under the hood, Flink is using the hadoop-aws library to connect to S3
>>> => the documentation regarding how to configure it, and especially security
>>> accesses, is available in [1]
>>>
>> *In our case, the connection to S3 has to be made via an access/secret
>> key pair.*
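>>
>> For example, the kind of key-pair setup we are aiming for in
>> flink-conf.yaml would be (a sketch; the values are placeholders):
>>
>> s3a.access-key: <access-key>
>> s3a.secret-key: <secret-key>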
>>
>>> * Ideally, when running on AWS, your code should not be using
>>> BasicAWSCredentialsProvider, but instead the application should assume
>>> a role, which you associate with some IAM permission.  If that's your case,
>>> the specific documentation for that situation is in [2]. If you're running
>>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>>> key id and secret pointing to a dev account may be appropriate.

Re: S3 + Parquet credentials issue

2021-06-15 Thread Angelo G.
your classpath quickly
>> leads to conflicts. The plugins folder is associated with a separate
>> classpath, which helps avoid those conflicts.
>>
>> * Under the hood, Flink is using the hadoop-aws library to connect to S3
>> => the documentation regarding how to configure it, and especially security
>> accesses, is available in [1]
>>
>> * Ideally, when running on AWS, your code should not be using
>> BasicAWSCredentialsProvider, but instead the application should assume a
>> role, which you associate with some IAM permission.  If that's your case,
>> the specific documentation for that situation is in [2]. If you're running
>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>> key id and secret pointing to a dev account may be appropriate.
>>
>> * As I understand it, any configuration entry in flink.yaml that starts
>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
>> [3]) => by reading documentation in [1] and [2] you might be able to figure
>> out which parameters are relevant to your case, which you can then set with
>> the mechanism just mentioned. For example, in my case, I simply add this to
>> flink.yaml:
>>
>> fs.s3a.aws.credentials.provider:
>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>>
>> * You can debug the various operations that are attempted on S3 by
>> setting this logger to DEBUG level:  org.apache.hadoop.fs.s3a
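>>
>> For example (a sketch assuming the default Log4j 2 properties file
>> shipped with the Flink distribution), adding this to conf/log4j.properties:
>>
>> logger.s3a.name = org.apache.hadoop.fs.s3a
>> logger.s3a.level = DEBUG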
>>
>>
>> Good luck :)
>>
>> Svend
>>
>>
>>
>> [1]
>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
>> [2]
>> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
>> [3]
>> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>>
>>
>> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>>
>> Hello,
>>
>> Trying to read a parquet file located in S3 leads to an AWS credentials
>> exception. Switching to another format (raw, for example) works OK as far as
>> file access is concerned.
>>
>> This is a snippet of code to reproduce the issue:
>>
>> static void parquetS3Error() {
>>
>>     EnvironmentSettings settings =
>>             EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>>
>>     TableEnvironment t_env = TableEnvironment.create(settings);
>>
>>     // parquet format gives error:
>>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>>     // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>>     // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>>     t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");
>>
>>     // other formats (i.e. raw) work properly:
>>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>>     // ++
>>     // |                            url |
>>     // ++
>>     // | [80, 65, 82, 49, 21, 0, 21,... |
>>     // | [0, 0, 0, 50, 48, 50, 49, 4... |
>>     t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>>
>>     Table t1 = t_env.from("backup");
>>
>>     t1.execute().print();
>> }
>>
>> Flink version is 1.12.2.
>>
>> Please find attached the pom with dependencies and version numbers.
>>
>> What would be a suitable workaround for this?
>>
>> Thank you very much.
>>
>> Angelo.
>>
>>
>>
>>
>> *Attachments:*
>>
>>- pom.xml
>>
>>
>>


S3 + Parquet credentials issue

2021-05-31 Thread Angelo G.
Hello,

Trying to read a parquet file located in S3 leads to an AWS credentials
exception. Switching to another format (raw, for example) works OK as far as
file access is concerned.

This is a snippet of code to reproduce the issue:

static void parquetS3Error() {

    EnvironmentSettings settings =
            EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

    TableEnvironment t_env = TableEnvironment.create(settings);

    // parquet format gives error:
    // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
    // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
    // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
    t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");

    // other formats (i.e. raw) work properly:
    // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
    // ++
    // |                            url |
    // ++
    // | [80, 65, 82, 49, 21, 0, 21,... |
    // | [0, 0, 0, 50, 48, 50, 49, 4... |
    t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");

    Table t1 = t_env.from("backup");

    t1.execute().print();
}

Flink version is 1.12.2.

Please find attached the pom with dependencies and version numbers.

What would be a suitable workaround for this?

Thank you very much.

Angelo.

<project xmlns="http://maven.apache.org/POM/4.0.0"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-s3</artifactId>
	<version>1.0-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>Flink Quickstart Job</name>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.12.2</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.12.1</log4j.version>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>
		<!-- Apache Flink dependencies (provided; supplied by the cluster at runtime) -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- S3 and Parquet dependencies -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-s3-fs-hadoop</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-parquet_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Logging dependencies -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Java compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${target.java.version}</source>
					<target>${target.java.version}</target>
				</configuration>
			</plugin>
			<!-- Shade plugin: builds a fat jar, excluding Flink core and logging -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.1.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>org.apache.logging.log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>org.apache.flink.StreamingJob</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>
				<!-- Eclipse m2e lifecycle mapping (has no influence on the Maven build itself) -->
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.1.1,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>
</project>


Re: Issue reading from S3

2021-05-22 Thread Angelo G.

Another interesting fact: when exporting the access and secret keys as
environment variables and adding:

fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider

to flink-conf.yaml, I'm able to read and decode the parquet file properly:

Job has been submitted with JobID 15453b8be2bddcb49c1141a01013bf81
++-+
|   date |   value |
++-+
| 2021-05-16 |   7 |
| 2021-05-16 |   8 |
| 2021-05-16 |   8 |
| 2021-05-16 |   8 |
| 2021-05-16 |   1 |
| 2021-05-16 |   3 |
| 2021-05-16 |   9 |
| 2021-05-16 |   9 |
| 2021-05-16 |   8 |
| 2021-05-16 |   7 |
| 2021-05-16 |   9


We won't be able to set environment variables in production, so this is not a
valid workaround for us.

Please have a look at the POM so you can tell whether there is a missing or
misused dependency of some sort.

Thank you very much.

Angelo.


On Thu, May 20, 2021 at 11:54 AM Yun Gao  wrote:

> Hi Angelo,
>
> I tried the failing case provided with a similar one:
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     EnvironmentSettings settings =
>             EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>     TableEnvironment t_env = TableEnvironment.create(settings);
>     t_env.getConfig().getConfiguration().setString("parallelism.default", "1");
>
>     t_env.executeSql("CREATE TABLE example (  `url` STRING) WITH ( 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 'format' = 'raw')");
>
>     Table t1 = t_env.from("example");
>     t1.execute().print();
>
>
> However, it seems the job could be executed successfully.
>
> I further tried with the configuration and found that the exception
> is thrown if there is no s3a.access-key or s3a.secret-key
> configured. Could you check whether these two configuration items
> are taking effect?
>
> Also, I only configured s3a.path-style: true, s3a.access-key and
> s3a.secret-key; is it possible to remove the other configuration items
> and have a try?
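>
> For reference, that minimal flink-conf.yaml would look roughly like this
> (a sketch; the values are placeholders):
>
> s3a.path-style: true
> s3a.access-key: <access-key>
> s3a.secret-key: <secret-key>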
>
> Best,
> Yun
>
>
>
> --Original Mail --
> *Sender:*Angelo G. 
> *Send Date:*Wed May 19 00:24:42 2021
> *Recipients:*Flink User Mail List 
> *Subject:*Issue reading from S3
>
>> Hi,
>>
>> I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting
>> the job to local cluster (tar.gz distribution). I do not have a Hadoop
>> installation running in the same machine. S3 (not Amazon) is running in a
>> remote location and I have access to it via endpoint and access/secret keys.
>>
>> The issue is that I'm able to read and write from and to S3 when
>> using StreamExecutionEnvironment.readTextFile and DataStream.writeAsText
>> methods but I can't read from S3 when using the table API.
>>
>> This is the application:
>>
>> package org.apache.flink;
>>
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class ReadTables {
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         // CLASSIC API (PLAIN TEXT)
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExec

Issue reading from S3

2021-05-18 Thread Angelo G.
Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting
the job to a local cluster (tar.gz distribution). I do not have a Hadoop
installation running on the same machine. S3 (not Amazon) is running in a
remote location and I have access to it via an endpoint and access/secret keys.

The issue is that I'm able to read from and write to S3 when using the
StreamExecutionEnvironment.readTextFile and DataStream.writeAsText
methods, but I can't read from S3 when using the Table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");

        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();

        // TABLE API
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

        TableEnvironment t_env = TableEnvironment.create(settings);

        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

        t_env.executeSql("CREATE TABLE example (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");

        Table t1 = t_env.from("example");

        t1.execute().print();
    }
}


The first job works properly, reading the source.txt file and writing it to
dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c
org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690faed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690faed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
++-+
|   date |   value |
++-+


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have
encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on
scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS
Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided
by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider
InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException:
Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service
endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in flink-conf.yaml file:

s3a.endpoint: http://s3.xxx
s3a.path-style: true
s3a.access-key: x
s3a.secret-key: x
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: x

I copied the flink-s3-fs-hadoop jar into the plugins folder, but I had to add
it as a dependency (not provided) to the pom; otherwise an S3AFileSystem
'class not found' exception arises.
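
For reference, this is roughly how the plugin was installed (a sketch; the
plugin subdirectory name is arbitrary, and the jar comes from the opt/
folder of the 1.12.2 distribution):

cd flink-1.12.2
mkdir -p plugins/s3-fs-hadoop
cp opt/flink-s3-fs-hadoop-1.12.2.jar plugins/s3-fs-hadoop/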

Thank you for your help,

Angelo.