Hi,

Thanks for the reply.

Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
If I understood correctly, even with an init container the flink-conf.yaml
(inside the container) would finally contain the unencrypted values of the
access tokens. Due to security constraints in my project, we do not want to
persist such sensitive data unencrypted inside running containers, whether
in files or in config maps.
Can you please let me know if I missed something with the suggested solution?
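
To illustrate the concern: as I understand the init-container approach, after
the secrets are fetched from the vault at startup, the rendered flink-conf.yaml
inside the running container would still hold the raw values, e.g.
(placeholder values, not real keys):

# flink-conf.yaml as rendered inside the container by the init container
s3.access.key: <plain-text access key>   # unencrypted on the container filesystem
s3.secret.key: <plain-text secret key>   # unencrypted on the container filesystem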

Problem with overriding the configuration programmatically:
When I removed the S3 properties from flink-conf.yaml and tried to provide
them programmatically from the job code, the connection to S3 failed.
I also tried it with Application mode on a standalone cluster, but the
behavior is the same.

// My job main method (with the default flink-conf.yaml still in place):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
    // S3A credentials and assumed-role settings, set programmatically
    // instead of in flink-conf.yaml (placeholder values elided)
    Configuration conf = new Configuration();
    conf.setString("s3a.access.key", "<access-key>");
    conf.setString("s3a.secret.key", "<secret-key>");
    conf.setString("s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
    conf.setString("s3a.assumed.role.arn", "<role-arn>");
    conf.setString("s3a.assumed.role.session.name", "<session-name>");
    conf.setString("s3a.assumed.role.session.duration", "<refresh-time>");
    conf.setString("s3a.assumed.role.sts.endpoint", "<endpoint>");
    conf.setString("s3a.assumed.role.sts.endpoint.region", "<region>");

    // Pass the configuration when creating the execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);

    // ... Flink job program using the DataStream API ...

    env.execute("My job");
}

With this I got a connection exception:

Caused by: org.apache.flink.util.SerializedThrowable:
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) ~[?:?]
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177) ~[?:?]

When these values are given in flink-conf.yaml instead of in the job code,
the connection is successful. Please guide me if I am doing something
incorrect w.r.t. the job program.
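
For reference, a sketch of the flink-conf.yaml entries that work for me
(assuming the same s3a.* key names as in the job code above; values elided):

s3a.access.key: <access-key>
s3a.secret.key: <secret-key>
s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
s3a.assumed.role.arn: <role-arn>
s3a.assumed.role.session.name: <session-name>
s3a.assumed.role.session.duration: <refresh-time>
s3a.assumed.role.sts.endpoint: <endpoint>
s3a.assumed.role.sts.endpoint.region: <region>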

Regards
Anuj

On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:

> Hi Anuj,
>
> To the best of my knowledge, Flink does not provide encryption support for
> configuration parameters for now. If you are using Flink on K8s, it is
> possible to achieve the encryption of parameters using an init container.
> You can check this SO answer
> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
> for more detailed instructions.
> Besides, it should be possible to override the Configuration object in your
> job code. Are you using Application mode to run the job?
>
> Best regards,
> Biao Geng
>
> Anuj Jain <anuj...@gmail.com> wrote on Mon, May 8, 2023 at 13:55:
>
>> Hi Community,
>> I am trying to create an Amazon S3 filesystem distributor using Flink, and
>> for this I am using the Hadoop S3A connector with the Flink FileSystem sink.
>> My Flink application will run in a non-AWS environment, on a native
>> cluster, so I need to put my access keys in the Flink configuration.
>>
>> For connecting to S3 storage, I am configuring flink-conf.yaml with the
>> access credentials, like:
>> s3.access.key: <access key>
>> s3.secret.key: <secret key>
>> ... and some other parameters required for assuming an AWS IAM role with
>> the s3a AssumedRoleCredentialProvider.
>>
>> Is there a way to encrypt these parameters rather than putting them in
>> directly, or is there any other way to supply them programmatically?
>>
>> I tried to set them programmatically using the Configuration object,
>> supplying it with
>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in my
>> job (rather than from flink-conf.yaml), but then the S3 connection failed.
>> I think Flink creates the connection pool at startup, even before the job
>> is started.
>>
>> Thanks and Regards
>> Anuj Jain
>>
>
