Hi Anuj,

As Martijn said, IAM is the preferred option, but if you have no way other than access keys, then environment variables are the better choice. In that case the conf file doesn't contain plain-text keys.

Just a side note: putting `s3a.access.key` into the Flink conf file does not configure Hadoop S3. The way to do it is to set `flink.hadoop.s3a.access.key`; practically all Hadoop configs must be prefixed with `flink.hadoop.` so that Flink knows they must be forwarded to Hadoop.
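For instance, a flink-conf.yaml sketch following that prefixing (the values are placeholders, as elsewhere in this thread):

flink.hadoop.s3a.access.key: <access key>
flink.hadoop.s3a.secret.key: <secret key>
flink.hadoop.s3a.assumed.role.arn: <role arn>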
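And on the environment-variable route: the S3A default credential chain includes EnvironmentVariableCredentialsProvider, which reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (the same names that show up in your stack trace below). On K8s you can inject these from a Secret so the keys never appear in flink-conf.yaml or in a ConfigMap. A minimal pod-spec sketch, assuming a Secret named aws-credentials (all names here are illustrative):

containers:
  - name: taskmanager
    image: flink:1.17            # illustrative
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-credentials    # illustrative Secret name
            key: access-key-id
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-credentials
            key: secret-access-key

With this in place you can drop the key entries from the conf file entirely.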
G

On Tue, May 9, 2023 at 1:50 PM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Anuj,
>
> You can't provide the values for S3 in job code, since the S3 filesystems
> are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
> recommended method for setting up credentials is by using IAM, not via
> Access Keys. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
> for more details.
>
> Best regards,
>
> Martijn
>
> On Tue, May 9, 2023 at 1:35 PM Anuj Jain <anuj...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for the reply.
>>
>> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s
>> operator. If I understood correctly, even with an init container the
>> flink-conf.yaml (inside the container) would finally contain unencrypted
>> values for the access tokens. We don't want to persist such sensitive
>> data unencrypted in files or config maps, even inside running containers,
>> due to security constraints in my project. Please let me know if I missed
>> something about the suggested solution.
>>
>> Problem with overriding the configuration programmatically:
>> When I removed the S3 properties from flink-conf.yaml and tried to
>> provide them programmatically from the job code, the connection to S3
>> failed. I tried it with Application mode on a standalone cluster as well,
>> but the behavior is the same.
>>
>> // My job main method (with default flink-conf.yaml):
>> Configuration conf = new Configuration();
>> conf.setString("s3a.access.key", <access-key>);
>> conf.setString("s3a.secret.key", <secret-key>);
>> conf.setString("s3a.aws.credentials.provider",
>>     "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>> conf.setString("s3a.assumed.role.arn", <role arn>);
>> conf.setString("s3a.assumed.role.session.name", <session name>);
>> conf.setString("s3a.assumed.role.session.duration", <refresh time>);
>> conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
>> conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>
>> // flink job program using DataStream
>>
>> env.execute("My job");
>>
>> With this I got a connection exception:
>> Caused by: org.apache.flink.util.SerializedThrowable:
>> com.amazonaws.SdkClientException: Unable to load AWS credentials from
>> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
>> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>>     at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) ~[?:?]
>>     at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177) ~[?:?]
>>
>> When these values are given in flink-conf.yaml instead of the job code,
>> the connection is successful. Please guide me if I am doing something
>> incorrect with the job program.
>>
>> Regards
>> Anuj
>>
>> On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> To the best of my knowledge, Flink does not provide encryption-strategy
>>> support for now.
>>> If you are using Flink on K8s, it is possible to encrypt the
>>> parameters using an init container (a minimal sketch of this pattern
>>> follows at the end of this thread). You can check this SO answer
>>> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
>>> for more detailed instructions.
>>> Besides, it should be possible to override the Configuration object in
>>> your job code. Are you using Application mode to run the job?
>>>
>>> Best regards,
>>> Biao Geng
>>>
>>> Anuj Jain <anuj...@gmail.com> wrote on Mon, May 8, 2023 at 13:55:
>>>
>>>> Hi Community,
>>>>
>>>> I am trying to create an Amazon S3 filesystem distributor using Flink,
>>>> and for this I am using the Hadoop S3A connector with the Flink
>>>> filesystem sink. My Flink application runs in a non-AWS environment,
>>>> on a native cluster, so I need to put my access keys into the Flink
>>>> configuration.
>>>>
>>>> For connecting to S3 storage, I am configuring flink-conf.yaml with
>>>> the access credentials like:
>>>> s3.access.key: <access key>
>>>> s3.secret.key: <secret key>
>>>> ... and some other parameters required for assuming an AWS IAM role
>>>> with the s3a AssumedRoleCredentialProvider.
>>>>
>>>> Is there a way to encrypt these parameters rather than putting them in
>>>> directly, or is there any other way to supply them programmatically?
>>>>
>>>> I tried to set them programmatically using the Configuration object,
>>>> supplying it via
>>>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in
>>>> my job (rather than from flink-conf.yaml), but then the S3 connection
>>>> failed. I think Flink creates the connection pool at startup, even
>>>> before the job is started.
>>>>
>>>> Thanks and Regards
>>>> Anuj Jain
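Coming back to Biao's init-container suggestion quoted above, here is a minimal sketch of that pattern, assuming Kubernetes and HashiCorp Vault as in the linked SO answer; the image, paths, secret names, and Vault setup are all illustrative, not a definitive recipe. An init container fetches the keys at pod start and renders flink-conf.yaml into a shared emptyDir volume, so the credentials never land in a ConfigMap or in the image:

initContainers:
  - name: render-flink-conf          # illustrative name
    image: hashicorp/vault:1.13      # any image with a vault CLI works
    command: ["/bin/sh", "-c"]
    args:
      - |
        # Assumes VAULT_ADDR and auth are provided, e.g. via the pod's
        # service account; paths and field names are illustrative.
        ACCESS_KEY=$(vault kv get -field=access_key secret/flink/s3)
        SECRET_KEY=$(vault kv get -field=secret_key secret/flink/s3)
        # Write the credentials into the shared volume; in practice you
        # would append them to your full flink-conf.yaml here.
        echo "s3.access.key: ${ACCESS_KEY}"  > /flink-conf/flink-conf.yaml
        echo "s3.secret.key: ${SECRET_KEY}" >> /flink-conf/flink-conf.yaml
    volumeMounts:
      - name: flink-config
        mountPath: /flink-conf
volumes:
  - name: flink-config
    emptyDir: {}                     # lives only as long as the pod
# The Flink containers then mount "flink-config" at /opt/flink/conf.

As Anuj notes above, the rendered file still holds the keys in plain text inside the running pod; this pattern only keeps them out of persisted ConfigMaps and images.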