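Streaming pipelines on Dataflow have a limitation: all initial splitting[1] of an UnboundedSource is done upfront, at pipeline creation time. For KinesisIO, splitting needs a Kinesis client to list the stream's shards, which is why getKinesisClient() is called at job submission time. This has been a known issue for more than five years.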
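Because splitting happens when the job is created, a custom AWSClientsProvider is invoked on the machine that submits the job, not only on the workers. One option is to have the provider carry only a reference to the secret (for example a Vault path) and fetch the actual keys on first use, caching them in a transient field so they are never serialized into the job. The sketch below shows that pattern in plain Java. SecretFetcher, CountingFetcher, LazySecret and the path "secret/kinesis" are hypothetical names for illustration, not Beam, AWS or Vault APIs. Wiring it into a real getKinesisClient() is left out. With the current upfront splitting, the launching environment would still need access to the secret manager as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySecretDemo {

  /** Hypothetical stand-in for a secret-manager client (e.g. a Vault lookup). */
  public interface SecretFetcher extends Serializable {
    String fetch(String secretPath);
  }

  /** Fake fetcher that counts calls so the lazy behaviour can be observed. */
  public static final class CountingFetcher implements SecretFetcher {
    private static final long serialVersionUID = 1L;
    public static final AtomicInteger CALLS = new AtomicInteger();

    @Override
    public String fetch(String secretPath) {
      CALLS.incrementAndGet();
      return "secret-at-" + secretPath;
    }
  }

  /**
   * Serializable holder that carries only the secret's path. The value is
   * fetched on the first get() and cached in a transient field, so it is
   * never written out when the holder is serialized.
   */
  public static final class LazySecret implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String secretPath;
    private final SecretFetcher fetcher;
    private transient volatile String cached;

    public LazySecret(String secretPath, SecretFetcher fetcher) {
      this.secretPath = secretPath;
      this.fetcher = fetcher;
    }

    public String get() {
      String value = cached;
      if (value == null) {
        synchronized (this) {
          value = cached;
          if (value == null) {
            value = fetcher.fetch(secretPath);
            cached = value;
          }
        }
      }
      return value;
    }
  }

  /** Java-serializes and deserializes an object, roughly like shipping it to a worker. */
  @SuppressWarnings("unchecked")
  public static <T extends Serializable> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    LazySecret secret = new LazySecret("secret/kinesis", new CountingFetcher());
    System.out.println("fetches after construction: " + CountingFetcher.CALLS.get());
    LazySecret onWorker = roundTrip(secret);
    System.out.println("fetches after serialization: " + CountingFetcher.CALLS.get());
    System.out.println("value: " + onWorker.get());
    onWorker.get();
    System.out.println("fetches after two get() calls: " + CountingFetcher.CALLS.get());
  }
}
```

The double-checked locking keeps concurrent callers on one worker from fetching the secret more than once. Each deserialized copy still fetches independently, because the cache is transient.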
The most recent work on this limitation uses a Splittable DoFn that wraps an UnboundedSource[2], which moves initial splitting to pipeline execution time. You could try it out, but be warned that it was checked in only a week ago and, to my knowledge, it has no major users yet. Progress has been steady over the past 9 months, and the work might be complete for Dataflow in 3-4 months. If you have time to help out, that estimate could be brought in.

1: https://github.com/apache/beam/blob/873f689f107ee2a5d10edb7e8cb87d7996d40eea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L62
2: https://github.com/apache/beam/blob/eb59dde36f74dba58d7fbd992cb741a811ba9a58/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L436

On Fri, Mar 13, 2020 at 1:30 PM Julien Phalip wrote:
> Hi,
>
> We're running a Beam job in Dataflow that reads inputs from KinesisIO.
> Right now it works if we pass the AWS keys to the job like so:
>
>     p.apply(KinesisIO.read()
>         // ...
>         .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION)
>         .apply( ... ));
>
> Instead of passing the AWS keys upfront, we would like to let the workers
> directly fetch the keys at run time from an external secret management
> service (e.g. Vault). That way, we could give Vault access permissions to
> the Dataflow cluster's service account, and avoid having to pass the keys
> upfront in clear.
>
> We've tried implementing a custom AWSClientsProvider
> <https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java>
> and then passing it like so:
>
>     p.apply(KinesisIO.read()
>         //...
>         .withAWSClientsProvider(new CustomAWSClientsProvider()));
>
> However, from our testing it looks like the
> CustomAWSClientsProvider.getKinesisClient() method still gets called at
> job submission time, before the job actually starts running.
>
> Is there a way to let worker nodes retrieve those keys themselves at run
> time to configure Kinesis access?
>
> Thanks,
>
> Julien
