Re: S3 + Parquet credentials issue
Hello Robert,

I've been manually changing the bucket names in the logs and other potentially sensitive data. The bucket names are fine: switching the format from 'parquet' to 'raw' does retrieve the data. Sorry for the confusion.

> Does your env allow access to all AWS resources?

Yes, I have full access to the AWS objects.

An interesting fact: I have checked that putting the access/secret keys in OS environment variables and instructing Flink to use EnvironmentVariableCredentialsProvider in flink-conf.yaml works for both Parquet and Raw. The problem is that I won't be allowed to use environment variables in the production environment.

Thank you very much.

On Wed, Jun 16, 2021 at 1:37 PM Robert Metzger wrote:

> Thanks for the logs.
>
> The OK job seems to read from "s3a://test-bucket/", while the KO job reads
> from "s3a://bucket-test/". Could it be that you are just trying to access
> the wrong bucket?
>
> What I also found interesting from the KO Job TaskManager is this log
> message:
>
> Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
>   at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171]
>   at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_171]
>   at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_171]
>   at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_171]
>   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_171]
>   at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
>   at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171]
>   at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[?:1.8.0_171]
>   at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[?:1.8.0_171]
>   at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171]
>   at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171]
>   at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171]
>   at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[?:1.8.0_171]
>   at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199) ~[?:1.8.0_171]
>   at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[?:1.8.0_171]
>   at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[?:1.8.0_171]
>   at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52) ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
>   at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80) ~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
>
> Does your env allow access to all AWS resources?
>
> On Tue, Jun 15, 2021 at 7:12 PM Angelo G. wrote:
>
>> Thank you Svend and Till for your help.
>>
>> Sorry for the late response. I'll try to give more information about the issue:
>>
>>> I've not worked exactly in the situation you described, although I've had
>>> to configure S3 access from a Flink application recently and here are a
>>> couple of things I learnt along the way:
>>>
>>> * You should normally not need to include flink-s3-fs-hadoop nor
>>> hadoop-mapreduce-client-core in your classpath but should rather make
>>> flink-s3-fs-hadoop available to Flink by putting it into the plugins
>>> folder. The motivation for that is that this jar is a fat jar containing
>>> a lot of Hadoop and AWS classes, s.t. including it in your classpath
>>> quickly leads to conflicts. The plugins folder is associated with a
>>> separate classpath, which helps avoid those conflicts.
>>
>> *Following your advice I've left these dependencies out of the pom.
>> Thank you for the explanation.*
>>
>>> * Under the hood, Flink is using the hadoop-aws library to connect to S3
>>> => the documentation regarding how to configure it, and especially
>>> security accesses, is available in [1]
>>
>> *In our case, the connection to S3 should be made via an access/secret
>> key pair.*
>>
>>> * Ideally, when running on AWS, your code should not be using
>>> BasicAWSCredentialsProvider, but instead the application should assume
>>> a role, which you associate with some IAM permission. If that's your case,
>>> the specific documentation for that situation is in [2]. If you're
>>> running some test locally on your laptop, BasicAWSCredentialsProvider
>>> with some key id and secret pointing to a dev account may be appropriate.
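The provider list in the exception quoted further down the thread ("No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider") is the S3A credential chain reporting every provider it tried, in order. A simplified, SDK-free model of that lookup order (the provider names come from the exception text; the two maps are stand-ins for flink-conf.yaml entries and OS environment variables, not real AWS SDK classes):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CredentialChainDemo {

    // Try each provider in order; if none yields credentials, report the
    // whole chain, mimicking the shape of the error seen in this thread.
    static String resolve(Map<String, String> flinkConf, Map<String, String> osEnv) {
        Map<String, String> chain = new LinkedHashMap<>();
        chain.put("BasicAWSCredentialsProvider", flinkConf.get("s3a.access-key"));
        chain.put("EnvironmentVariableCredentialsProvider", osEnv.get("AWS_ACCESS_KEY_ID"));
        chain.put("InstanceProfileCredentialsProvider", null); // no EC2 metadata endpoint here
        List<String> tried = new ArrayList<>();
        for (Map.Entry<String, String> provider : chain.entrySet()) {
            if (provider.getValue() != null) {
                return provider.getValue(); // first provider with credentials wins
            }
            tried.add(provider.getKey());
        }
        return "No AWS Credentials provided by " + String.join(" ", tried);
    }

    public static void main(String[] args) {
        // Nothing configured anywhere: the full chain is reported.
        System.out.println(resolve(Map.of(), Map.of()));
        // Keys exported as environment variables: the second provider answers.
        System.out.println(resolve(Map.of(), Map.of("AWS_ACCESS_KEY_ID", "my-access-key")));
    }
}
```

This is only an illustration of why three provider names appear in one message: the chain falls through them and concatenates what it tried.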
Re: S3 + Parquet credentials issue
>> your classpath quickly leads to conflicts. The plugins folder is
>> associated with a separate classpath, which helps avoid those conflicts.
>>
>> * Under the hood, Flink is using the hadoop-aws library to connect to S3
>> => the documentation regarding how to configure it, and especially
>> security accesses, is available in [1]
>>
>> * Ideally, when running on AWS, your code should not be using
>> BasicAWSCredentialsProvider, but instead the application should assume a
>> role, which you associate with some IAM permission. If that's your case,
>> the specific documentation for that situation is in [2]. If you're running
>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>> key id and secret pointing to a dev account may be appropriate.
>>
>> * As I understand it, any configuration entry in flink.yaml that starts
>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
>> [3]) => by reading the documentation in [1] and [2] you might be able to
>> figure out which parameters are relevant to your case, which you can then
>> set with the mechanism just mentioned. For example, in my case, I simply
>> add this to flink.yaml:
>>
>> fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>>
>> * You can debug the various operations that are attempted on S3 by
>> setting this logger to DEBUG level: org.apache.hadoop.fs.s3a
>>
>> Good luck :)
>>
>> Svend
>>
>> [1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
>> [2] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
>> [3] https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>>
>> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>>
>> Hello,
>>
>> Trying to read a parquet file located in S3 leads to an AWS credentials
>> exception.
>> Switching to another format (raw, for example) works fine as far as
>> file access is concerned.
>>
>> This is a snippet of code to reproduce the issue:
>>
>> static void parquetS3Error() {
>>
>>     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>>
>>     TableEnvironment t_env = TableEnvironment.create(settings);
>>
>>     // parquet format gives error:
>>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>>     // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>>     // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>>     t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");
>>
>>     // other formats (i.e. raw) work properly:
>>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>>     // +--------------------------------+
>>     // |                            url |
>>     // +--------------------------------+
>>     // | [80, 65, 82, 49, 21, 0, 21,... |
>>     // | [0, 0, 0, 50, 48, 50, 49, 4... |
>>     t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>>
>>     Table t1 = t_env.from("backup");
>>
>>     t1.execute().print();
>> }
>>
>> Flink version is 1.12.2.
>>
>> Please find attached the pom with dependencies and version numbers.
>>
>> What would be a suitable workaround for this?
>>
>> Thank you very much.
>>
>> Angelo.
>>
>> *Attachments:*
>>
>>   - pom.xml
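Svend's DEBUG-logger tip above can be applied through the log4j2 properties configuration that Flink 1.12 ships with; a sketch, assuming the stock conf/log4j.properties of the distribution (the logger id "s3a" is an arbitrary name):

```properties
# Append to conf/log4j.properties to trace every S3 operation attempted
logger.s3a.name = org.apache.hadoop.fs.s3a
logger.s3a.level = DEBUG
```

With this in place, the TaskManager log shows which credential providers S3A tries and which requests time out.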
S3 + Parquet credentials issue
Hello,

Trying to read a parquet file located in S3 leads to an AWS credentials exception. Switching to another format (raw, for example) works fine as far as file access is concerned.

This is a snippet of code to reproduce the issue:

static void parquetS3Error() {

    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

    TableEnvironment t_env = TableEnvironment.create(settings);

    // parquet format gives error:
    // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
    // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
    // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
    t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");

    // other formats (i.e. raw) work properly:
    // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
    // +--------------------------------+
    // |                            url |
    // +--------------------------------+
    // | [80, 65, 82, 49, 21, 0, 21,... |
    // | [0, 0, 0, 50, 48, 50, 49, 4... |
    t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");

    Table t1 = t_env.from("backup");

    t1.execute().print();
}

Flink version is 1.12.2.

Please find attached the pom with dependencies and version numbers.

What would be a suitable workaround for this?

Thank you very much.

Angelo.
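The plugins-folder setup recommended elsewhere in the thread can be sketched as shell commands. This simulates the layout in a scratch directory so the commands run anywhere; against a real install, FLINK_HOME would be the unpacked flink-1.12.2 distribution and the jar comes from its opt/ folder:

```shell
# Stand-in for the real Flink home; replace with the actual install path.
FLINK_HOME="$(mktemp -d)/flink-1.12.2"
mkdir -p "$FLINK_HOME/opt"
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.12.2.jar"   # placeholder jar for the demo

# The actual step: each filesystem plugin gets its own subdirectory
# under plugins/, which gives it an isolated classpath.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.12.2.jar" "$FLINK_HOME/plugins/s3-fs-hadoop/"
ls "$FLINK_HOME/plugins/s3-fs-hadoop"
```

The point of the isolated classpath is that the fat jar's bundled Hadoop/AWS classes never mix with the job's own dependencies.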
[pom.xml attachment; the XML tags were lost in archiving. Recoverable content:
  project: org.apache.flink:flink-s3:1.0-SNAPSHOT (jar, "Flink Quickstart Job")
  properties: flink.version 1.12.2, target.java.version 1.8, scala.binary.version 2.11, log4j.version 2.12.1
  repository: apache.snapshots (https://repository.apache.org/content/repositories/snapshots/)
  dependencies (provided): flink-java, flink-streaming-java_${scala.binary.version}, flink-clients_${scala.binary.version}, flink-table-api-java-bridge_2.11, flink-table-planner-blink_2.11, flink-streaming-scala_2.11, flink-table-common
  dependencies (compile): flink-s3-fs-hadoop, hadoop-mapreduce-client-core 3.1.0, flink-parquet_2.11
  dependencies (runtime): log4j-slf4j-impl, log4j-api, log4j-core
  build plugins: maven-compiler-plugin 3.1; maven-shade-plugin 3.1.1 (excludes org.apache.flink:force-shading, com.google.code.findbugs:jsr305, org.slf4j:*, org.apache.logging.log4j:*; filters META-INF/*.SF, *.DSA, *.RSA; main class org.apache.flink.StreamingJob); org.eclipse.m2e lifecycle-mapping 1.0.0]
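Note that this pom lists flink-s3-fs-hadoop and hadoop-mapreduce-client-core at compile scope, which is exactly what Svend advises against earlier in the thread. A hedged sketch of the alternative, assuming the jar is installed under plugins/ as recommended (keep it only as provided so IDE runs still compile, or drop it entirely):

```xml
<!-- With flink-s3-fs-hadoop in the plugins/ folder, do not shade it into
     the job jar; 'provided' keeps it on the compile classpath only. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```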
Re: Issue reading from S3
Another interesting fact: when exporting the access and secret keys as environment variables and adding

fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider

to flink-conf.yaml, I'm able to read and decode the parquet file properly:

Job has been submitted with JobID 15453b8be2bddcb49c1141a01013bf81
+------------+-------+
|       date | value |
+------------+-------+
| 2021-05-16 |     7 |
| 2021-05-16 |     8 |
| 2021-05-16 |     8 |
| 2021-05-16 |     8 |
| 2021-05-16 |     1 |
| 2021-05-16 |     3 |
| 2021-05-16 |     9 |
| 2021-05-16 |     9 |
| 2021-05-16 |     8 |
| 2021-05-16 |     7 |
| 2021-05-16 |     9 |

We won't be able to set environment variables in production, so this is not a valid workaround for us.

Please have a look at the POM so you can tell whether a dependency is missing or misused in some way.

Thank you very much.

Angelo.

On Thu, May 20, 2021 at 11:54 AM Yun Gao wrote:

> Hi Angelo,
>
> I tried the failing case you provided with a similar one:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
> TableEnvironment t_env = TableEnvironment.create(settings);
> t_env.getConfig().getConfiguration().setString("parallelism.default", "1");
> t_env.executeSql("CREATE TABLE example ( `url` STRING) WITH ( 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 'format' = 'raw')");
> Table t1 = t_env.from("example");
> t1.execute().print();
>
> However, it seems the job could be executed successfully.
>
> I further tried with the configuration, and found that the exception
> is thrown if there is no s3a.access-key or s3a.secret-key
> configured. Could you have a look at whether the two configuration items
> are effective?
> Also, I only configured s3a.path-style: true, s3a.access-key and
> s3a.secret-key. Is it possible to remove the other configuration items
> and have a try?
>
> Best,
> Yun
>
> ------------------ Original Mail ------------------
> Sender: Angelo G.
> Send Date: Wed May 19 00:24:42 2021
> Recipients: Flink User Mail List
> Subject: Issue reading from S3
>
>> Hi,
>>
>> I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting
>> the job to a local cluster (tar.gz distribution). I do not have a Hadoop
>> installation running on the same machine. S3 (not Amazon) is running in a
>> remote location and I have access to it via endpoint and access/secret keys.
>>
>> The issue is that I'm able to read from and write to S3 when using the
>> StreamExecutionEnvironment.readTextFile and DataStream.writeAsText
>> methods, but I can't read from S3 when using the Table API.
>>
>> This is the application:
>>
>> package org.apache.flink;
>>
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class ReadTables {
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         // CLASSIC API (PLAIN TEXT)
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
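Yun's suggestion above amounts to trimming flink-conf.yaml down to the minimum S3A entries; a sketch with placeholder values (the real endpoint and keys are anonymized throughout this thread):

```yaml
# Minimal S3A settings per Yun's suggestion; values are placeholders.
s3a.endpoint: http://s3.example.internal
s3a.path-style: true
s3a.access-key: <access-key>
s3a.secret-key: <secret-key>
```

Dropping the remaining entries (entropy, region, bucket) isolates whether the credentials themselves are being picked up.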
Issue reading from S3
Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the job to a local cluster (tar.gz distribution). I do not have a Hadoop installation running on the same machine. S3 (not Amazon) is running in a remote location and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read from and write to S3 when using the StreamExecutionEnvironment.readTextFile and DataStream.writeAsText methods, but I can't read from S3 when using the Table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");

        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();

        // TABLE API
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

        TableEnvironment t_env = TableEnvironment.create(settings);

        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

        t_env.executeSql("CREATE TABLE example ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");

        Table t1 = t_env.from("example");

        t1.execute().print();

    }
}

The first job works properly, reading the source.txt file and writing it to dest.txt.
The second job does not work:

~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690faed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690faed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+------------+-------+
|       date | value |
+------------+-------+

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in the flink-conf.yaml file:

s3a.endpoint: http://s3.xxx
s3a.path-style: true
s3a.access-key: x
s3a.secret-key: x
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: x

I copied the flink-s3-fs-hadoop jar into the plugins folder, but I had to add it as a dependency (not provided) to the pom as well; otherwise an S3AFileSystem 'class not found' exception arises.

Thank you for your help,

Angelo.
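For reference, the environment-variable workaround described earlier in the thread (working, but ruled out for Angelo's production environment) can be sketched as follows; the key values are placeholders:

```shell
# Export the credentials the way EnvironmentVariableCredentialsProvider expects.
export AWS_ACCESS_KEY_ID="my-access-key"          # placeholder
export AWS_SECRET_ACCESS_KEY="my-secret-key"      # placeholder

# The matching flink-conf.yaml entry, written to a local file for illustration:
cat > flink-conf-snippet.yaml <<'EOF'
fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider
EOF
cat flink-conf-snippet.yaml
```

With this provider configured, S3A skips the flink-conf key lookup and reads the two AWS_* variables directly, which is why both Parquet and Raw succeed in that setup.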