Hi Anuj,

You can't provide the values for S3 in job code, since the S3 filesystems are loaded via plugins. Credentials must be stored in flink-conf.yaml. The recommended method for setting up credentials is by using IAM, not via Access Keys. See https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials for more details.
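
For illustration, here is a minimal sketch of the flink-conf.yaml entries in question. The `s3.access-key` and `s3.secret-key` names are the ones used in the linked documentation. The assumed-role entries are an assumption: they rely on the S3 plugin forwarding `s3.`-prefixed keys to the corresponding Hadoop `fs.s3a.` options, and the thread does not show the exact keys that were used. All values are placeholders.

```yaml
# flink-conf.yaml (sketch only; every value is a placeholder)

# Static credentials, key names as in the Flink S3 documentation
s3.access-key: <access-key>
s3.secret-key: <secret-key>

# Assumed-role settings (assumption: forwarded to the fs.s3a.* Hadoop options)
s3.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
s3.assumed.role.arn: <role arn>
s3.assumed.role.session.name: <session name>
```

Because the filesystem plugin is configured from this file when the JobManager and TaskManager processes start, a Configuration object built later in the job's main method does not reach it, which would be consistent with the failure described in the quoted message.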
Best regards,

Martijn

On Tue, May 9, 2023 at 1:35 PM Anuj Jain <anuj...@gmail.com> wrote:

> Hi,
>
> Thanks for the reply.
>
> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
> If I understood correctly, even with an init container the flink-conf.yaml
> (inside the container) would finally contain unencrypted values for access
> tokens. We don't want to persist such sensitive data unencrypted, even
> inside running containers in files or config maps, due to some security
> constraints in my project.
> Can you please let me know if I missed something with the suggested
> solution?
>
> Problem with overriding configuration programmatically:
> When I removed the S3 properties from flink-conf.yaml and tried to provide
> them programmatically from the job code, the connection to S3 failed.
> I also tried it in Application mode on a standalone cluster, but the
> behavior is the same.
>
> // My job main method (with default flink-conf.yaml):
> Configuration conf = new Configuration();
> conf.setString("s3a.access.key", <access-key>);
> conf.setString("s3a.secret.key", <secret-key>);
> conf.setString("s3a.aws.credentials.provider",
>     "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
> conf.setString("s3a.assumed.role.arn", <role arn>);
> conf.setString("s3a.assumed.role.session.name", <session name>);
> conf.setString("s3a.assumed.role.session.duration", <refresh time>);
> conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
> conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(conf);
>
> // flink job program using DataStream
>
> env.execute("My job");
>
> With this I got a connection exception:
> Caused by: org.apache.flink.util.SerializedThrowable:
> com.amazonaws.SdkClientException: Unable to load AWS credentials from
> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>     at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) ~[?:?]
>     at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177) ~[?:?]
>
> When these values are given in flink-conf.yaml instead of job code, the
> connection was successful. Please advise if I am doing something incorrect
> in the job program.
>
> Regards
> Anuj
>
> On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:
>
>> Hi Anuj,
>>
>> To the best of my knowledge, Flink does not provide encryption strategy
>> support for now. If you are using Flink on K8s, it is possible to achieve
>> the encryption of parameters using an init container. You can check this
>> SO question
>> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
>> for more detailed instructions.
>> Besides, it should be possible to override the Configuration object in your
>> job code. Are you using Application mode to run the job?
>>
>> Best regards,
>> Biao Geng
>>
>> On Mon, May 8, 2023 at 13:55, Anuj Jain <anuj...@gmail.com> wrote:
>>
>>> Hi Community,
>>> I am trying to create an Amazon S3 filesystem distributor using Flink,
>>> and for this I am using the Hadoop S3A connector with the Flink filesystem sink.
>>> My Flink application would run in a non-AWS environment, on a native
>>> cluster, so I need to put my access keys in the Flink configuration.
>>>
>>> For connecting to S3 storage, I am configuring flink-conf.yaml with the
>>> access credentials like
>>> s3.access.key: <access key>
>>> s3.secret.key: <secret key>
>>> ... and some other parameters required for assuming an AWS IAM role with
>>> the s3a AssumedRoleCredentialProvider
>>>
>>> Is there a way to encrypt these parameters rather than putting them in
>>> directly, or is there any other way to supply them programmatically?
>>>
>>> I tried to set them programmatically using the Configuration object and
>>> supplying them with
>>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in my
>>> job (rather than from flink-conf.yaml), but then the S3 connection failed. I
>>> think Flink creates the connection pool at startup, even before the job is
>>> started.
>>>
>>> Thanks and Regards
>>> Anuj Jain