Hi,

Thanks for the reply.

I don't think I can use the IAM integration and avoid distributing keys to
the application, because my Flink application runs outside AWS EC2, on the
nodes of a native K8s cluster, from where it writes to the S3 services
hosted on AWS. If there is a procedure to still integrate with IAM in such
a setup, please point me to some documentation.

Let me check whether using environment variables passes our security
checks.
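If it does, my plan would be to mount the keys from a Kubernetes Secret as
environment variables on the JobManager and TaskManager pods, roughly like
this (an untested sketch; the Secret name "s3-credentials" and its keys
are placeholders):

    # Pod spec fragment for the Flink containers. The AWS SDK's
    # EnvironmentVariableCredentialsProvider picks these variables up.
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: s3-credentials   # hypothetical Secret holding the keys
            key: access-key
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: s3-credentials
            key: secret-key

That way no plain-text keys would have to appear in flink-conf.yaml or in
a config map.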
I am also trying to see if Hadoop credential providers can work with the
Flink S3A file sink:
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Storing_secrets_with_Hadoop_Credential_Providers
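If I read that page correctly, the keys would live in a JCEKS keystore and
only the provider path would appear in the configuration. Something like
this is what I have in mind (untested; the keystore path is a placeholder,
and I am assuming the `flink.hadoop.` prefix mentioned below also forwards
this setting to the S3A filesystem):

    hadoop credential create fs.s3a.access.key \
        -provider jceks://file/opt/flink/secrets/s3.jceks
    hadoop credential create fs.s3a.secret.key \
        -provider jceks://file/opt/flink/secrets/s3.jceks

and then flink-conf.yaml would contain only the (non-secret) path:

    flink.hadoop.hadoop.security.credential.provider.path: jceks://file/opt/flink/secrets/s3.jceks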
Thanks!!

Regards
Anuj

On Tue, May 9, 2023 at 6:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Anuj,
>
> As Martijn said, IAM is the preferred option, but if you have no other
> way than access keys, then environment variables are the better choice.
> In that case the conf does not contain plain-text keys.
>
> Just a side note: putting `s3a.access.key` into the Flink conf file is
> not configuring Hadoop S3. The way to do it is to set
> `flink.hadoop.s3a.access.key`. Practically all configs must be prefixed
> with `flink.hadoop.` to notify Flink that they must be forwarded to
> Hadoop.
>
> G
>
> On Tue, May 9, 2023 at 1:50 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Anuj,
>>
>> You can't provide the values for S3 in job code, since the S3
>> filesystems are loaded via plugins. Credentials must be stored in
>> flink-conf.yaml. The recommended method for setting up credentials is
>> by using IAM, not Access Keys. See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> for more details.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, May 9, 2023 at 1:35 PM Anuj Jain <anuj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply.
>>>
>>> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s
>>> operator. If I understood correctly, even with an init container the
>>> flink-conf.yaml (inside the container) would finally contain
>>> unencrypted values for the access tokens. Due to security constraints
>>> in my project, we don't want to persist such sensitive data
>>> unencrypted in files or config maps, even inside running containers.
>>> Please let me know if I missed something in the suggested solution.
>>>
>>> Problem with overriding the configuration programmatically: when I
>>> removed the S3 properties from flink-conf.yaml and tried to provide
>>> them programmatically from the job code, the connection to S3 failed.
>>> I also tried it with Application mode on a standalone cluster, but
>>> the behavior is the same.
>>>
>>> // My job main method (with the default flink-conf.yaml):
>>> Configuration conf = new Configuration();
>>> conf.setString("s3a.access.key", <access-key>);
>>> conf.setString("s3a.secret.key", <secret-key>);
>>> conf.setString("s3a.aws.credentials.provider",
>>>     "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>>> conf.setString("s3a.assumed.role.arn", <role arn>);
>>> conf.setString("s3a.assumed.role.session.name", <session name>);
>>> conf.setString("s3a.assumed.role.session.duration", <refresh time>);
>>> conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
>>> conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>>
>>> // flink job program using DataStream
>>>
>>> env.execute("My job");
>>>
>>> With this I got a connection exception:
>>>
>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>> com.amazonaws.SdkClientException: Unable to load AWS credentials from
>>> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
>>> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>>> at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
>>> ~[?:?]
>>> at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
>>> ~[?:?]
>>>
>>> When these values are given in flink-conf.yaml instead of the job
>>> code, the connection is successful. Please guide me if I am doing
>>> something incorrect w.r.t. the job program.
>>>
>>> Regards
>>> Anuj
>>>
>>> On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> To the best of my knowledge, Flink does not provide encryption
>>>> support for configuration values for now. If you are using Flink on
>>>> K8s, it is possible to achieve the encryption of parameters using an
>>>> init container. You can check this SO answer
>>>> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
>>>> for more detailed instructions.
>>>>
>>>> Besides, it should be possible to override the Configuration object
>>>> in your job code. Are you using Application mode to run the job?
>>>>
>>>> Best regards,
>>>> Biao Geng
>>>>
>>>> On Mon, May 8, 2023 at 1:55 PM Anuj Jain <anuj...@gmail.com> wrote:
>>>>
>>>>> Hi Community,
>>>>>
>>>>> I am trying to create an Amazon S3 filesystem distributor using
>>>>> Flink, and for this I am using the Hadoop S3A connector with the
>>>>> Flink filesystem sink. My Flink application would run in a non-AWS
>>>>> environment, on a native cluster, so I need to put my access keys
>>>>> into the Flink configuration.
>>>>>
>>>>> For connecting to the S3 storage, I am configuring flink-conf.yaml
>>>>> with the access credentials, like:
>>>>>
>>>>>     s3.access.key: <access key>
>>>>>     s3.secret.key: <secret key>
>>>>>
>>>>> plus some other parameters required for assuming an AWS IAM role
>>>>> with the s3a AssumedRoleCredentialProvider.
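>>>>> For illustration, the assumed-role part (with our values redacted)
>>>>> looks roughly like this:
>>>>>
>>>>>     s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
>>>>>     s3a.assumed.role.arn: <role arn>
>>>>>     s3a.assumed.role.session.name: <session name>
>>>>>     s3a.assumed.role.session.duration: <refresh time>
>>>>>     s3a.assumed.role.sts.endpoint: <endpoint>
>>>>>     s3a.assumed.role.sts.endpoint.region: <region>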
>>>>> Is there a way to encrypt these parameters rather than putting them
>>>>> in directly, or is there any other way to supply them
>>>>> programmatically?
>>>>>
>>>>> I tried to set them programmatically using the Configuration object
>>>>> and supplying it with
>>>>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in
>>>>> my job (rather than from flink-conf.yaml), but then the S3
>>>>> connection failed. I think Flink creates the connection pool at
>>>>> startup, even before the job is started.
>>>>>
>>>>> Thanks and Regards
>>>>> Anuj Jain