[ https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095092#comment-17095092 ]
Reuven Lax commented on BEAM-9759:
----------------------------------

The best solution would be to create a new SplittableDoFn version of the
Kinesis source. This would have several advantages:

1. It could support dynamic changes (at run time) to the list of Kinesis
streams. I believe this is a major reason that you currently need to update
the pipeline so often, and this would remove that need.
2. The splitting could then happen at run time instead of at
graph-construction time, and could also be parallelized.

> Pipeline creation with large number of shards/streams takes long time
> ---------------------------------------------------------------------
>
>                 Key: BEAM-9759
>                 URL: https://issues.apache.org/jira/browse/BEAM-9759
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis, runner-dataflow
>    Affects Versions: 2.19.0
>            Reporter: Sebastian Graca
>            Priority: Major
>
> We are processing multiple Kinesis streams using pipelines running on
> {{DataflowRunner}}. Starting such a pipeline from a pipeline definition
> (the execution of the {{org.apache.beam.sdk.Pipeline.run()}} method) takes
> a considerable amount of time. In our case:
> * a pipeline that consumes data from 196 streams (237 shards in total)
> starts in 7 minutes
> * a pipeline that consumes data from 111 streams (261 shards in total)
> starts in 4 minutes
> I've been investigating this and found out that when {{Pipeline.run}} is
> invoked, the whole pipeline graph is traversed and serialized so it can be
> passed to the Dataflow backend.
> Here's part of the stack trace that shows this traversal:
> {code:java}
> at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
> at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
> at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> {code}
> As you can see, the {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}}
> method is called during serialization. This method finds all shards for the
> stream and also validates each shard by reading from it. Because this
> process is sequential, it takes considerable time, which depends both on
> the number of streams (which has the greatest impact) and on the number of
> shards. Even with a single stream that has a large number of shards, the
> pipeline startup time will be noticeable.
> I wonder if it's possible to optimise this somehow?
> One way could be to parallelise the whole process, at both the stream and
> the shard level. As this is split between Beam core and KinesisIO, it could
> be complex.
> Another solution I could think of is having the information about valid
> stream shards ready before calling {{Pipeline.run}}. If there were a way to
> create a {{KinesisIO.Read}} operation so that it cached shard information
> and let client code control the parallelisation of this operation, this
> would allow for a great reduction of the startup time.
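As a rough illustration of the stream-level parallelisation idea described above, the sketch below runs one shard lookup per stream on a fixed-size thread pool and collects the results before anything else proceeds. It is not Beam or KinesisIO code: `ParallelShardDiscovery`, `discover`, and the `shardLister` function are hypothetical names, and the lister is only a stand-in for the per-stream work (listing and validating shards) that the issue attributes to `KinesisSource.split`. It also does not change where the runner calls `split`; it only shows the shape of running the lookups concurrently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of Beam or KinesisIO. */
final class ParallelShardDiscovery {

  /**
   * Runs shardLister once per stream on a fixed-size pool and returns the
   * results keyed by stream name, in the order the streams were given.
   */
  static Map<String, List<String>> discover(
      List<String> streams, Function<String, List<String>> shardLister, int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      // Submit one task per stream so the slow remote lookups overlap.
      List<CompletableFuture<List<String>>> pending = new ArrayList<>();
      for (String stream : streams) {
        pending.add(CompletableFuture.supplyAsync(() -> shardLister.apply(stream), pool));
      }
      // Collect in submission order; join() rethrows a failed lookup
      // wrapped in an unchecked CompletionException.
      Map<String, List<String>> shardsByStream = new LinkedHashMap<>();
      for (int i = 0; i < streams.size(); i++) {
        shardsByStream.put(streams.get(i), pending.get(i).join());
      }
      return shardsByStream;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // Stand-in lister (assumption): a real one would call Kinesis and
    // validate each shard, which is the slow part described in the issue.
    Function<String, List<String>> fakeLister =
        stream -> List.of(stream + "-shard-0", stream + "-shard-1");
    System.out.println(discover(List.of("orders", "clicks"), fakeLister, 4));
  }
}
```

The pool size is the knob that client code would control in this sketch; with per-stream lookups dominated by network latency, a value well above the CPU count may be reasonable, subject to Kinesis API rate limits.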
> I made a PoC to verify how much parallelising this process can improve
> startup time. Just by implementing it at the stream level, I was able to
> reduce the startup time from 7 minutes to 2.5 minutes.
> Unfortunately this was a really hacky solution and I don't consider it one
> that should be implemented - I hacked the AWS client used by KinesisIO to
> cache all responses from the server and called the {{split}} method in
> parallel on all sources before executing {{Pipeline.run}}. However, this
> proves that there is a lot of room for improvement for pipelines that deal
> with multiple streams and/or shards.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)