Hi Anuj,

As Martijn said, IAM is the preferred option, but if you have no alternative
to access keys, then environment variables are the better choice.
In that case the Flink configuration doesn't contain the keys in plain text.
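
For example, a minimal pod-spec sketch that wires the standard AWS
environment variables from a Kubernetes Secret (the container and Secret
names here are hypothetical). The S3A default credentials chain includes
com.amazonaws.auth.EnvironmentVariableCredentialsProvider (it appears in
your stack trace below), so the keys never have to be written into
flink-conf.yaml:

containers:
  - name: flink-main-container
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: s3-credentials        # hypothetical Secret name
            key: access-key-id
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: s3-credentials
            key: secret-access-key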

Just a side note: putting `s3a.access.key` into the Flink conf file is not
configuring Hadoop S3. The way to do it is to set
`flink.hadoop.s3a.access.key`.
Practically all such configs must be prefixed with `flink.hadoop.` so that
Flink knows to forward them to Hadoop.
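
For illustration, a flink-conf.yaml sketch of that prefixing scheme
(assuming the standard Hadoop S3A property names, fs.s3a.*; Flink strips
the flink.hadoop. prefix and hands the remainder to the Hadoop
configuration):

flink.hadoop.fs.s3a.access.key: <access key>
flink.hadoop.fs.s3a.secret.key: <secret key>
flink.hadoop.fs.s3a.assumed.role.arn: <role arn>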

G


On Tue, May 9, 2023 at 1:50 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Anuj,
>
> You can't provide the values for S3 in job code, since the S3 filesystems
> are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
> recommended method for setting up credentials is by using IAM, not via
> Access Keys. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
> for more details.
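>
> For illustration only: with IAM roles, the S3A default credentials chain
> normally picks up the credentials without any entry in flink-conf.yaml.
> If you need to pin the provider explicitly, a one-line sketch, assuming
> the standard Hadoop S3A property name and the flink.hadoop. forwarding
> prefix:
>
> flink.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.InstanceProfileCredentialsProvider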
>
> Best regards,
>
> Martijn
>
> On Tue, May 9, 2023 at 1:35 PM Anuj Jain <anuj...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for the reply.
>>
>> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s
>> operator. If I understood correctly, even with an init container the
>> flink-conf.yaml (inside the container) would finally contain unencrypted
>> values for the access tokens. We don't want to persist such sensitive
>> data unencrypted in files or config maps, even inside running containers,
>> due to security constraints in my project.
>> Please let me know if I missed something about the suggested solution.
>>
>> Problem with overriding the configuration programmatically:
>> When I removed the S3 properties from flink-conf.yaml and tried to
>> provide them programmatically from the job code, the connection to S3
>> failed. I also tried Application mode on a standalone cluster, but the
>> behavior is the same.
>>
>> //My job main method (with default flink-conf.yaml):
>> Configuration conf = new Configuration();
>> conf.setString("s3a.access.key", <access-key>);
>> conf.setString("s3a.secret.key", <secret-key>);
>> conf.setString("s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>> conf.setString("s3a.assumed.role.arn", <role arn>);
>> conf.setString("s3a.assumed.role.session.name", <session name>);
>> conf.setString("s3a.assumed.role.session.duration", <refresh time>);
>> conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
>> conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>
>> // flink job program using DataStream
>>
>> env.execute("My job");
>>
>> With this I got a connection exception:
>> Caused by: org.apache.flink.util.SerializedThrowable: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>> at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) ~[?:?]
>> at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177) ~[?:?]
>>
>> When these values are given in flink-conf.yaml instead of the job code,
>> the connection is successful. Please advise if I am doing something
>> incorrect w.r.t. the job program.
>>
>> Regards
>> Anuj
>>
>> On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> To the best of my knowledge, Flink does not provide encryption support
>>> for configuration values at the moment. If you are running Flink on K8s,
>>> it is possible to protect the parameters using an init container. You
>>> can check this SO answer
>>> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
>>> for more detailed instructions; a rough sketch follows below.
>>> Besides, it should be possible to override the Configuration object in
>>> your job code. Are you using Application mode to run the job?
>>>
>>> Best regards,
>>> Biao Geng
>>>
>>> On Mon, May 8, 2023 at 13:55, Anuj Jain <anuj...@gmail.com> wrote:
>>>
>>>> Hi Community,
>>>> I am trying to create an Amazon S3 filesystem distributor using Flink,
>>>> and for this I am using the Hadoop S3A connector with the Flink
>>>> filesystem sink. My Flink application will run in a non-AWS environment,
>>>> on a native cluster, so I need to put my access keys in the Flink
>>>> configuration.
>>>>
>>>> For connecting to the S3 storage, I am configuring flink-conf.yaml with
>>>> the access credentials like
>>>> s3.access.key: <access key>
>>>> s3.secret.key: <secret key>
>>>> ... and some other parameters required for assuming an AWS IAM role
>>>> with the s3a AssumedRoleCredentialProvider
>>>>
>>>> Is there a way to encrypt these parameters rather than putting them in
>>>> directly, or is there another way to supply them programmatically?
>>>>
>>>> I tried to set them programmatically using the Configuration object,
>>>> passing it via StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
>>>> in my job (rather than via flink-conf.yaml), but then the S3 connection
>>>> failed. I think Flink creates the connection pool at startup, even
>>>> before the job is started.
>>>>
>>>> Thanks and Regards
>>>> Anuj Jain
>>>>
>>>
