Hi, I have a problem accessing my own S3 system from within Flink when running on Kubernetes.
*TL;DR* I have my own S3 (Ceph). Locally my application works; when running in K8s it fails with:

    Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    Caused by: java.net.SocketException: Network is unreachable (connect failed)

I have my own Kubernetes cluster (1.17) on which I have installed Ceph and the S3 gateway that is included in it. I have put a file on this 'S3', and in my Flink 1.10.0 application I do this:

    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    final Configuration conf = new Configuration();
    conf.setString("presto.s3.endpoint",   "s3.example.nl");
    conf.setString("presto.s3.access-key", "myAccessKey");
    conf.setString("presto.s3.secret-key", "mySecretKey");
    FileSystem.initialize(conf, null);

    senv.setParallelism(2);
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> rawInputStream = senv
        .readTextFile(path).name("Read input");
    ...

The s3.example.nl is the hostname of the ingress I have attached to the S3 endpoint. In my case it is accessible via both http and https (with a valid LetsEncrypt certificate). When I run this locally from within IntelliJ it works like a charm: it reads the data, does some stuff with it, and then writes it to ElasticSearch.

I have created an additional image layer to enable the flink-s3-fs-presto plugin, with this Dockerfile:

    FROM flink:1.10.0-scala_2.12
    RUN mkdir /opt/flink/plugins/s3-fs-presto && \
        cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto

I run Flink with this customized Docker image like this:

    #!/bin/bash
    ./flink-1.10.0/bin/kubernetes-session.sh \
      -Dkubernetes.cluster-id=flink1100 \
      -Dtaskmanager.memory.process.size=8192m \
      -Dkubernetes.taskmanager.cpu=2 \
      -Dtaskmanager.numberOfTaskSlots=8 \
      -Dresourcemanager.taskmanager-timeout=3600000 \
      -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto

I then submit my job into Kubernetes with this command:

    flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 target/flink-table-esloader-0.1-SNAPSHOT.jar

The job starts, and after about 40 seconds it fails with this exception:

*Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
    at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
    at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
    at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
    at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
    at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
    at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
*Caused by: java.net.SocketException: Network is unreachable (connect failed)*
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
    at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
    at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
    ... 28 more

I have tried this with conf.setString("presto.s3.endpoint", "s3.example.nl"); and also by using the ClusterIP and the LoadBalancer IP, and I get the same error in all cases.
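Reading the stack trace more closely, the failing call appears to come from InstanceProfileCredentialsProvider / EC2CredentialsFetcher. As far as I know that provider queries the EC2 instance metadata service at http://169.254.169.254/, which of course does not exist in my cluster. So my current guess is that the keys I set via FileSystem.initialize never reach the task managers, and the AWS SDK inside the Presto S3 filesystem falls back to the EC2 credentials chain.

To separate a network problem from a configuration problem, I put together this minimal standalone check that I can run from inside a task manager pod. It talks to my Ceph endpoint directly via the AWS SDK v1 (the same SDK visible in the stack trace), bypassing Flink entirely. This is just a sketch: the bucket and object names are placeholders for my real values, and I am assuming path-style access, which Ceph RGW setups usually need.

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    public class S3ConnectivityCheck {
        public static void main(String[] args) {
            AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                // Static credentials: with these set, the SDK should never
                // fall back to the EC2 instance metadata service.
                .withCredentials(new AWSStaticCredentialsProvider(
                    new BasicAWSCredentials("myAccessKey", "mySecretKey")))
                // Custom endpoint; the signing region is a dummy value that
                // the builder requires but Ceph ignores.
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                    "https://s3.example.nl", "us-east-1"))
                // Path-style access instead of virtual-hosted bucket DNS names
                // (assumption: my ingress is not set up for wildcard DNS).
                .withPathStyleAccessEnabled(true)
                .build();

            // The same operation that fails in the job: a HEAD request on the object.
            // "mybucket" and "myfile" are placeholders.
            System.out.println(s3.getObjectMetadata("mybucket", "myfile").getContentLength());
        }
    }

If this succeeds from inside the pod, the network path to s3.example.nl is fine, and the problem would be purely that my credentials configuration never reaches the S3 plugin.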
I have verified, by logging in to the task manager pod, that all of these endpoints show a sensible result when simply doing a curl from the command line.

I have s3cmd installed locally on my laptop. My ~/.s3cfg looks like this, and with it I can fully access this S3 setup:

    [default]
    access_key = myAccessKey
    secret_key = mySecretKey
    host_base = s3.example.nl

*I'm stuck, please help:*
- What is causing the difference in behaviour between running locally and in K8s? It works locally but not in the cluster.
- How do I figure out what network it is trying to reach in K8s?

Thanks.

--
Best regards / Met vriendelijke groeten,

Niels Basjes
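P.S. An even smaller check I can think of (my own idea, not something from the Flink docs): reproduce the credentials lookup from the stack trace without Flink involved at all. The SDK's default chain walks environment variables, system properties, profile files and finally the EC2 metadata service, so running this inside the pod should fail in a similar way if nothing in the chain resolves. (The Presto S3 filesystem builds its own provider chain, so this is only an approximation of what the job does.)

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

    public class CredentialsChainCheck {
        public static void main(String[] args) {
            // Tries env vars, system properties, profile files and finally the
            // EC2 instance metadata service (http://169.254.169.254/).
            // Inside the pod this should throw SdkClientException, mirroring
            // the credentials failure seen in the Flink job.
            System.out.println(new DefaultAWSCredentialsProviderChain()
                    .getCredentials().getAWSAccessKeyId());
        }
    }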