Hi,

Thanks for the reply.

I don't think I can use IAM integration to avoid distributing keys to the
application, because my Flink application runs outside AWS EC2, on native K8s
cluster nodes, and writes from there to S3 services hosted on AWS.
If there is still a way to integrate with IAM in this setup, please point me to
some documentation.

Let me check whether using environment variables passes our security checks.
I am also trying to see whether Hadoop credential providers can work with the
Flink S3A file sink.
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Storing_secrets_with_Hadoop_Credential_Providers
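
Roughly what I have in mind with the credential providers is something like
this (just a sketch; the keystore path is an example, and I still need to
verify whether the provider path must be prefixed with flink.hadoop. in
flink-conf.yaml or can be given as-is):

# store the S3A keys in a local JCEKS keystore instead of plain text
hadoop credential create fs.s3a.access.key -value <access-key> -provider jceks://file/opt/flink/secrets/s3.jceks
hadoop credential create fs.s3a.secret.key -value <secret-key> -provider jceks://file/opt/flink/secrets/s3.jceks

# then reference the keystore from flink-conf.yaml (exact key to be confirmed)
flink.hadoop.hadoop.security.credential.provider.path: jceks://file/opt/flink/secrets/s3.jceks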

Thanks!!

Regards
Anuj

On Tue, May 9, 2023 at 6:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> hi Anuj,
>
> As Martijn said, IAM is the preferred option, but if you have no other way
> than access keys then environment variables are the better choice.
> In that case the conf file doesn't contain plain-text keys.
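>
> On K8s that could be done by injecting the keys from a Secret into the Flink
> pods, something like this in the pod spec (just a sketch; the Secret and key
> names are examples):
>
> env:
>   - name: AWS_ACCESS_KEY_ID
>     valueFrom:
>       secretKeyRef:
>         name: s3-credentials        # hypothetical K8s Secret
>         key: access-key
>   - name: AWS_SECRET_ACCESS_KEY
>     valueFrom:
>       secretKeyRef:
>         name: s3-credentials
>         key: secret-key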
>
> Just a side note: putting `s3a.access.key` into the Flink conf file does not
> configure Hadoop S3. The way to do it is to set
> `flink.hadoop.s3a.access.key`.
> Practically all Hadoop configs must be prefixed with `flink.hadoop.` so that
> Flink knows these must be forwarded to Hadoop.
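>
> E.g. in flink-conf.yaml (a sketch, just to illustrate the prefixing):
>
> flink.hadoop.s3a.access.key: <access key>
> flink.hadoop.s3a.secret.key: <secret key>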
>
> G
>
>
> On Tue, May 9, 2023 at 1:50 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Anuj,
>>
>> You can't provide the values for S3 in job code, since the S3 filesystems
>> are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
>> recommended method for setting up credentials is by using IAM, not via
>> Access Keys. See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> for more details.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, May 9, 2023 at 1:35 PM Anuj Jain <anuj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply.
>>>
>>> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
>>> If I understood correctly, even with an init-container the flink-conf.yaml
>>> (inside the container) would finally contain unencrypted values for the access
>>> tokens. We don't want to persist such sensitive data unencrypted, even inside
>>> running containers, in files or config maps, due to security constraints in
>>> my project.
>>> Please let me know if I missed something about the suggested solution.
>>>
>>> Problem with overriding the configuration programmatically:
>>> When I removed the S3 properties from flink-conf.yaml and tried to provide
>>> them programmatically from the job code, the connection to S3 failed. I also
>>> tried Application mode on a standalone cluster, but the behavior is the same.
>>>
>>> //My job main method (with default flink-conf.yaml):
>>> Configuration conf = new Configuration();
>>> conf.setString("s3a.access.key", <access-key>);
>>> conf.setString("s3a.secret.key", <secret-key>);
>>> conf.setString("s3a.aws.credentials.provider",
>>>     "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>>> conf.setString("s3a.assumed.role.arn", <role arn>);
>>> conf.setString("s3a.assumed.role.session.name", <session name>);
>>> conf.setString("s3a.assumed.role.session.duration", <refresh time>);
>>> conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
>>> conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>>
>>> // flink job program using DataStream
>>>
>>> env.execute("My job");
>>>
>>> With this I got a connection exception:
>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>> com.amazonaws.SdkClientException: Unable to load AWS credentials from
>>> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
>>> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>>> at
>>> com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
>>> ~[?:?]
>>> at
>>> org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
>>> ~[?:?]
>>>
>>> When these values are given in flink-conf.yaml instead of the job code, the
>>> connection was successful. Please advise if I am doing something incorrect
>>> with the job program.
>>>
>>> Regards
>>> Anuj
>>>
>>> On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> To the best of my knowledge, Flink does not provide encryption support for
>>>> configuration values at the moment. If you are running Flink on K8s, it is
>>>> possible to achieve the encryption of parameters using an init container.
>>>> You can check this SO answer
>>>> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
>>>> for more detailed instructions.
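>>>>
>>>> The idea is roughly like this (only a sketch; the image, paths, and volume
>>>> names are examples): an init container reads the secrets and renders a
>>>> flink-conf.yaml into a volume shared with the Flink container.
>>>>
>>>> initContainers:
>>>>   - name: render-flink-conf
>>>>     image: busybox                   # hypothetical image
>>>>     command:
>>>>       - sh
>>>>       - -c
>>>>       - |
>>>>         cp /conf-template/flink-conf.yaml /flink-conf/flink-conf.yaml
>>>>         echo "s3.access.key: $(cat /s3-secret/access-key)" >> /flink-conf/flink-conf.yaml
>>>>         echo "s3.secret.key: $(cat /s3-secret/secret-key)" >> /flink-conf/flink-conf.yaml
>>>>     volumeMounts:
>>>>       - name: conf-template          # ConfigMap with the base flink-conf.yaml
>>>>         mountPath: /conf-template
>>>>       - name: s3-secret              # K8s Secret (or Vault-injected volume)
>>>>         mountPath: /s3-secret
>>>>       - name: flink-conf             # emptyDir shared with the Flink container
>>>>         mountPath: /flink-conf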
>>>> Besides, it should be possible to override the Configuration object in your
>>>> job code. Are you using Application mode to run the job?
>>>>
>>>> Best regards,
>>>> Biao Geng
>>>>
>>>> On Mon, May 8, 2023 at 1:55 PM Anuj Jain <anuj...@gmail.com> wrote:
>>>>
>>>>> Hi Community,
>>>>> I am trying to create an Amazon S3 filesystem distributor using Flink, and
>>>>> for this I am using the Hadoop S3A connector with the Flink filesystem sink.
>>>>> My Flink application runs in a non-AWS environment, on a native cluster, so
>>>>> I need to put my access keys in the Flink configuration.
>>>>>
>>>>> For connecting to the S3 storage, I am configuring flink-conf.yaml
>>>>> with the access credentials like
>>>>> s3.access.key: <access key>
>>>>> s3.secret.key: <secret key>
>>>>> ... and some other parameters required for assuming an AWS IAM role with
>>>>> the s3a AssumedRoleCredentialProvider.
>>>>>
>>>>> Is there a way to encrypt these parameters rather than putting them in
>>>>> directly, or is there any other way to supply them programmatically?
>>>>>
>>>>> I tried to set them programmatically using the Configuration object,
>>>>> supplying it via
>>>>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in my job
>>>>> (rather than from flink-conf.yaml), but then the S3 connection failed. I
>>>>> think Flink creates the connection pool at startup, even before the job is
>>>>> started.
>>>>>
>>>>> Thanks and Regards
>>>>> Anuj Jain
>>>>>
>>>>
