Hi,

I have a problem accessing my own S3 system from within Flink when
running on Kubernetes.

*TL;DR* I have my own S3 (Ceph). Locally my application works; when running
in K8s it fails with:

Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
Caused by: java.net.SocketException: Network is unreachable (connect failed)


I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
the S3 gateway that is included with it.
I have put a file on this 'S3' and in my Flink 1.10.0 application I do this:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint",   "s3.example.nl");
conf.setString("presto.s3.access-key", "myAccessKey");
conf.setString("presto.s3.secret-key", "mySecretKey");
FileSystem.initialize(conf, null);

senv.setParallelism(2);
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<String> rawInputStream = senv
    .readTextFile(path).name("Read input");

...
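
As an aside: if I read the Flink docs correctly, the same settings can also
go into flink-conf.yaml instead of calling FileSystem.initialize() in the
job. I have only used the programmatic variant above, so treat this as a
sketch of what I believe the equivalent would be:

presto.s3.endpoint: s3.example.nl
presto.s3.access-key: myAccessKey
presto.s3.secret-key: mySecretKey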


s3.example.nl is the hostname of the ingress I have attached to the S3
endpoint. In my case it is reachable via both http and https (with a valid
Let's Encrypt certificate).
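
Side note: since the endpoint speaks both http and https, I assume the
scheme used by the client can be pinned with the Presto ssl toggle, along
these lines. This is an untested guess on my side, based on the
hive.s3.ssl.enabled option of the Presto filesystem:

// untested guess: force https towards the endpoint
conf.setString("presto.s3.ssl.enabled", "true");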

When I run this locally from within IntelliJ it works like a charm: it reads
the data, does some transformations, and then writes the result to Elasticsearch.

I have created an additional image layer that enables the s3-fs-presto
plugin (each plugin must live in its own subdirectory under
/opt/flink/plugins) with this Dockerfile:


FROM flink:1.10.0-scala_2.12
# The plugin jar must sit in its own subdirectory under /opt/flink/plugins
RUN mkdir /opt/flink/plugins/s3-fs-presto && \
    cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto


I start the Flink session cluster with this customized Docker image like this:


#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=8 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto


I then submit my job to this session cluster with this command:

flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 \
  target/flink-table-esloader-0.1-SNAPSHOT.jar


The job starts, and after about 40 seconds it fails with this exception:

*Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
    at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
    at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
    at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
    at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
    at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
    at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
*Caused by: java.net.SocketException: Network is unreachable (connect failed)*
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
    at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
    at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
    ... 28 more


If I read the stack trace correctly, the AWS SDK falls back to the
InstanceProfileCredentialsProvider, i.e. it tries to fetch credentials from
the EC2 instance metadata endpoint (169.254.169.254) instead of using the
access/secret key I configured, and that address obviously does not exist
in my cluster.

I have tried this with

conf.setString("presto.s3.endpoint",         "s3.example.nl");

and also with the ClusterIP and the LoadBalancer IP as the endpoint; I get
the same error in all cases.

I have verified, by logging in to the task manager pod, that all of these
endpoints return a sensible result when queried with a simple curl from the
command line.
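
For completeness, the checks from inside the task manager pod were along
these lines (the actual IPs are redacted here):

curl -v http://s3.example.nl/
curl -v https://s3.example.nl/
curl -v http://<ClusterIP>/
curl -v http://<LoadBalancer IP>/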

I have s3cmd installed locally on my laptop.
My ~/.s3cfg looks like this, and with it I can fully access this S3 setup.


[default]
access_key = myAccessKey
secret_key = mySecretKey
host_base = s3.example.nl
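
With that config everything works; for example, listing the buckets and
fetching the test file are both fine (bucket and file names are
placeholders here):

s3cmd ls
s3cmd get s3://<bucket>/<file>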


*I'm stuck, please help:*

   - What is causing the difference in behaviour between running locally
   and running in K8s? The exact same job works locally but fails in the
   cluster.
   - How do I figure out which network/address it is actually trying to
   reach from within K8s?


Thanks.
-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
