Hi Angelo,

I tried to reproduce the failing case you provided with a similar one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

TableEnvironment t_env = TableEnvironment.create(settings);

t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

t_env.executeSql(
        "CREATE TABLE example (`url` STRING) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 's3a://whatnamedoyouwant/links',"
                + " 'format' = 'raw')");

Table t1 = t_env.from("example");

t1.execute().print();

However, the job appears to execute successfully.

I then experimented with the configuration and found that the exception
is thrown when s3a.access-key or s3a.secret-key is not configured.
Could you check whether these two configuration items are actually
taking effect?

Also, I only configured s3a.path-style: true, s3a.access-key and
s3a.secret-key. Could you try removing the other configuration items
and running the job again?
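
In case it helps, here is a minimal sketch of such a check in plain Java. It is
not part of Flink: the class name S3ConfigCheck, its methods, and the default
path conf/flink-conf.yaml are assumptions made only for illustration. It reads
the flat "key: value" lines of the configuration file, reports whether
s3a.access-key and s3a.secret-key are set, and lists the s3a.* keys beyond the
three I configured. It only inspects the file; it cannot tell whether the
cluster actually picked the values up.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a Flink class): inspects flat "key: value" config lines. */
public class S3ConfigCheck {

    // The two keys whose absence triggered the exception in my test.
    public static final List<String> CREDENTIAL_KEYS =
            List.of("s3a.access-key", "s3a.secret-key");

    // The only s3a.* keys present in the configuration that worked for me.
    public static final Set<String> MINIMAL_KEYS =
            Set.of("s3a.path-style", "s3a.access-key", "s3a.secret-key");

    /** Parses "key: value" lines; blank lines, '#' comments and malformed lines are skipped. */
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // no "key: value" shape, e.g. a key with an empty value
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Returns the credential keys that are absent or empty. */
    public static List<String> missingCredentials(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : CREDENTIAL_KEYS) {
            String value = conf.get(key);
            if (value == null || value.isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Returns the s3a.* keys that are not in the minimal set, in file order. */
    public static List<String> extraS3aKeys(Map<String, String> conf) {
        List<String> extra = new ArrayList<>();
        for (String key : conf.keySet()) {
            if (key.startsWith("s3a.") && !MINIMAL_KEYS.contains(key)) {
                extra.add(key);
            }
        }
        return extra;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location, relative to the Flink distribution directory.
        Path file = Paths.get(args.length > 0 ? args[0] : "conf/flink-conf.yaml");
        Map<String, String> conf = parse(Files.readAllLines(file));
        // Only key names are printed, never the secret values.
        System.out.println("Missing credential keys: " + missingCredentials(conf));
        System.out.println("s3a.* keys beyond the minimal set: " + extraS3aKeys(conf));
    }
}
```

With Java 11 or later it can be run directly as
"java S3ConfigCheck.java conf/flink-conf.yaml" from the Flink directory. The
second list is only a set of candidates to remove while testing, not a claim
that those keys are wrong.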

Best,
Yun




 ------------------Original Mail ------------------
Sender:Angelo G. <angelo.guas...@gmail.com>
Send Date:Wed May 19 00:24:42 2021
Recipients:Flink User Mail List <user@flink.apache.org>
Subject:Issue reading from S3

Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the 
job to a local cluster (tar.gz distribution). I do not have a Hadoop installation 
running on the same machine. S3 (not Amazon) is running in a remote location 
and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read from and write to S3 when using the 
StreamExecutionEnvironment.readTextFile and DataStream.writeAsText methods, but 
I can't read from S3 when using the Table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");
        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);
        env.execute();

        // TABLE API
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
        TableEnvironment t_env = TableEnvironment.create(settings);
        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");
        t_env.executeSql(
                "CREATE TABLE example (`date` STRING, `value` INT) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 's3a://bucket/xxx/yyy/',"
                        + " 'format' = 'parquet')");
        Table t1 = t_env.from("example");
        t1.execute().print();
    }
}

The first job works properly, reading the source.txt file and writing it to 
dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in the flink-conf.yaml file:

s3a.endpoint: http://s3.xxxxxxx
s3a.path-style: true
s3a.access-key: xxxxxxxxx
s3a.secret-key: xxxxxxxxx
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: xxxxxxxxx

I copied the flink-s3-fs-hadoop jar into the plugins folder, but I also had to 
add it as a dependency (not provided) to the pom; otherwise an S3AFileSystem 
'class not found' exception is thrown.

Thank you for your help,

Angelo.

