Hi Anuj,

You can't provide the S3 credentials in job code, because the S3
filesystems are loaded via plugins when the cluster starts, before any
job code runs. Credentials must therefore be stored in flink-conf.yaml.
The recommended way to set up credentials is IAM roles, not access
keys. See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
for more details.
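
If you are outside AWS and access keys are unavoidable, the entries in
flink-conf.yaml are the ones you already used; a minimal sketch with
placeholder values:

s3.access.key: <access key>
s3.secret.key: <secret key>

As far as I know, Flink mirrors keys with the s3. prefix to the
corresponding fs.s3a. keys of the bundled Hadoop filesystem, which is
also why they only take effect when the plugin is initialized at
cluster startup.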

Best regards,

Martijn

On Tue, May 9, 2023 at 1:35 PM Anuj Jain <anuj...@gmail.com> wrote:

> Hi,
>
> Thanks for the reply.
>
> Yes, my Flink deployment is on K8s, but I am not using the Flink
> Kubernetes operator.
> If I understood correctly, even with an init container the
> flink-conf.yaml (inside the container) would still end up holding the
> access tokens unencrypted. Due to security constraints in my project,
> we don't want to persist such sensitive data unencrypted inside running
> containers, whether in files or in config maps.
> Please let me know if I have missed something about the suggested
> solution.
>
> Problem with overriding the configuration programmatically:
> When I removed the S3 properties from flink-conf.yaml and tried to
> provide them programmatically from the job code, the connection to S3
> failed. I also tried Application mode on a standalone cluster, but the
> behavior is the same.
>
> // My job main method (with default flink-conf.yaml):
> Configuration conf = new Configuration();
> conf.setString("s3a.access.key", <access-key>);
> conf.setString("s3a.secret.key", <secret-key>);
> conf.setString("s3a.aws.credentials.provider",
>     "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
> conf.setString("s3a.assumed.role.arn", <role arn>);
> conf.setString("s3a.assumed.role.session.name", <session name>);
> conf.setString("s3a.assumed.role.session.duration", <refresh time>);
> conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
> conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(conf);
>
> // flink job program using DataStream
>
> env.execute("My job");
>
> With this I got a connection exception:
>
> Caused by: org.apache.flink.util.SerializedThrowable:
> com.amazonaws.SdkClientException: Unable to load AWS credentials from
> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>     at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) ~[?:?]
>     at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177) ~[?:?]
>
> When these values are given in flink-conf.yaml instead of the job code,
> the connection is successful. Please advise if I am doing something
> incorrect w.r.t. the job program.
>
> Regards
> Anuj
>
> On Mon, May 8, 2023 at 12:36 PM Biao Geng <biaoge...@gmail.com> wrote:
>
>> Hi Anuj,
>>
>> To the best of my knowledge, Flink does not provide encryption support
>> for configuration values at the moment. If you are running Flink on
>> Kubernetes, you can avoid persisting plaintext credentials by injecting
>> them with an init container. See this SO answer
>> <https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
>> for more detailed instructions, and the sketch below.
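>>
>> A rough sketch of the idea, with illustrative names (the image, secret
>> path and fetch command depend on your secret store): the init container
>> pulls the credentials and writes a flink-conf.yaml into a shared
>> emptyDir volume that the Flink container mounts as its conf directory.
>>
>> initContainers:
>>   - name: fetch-s3-credentials
>>     image: my-vault-cli:latest   # hypothetical image bundling the Vault CLI
>>     command: ["sh", "-c"]
>>     # write the rendered config, credentials included, to the shared volume
>>     args:
>>       - vault kv get -field=flink-conf secret/flink/s3 > /flink-conf/flink-conf.yaml
>>     volumeMounts:
>>       - name: flink-conf-volume  # emptyDir shared with the Flink container
>>         mountPath: /flink-conf
>>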
>> Besides, it should be possible to override the Configuration object in
>> your job code. Are you using Application mode to run the job?
>>
>> Best regards,
>> Biao Geng
>>
On Mon, May 8, 2023 at 1:55 PM Anuj Jain <anuj...@gmail.com> wrote:
>>
>>> Hi Community,
>>> I am trying to create an Amazon S3 filesystem distributor using Flink,
>>> and for this I am using the Hadoop S3A connector with the Flink
>>> filesystem sink.
>>> My Flink application will run in a non-AWS environment, on a native
>>> cluster, so I need to put my access keys in the Flink configuration.
>>>
>>> For connecting to S3 storage, I am configuring flink-conf.yaml with the
>>> access credentials like:
>>>
>>> s3.access.key: <access key>
>>> s3.secret.key: <secret key>
>>>
>>> ... and some other parameters required for assuming an AWS IAM role
>>> with the s3a AssumedRoleCredentialProvider, as listed below.
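>>>
>>> The assumed-role keys I am setting (values elided) are the s3a options
>>> forwarded through Flink's s3a. prefix, roughly:
>>>
>>> s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
>>> s3a.assumed.role.arn: <role arn>
>>> s3a.assumed.role.session.name: <session name>
>>> s3a.assumed.role.session.duration: <refresh time>
>>> s3a.assumed.role.sts.endpoint: <endpoint>
>>> s3a.assumed.role.sts.endpoint.region: <region>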
>>>
>>> Is there a way to encrypt these parameters rather than putting them in
>>> directly, or is there another way to supply them programmatically?
>>>
>>> I tried to set them programmatically using the Configuration object,
>>> supplying it via
>>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in my
>>> job (rather than via flink-conf.yaml), but then the S3 connection
>>> failed. I think Flink creates the connection pool at startup, even
>>> before the job is started.
>>>
>>> Thanks and Regards
>>> Anuj Jain
>>>
>>
