Streaming pipelines on Dataflow have a limitation: all initial
splitting[1] of the UnboundedSource is done upfront, at pipeline creation
time, which explains why your custom provider is already called at job
submission. This has been a known issue for more than five years.
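To make the consequence concrete, here is a toy Java model of a source that must list its shards before it can split. Every name in it is hypothetical; none of it is Beam or Dataflow API. When splitting happens while the job graph is built, the credentialed listing call runs on the submitting machine rather than on a worker.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of upfront splitting (hypothetical names, not Beam/Dataflow API). */
public class UpfrontSplitDemo {

    // Records the phase in which the credentialed shard listing ran.
    static final List<String> CALL_LOG = new ArrayList<>();

    // Stands in for a custom client provider: listing shards needs credentials.
    static List<String> listShards(String phase) {
        CALL_LOG.add(phase);
        return List.of("shard-0", "shard-1", "shard-2");
    }

    // Upfront splitting: the shard list is computed while the job graph is
    // built, i.e. on the machine that submits the job.
    static List<String> buildPipeline() {
        return listShards("submission");
    }

    // Workers only read the splits they were handed; no listing happens here.
    static int runOnWorkers(List<String> splits) {
        return splits.size();
    }

    public static List<String> simulate() {
        CALL_LOG.clear();
        List<String> splits = buildPipeline();
        runOnWorkers(splits);
        return new ArrayList<>(CALL_LOG);
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [submission]
    }
}
```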

The most recent work in this area uses Splittable DoFns that wrap
UnboundedSources[2], which moves the initial splitting to pipeline
execution time.
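A companion toy model, again with hypothetical names and not the actual wrapper in Read.java: the job graph carries only a description of the source, and the shard listing runs when a worker first processes that description. That is roughly what wrapping the source in a Splittable DoFn changes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of deferred splitting (hypothetical names, not Beam API). */
public class DeferredSplitDemo {

    // Records the phase in which the credentialed shard listing ran.
    static final List<String> CALL_LOG = new ArrayList<>();

    static List<String> listShards(String phase) {
        CALL_LOG.add(phase);
        return List.of("shard-0", "shard-1", "shard-2");
    }

    // At construction time the graph only holds a description of the source;
    // no shard listing (and so no credential lookup) happens here.
    static String buildPipeline() {
        return "kinesis-source-descriptor";
    }

    // On the worker, the wrapper splits when it first processes the
    // descriptor, then reads each resulting shard.
    static int processOnWorker(String sourceDescriptor) {
        List<String> shards = listShards("worker");
        return shards.size();
    }

    public static List<String> simulate() {
        CALL_LOG.clear();
        String descriptor = buildPipeline();
        // CALL_LOG is still empty at this point: nothing ran at submission.
        processOnWorker(descriptor);
        return new ArrayList<>(CALL_LOG);
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [worker]
    }
}
```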

You could try it out, but be warned that this was checked in a week ago
and, to my knowledge, has no major users yet. Progress in this area has
been steady for the past nine months, and it might be complete for Dataflow
in three to four months. If you have time to help out, that estimate could
be shortened.

[1] https://github.com/apache/beam/blob/873f689f107ee2a5d10edb7e8cb87d7996d40eea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L62
[2] https://github.com/apache/beam/blob/eb59dde36f74dba58d7fbd992cb741a811ba9a58/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L436

On Fri, Mar 13, 2020 at 1:30 PM Julien Phalip <[email protected]> wrote:

> Hi,
>
> We're running a Beam job in Dataflow that reads inputs from KinesisIO.
> Right now it works if we pass the AWS keys to the job like so:
>
> p.apply(KinesisIO.read()
>         // ...
>         .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION))
>     .apply(...);
>
> Instead of passing the AWS keys upfront, we would like to let the workers
> directly fetch the keys at run time from an external secret management
> service (e.g. Vault). That way, we could give Vault access permissions to
> the Dataflow cluster's service account, and avoid having to pass the keys
> upfront in clear.
>
> We've tried implementing a custom AWSClientsProvider
> <https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java>
> and then passing it like so:
>
> p.apply(KinesisIO.read()
>    //...
>    .withAWSClientsProvider(new CustomAWSClientsProvider()));
>
> However, from our testing it looks like the
> CustomAWSClientsProvider.getKinesisClient() method still gets called at
> job submission time, before the job actually starts running.
>
> Is there a way to let worker nodes retrieve those keys themselves at run
> time to configure Kinesis access?
>
> Thanks,
>
> Julien
>
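On the question above: a provider that looks the secret up lazily keeps the key itself out of the serialized pipeline, so each worker fetches it on first use. Below is a minimal, stdlib-only sketch under stated assumptions: SecretFetcher, LazyCredentials and VAULT_STUB are hypothetical stand-ins, and no Vault or AWS client calls are shown. A custom AWSClientsProvider could hold such an object as a field and consult it from getKinesisClient(). While Dataflow still splits the source at submission time, that call also happens on the submitting machine, so the submitter would need Vault access too.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyCredentialsDemo {

    /** Hypothetical hook for a secret manager such as Vault. */
    @FunctionalInterface
    public interface SecretFetcher extends Serializable {
        String fetch(String secretPath);
    }

    /** Counts lookups so the demo can show when they happen. */
    static final AtomicInteger FETCHES = new AtomicInteger();

    /** Hypothetical stub; a real fetcher would call the secret service. */
    static final SecretFetcher VAULT_STUB = path -> {
        FETCHES.incrementAndGet();
        return "secret-for-" + path;
    };

    /** Ships only the secret path; the secret is transient and fetched on first use. */
    public static final class LazyCredentials implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String secretPath;
        private final SecretFetcher fetcher;
        private transient String secret; // excluded from serialization

        public LazyCredentials(String secretPath, SecretFetcher fetcher) {
            this.secretPath = secretPath;
            this.fetcher = fetcher;
        }

        public synchronized String get() {
            if (secret == null) {
                secret = fetcher.fetch(secretPath);
            }
            return secret;
        }
    }

    /** Java-serializes and deserializes, roughly as when code is shipped to workers. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {lookups before first use, lookups after two get() calls on the copy}. */
    public static int[] simulate() {
        FETCHES.set(0);
        LazyCredentials onSubmitter = new LazyCredentials("secret/kinesis", VAULT_STUB);
        LazyCredentials onWorker = roundTrip(onSubmitter);
        int before = FETCHES.get();
        onWorker.get();
        onWorker.get(); // cached: no second lookup
        return new int[] {before, FETCHES.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate())); // [0, 1]
    }
}
```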
