Hi Adam,

1) Correct. The current KafkaIO.Read implementation is based on Beam's “UnboundedSource” API, which requires a fixed number of splits at DAG construction time.

2) Correct.
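A minimal, hypothetical sketch of point 1: the split list is computed once from a snapshot of the topic's partitions, so a partition added while the job is running belongs to no split. Partitions are modeled as plain "topic-partition" strings instead of Kafka's TopicPartition, and `FixedSplitsSketch` / `unassignedPartitions` are illustrative names, not Beam or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FixedSplitsSketch {
    // Hypothetical helper: returns partitions that exist now but were not in the
    // snapshot the splits were built from. In this model they have no reader.
    public static List<String> unassignedPartitions(List<String> snapshotAtConstruction,
                                                    List<String> currentPartitions) {
        Set<String> assigned = new HashSet<>(snapshotAtConstruction);
        List<String> unassigned = new ArrayList<>();
        for (String p : currentPartitions) {
            if (!assigned.contains(p)) {
                unassigned.add(p);
            }
        }
        return unassigned;
    }

    public static void main(String[] args) {
        // Partitions visible when the pipeline graph is built ("topic-partition"
        // strings stand in for Kafka's TopicPartition in this sketch).
        List<String> snapshot = List.of("events-0", "events-1", "events-2");
        // The topic is later scaled from 3 to 4 partitions while the job keeps running.
        List<String> current = List.of("events-0", "events-1", "events-2", "events-3");
        // Prints: Unassigned until restart: [events-3]
        System.out.println("Unassigned until restart: " + unassignedPartitions(snapshot, current));
    }
}
```

Runtime discovery, as targeted by the SDF-based work, would amount to recomputing this difference periodically and starting readers for the new entries instead of waiting for a restart.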
Dynamic discovery of topics and partitions has a long history in Beam. Since you are interested in this, it is worth taking a look at these discussions [1][2]. One way to support it in Beam is to use SplittableDoFn [3] instead of the UnboundedSource API. As I mentioned before, there is ongoing work to make KafkaIO read with SDF [4], which should eventually allow discovering new partitions/topics at runtime.

[1] https://issues.apache.org/jira/browse/BEAM-5786
[2] https://issues.apache.org/jira/browse/BEAM-727
[3] https://beam.apache.org/blog/splittable-do-fn/
[4] https://issues.apache.org/jira/browse/BEAM-9977

> On 11 Aug 2020, at 15:01, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> Hello Alexey
>
> Thank you for replying to my questions. A number of my colleagues have been musing about the idea of dynamically changing the partition count of Apache Kafka's input topics for Beam jobs during runtime (we intend to use the Google Dataflow runner for our jobs). I have been hesitant to endorse such an operation because my understanding of Beam at this point in time is that dynamically scaling up the topic partition count will not be automatically detected by the Beam job, so these partitions will go unassigned until the job is restarted.
>
> This, of course, ignores the impact on the state stores, particularly data-locality issues. My understanding here (again) is that Beam stores keyed state in alignment with the Kafka partitions, so changing the partition count would significantly affect the distribution of state (which is my primary reason to oppose this operation).
>
> In sum, if you (or anyone else reading this email!) could refute or support these statements, I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts.
> The job needs to be restarted to pick up new partitions (which is in line with many other stream processors, and not something I would consider a defect).
> 2) A job's state pertaining to a Kafka source (such as materializing a stream) is divided along the Kafka partition boundaries.
>
> Thanks!
>
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hi Adam,
>
> 1) Yes, correct. However, there is ongoing work to do this at runtime and to support discovering topics/partitions.
>
> 2) Yes, but if a worker fails, its task (reading from a specific partition, in the case of KafkaIO) will be assigned to a different one. How this happens depends on the underlying data processing engine.
>
> 3) In general, yes, but some specific things, like storing the checkpoints for unbounded sources, can differ in implementation. The Beam model should apply in the same way across runners, but the implementation can vary. This is actually why Beam runners exist: they apply the Beam model to different data processing engines and make it unified for Beam users.
>
> 4) Please see 3).
>
> I hope this sheds some light =) Please let us know if you have more questions.
>
> Regards,
> Alexey
>
>> On 6 Aug 2020, at 18:57, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>
>> Hi Folks
>>
>> When processing events from Kafka, it seems, from my reading, that the distribution of partitions maps directly to the workers via the concept of 'splits':
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> From the code:
>>
>> > The partitions are evenly distributed among the splits.
>> > The number of splits returned is {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact count.
>>
>> > It is important to assign the partitions deterministically so that we can support resuming a split from last checkpoint. The Kafka partitions are sorted by {@code <topic, partition>} and then assigned to splits in round-robin order.
>>
>> I'm not intimately familiar with Beam's execution model, but my reading of this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time.
>> 2) This implies that, once started, the worker count cannot be changed, as the partitions are not redistributed.
>> 3) Any state is tied to the split, which is in turn tied to the worker. This means that outside of, say, a global window <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>, materialized Kafka state is "localized" to a worker.
>>
>> Follow-up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache Spark as a runner than, say, Dataflow.
>>
>> If anyone could confirm or refute my 3 statements and 1 question, it would go a long way towards validating my understanding of Beam's current relationship to scaling and partitioned data locality with Kafka.
>>
>> Thanks
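The split assignment described in the quoted KafkaUnboundedSource Javadoc (sort by <topic, partition>, then deal partitions into min(desiredNumSplits, totalNumPartitions) splits round-robin) can be sketched as below. This is an illustration of the documented behavior, not Beam's actual code: `RoundRobinSplitsSketch`, `assign`, and the `Partition` record (a stand-in for Kafka's TopicPartition) are hypothetical names, and the record needs Java 16+.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RoundRobinSplitsSketch {
    // Simplified stand-in for Kafka's TopicPartition (hypothetical, not the Kafka class).
    public record Partition(String topic, int partition) {}

    // Sorts partitions by <topic, partition> and deals them into
    // min(desiredNumSplits, totalNumPartitions) splits in round-robin order.
    public static List<List<Partition>> assign(List<Partition> partitions, int desiredNumSplits) {
        if (desiredNumSplits < 1) {
            throw new IllegalArgumentException("desiredNumSplits must be >= 1");
        }
        List<Partition> sorted = new ArrayList<>(partitions);
        // Deterministic order: the same input always yields the same split contents,
        // which is what lets a split resume from its last checkpoint.
        sorted.sort(Comparator.comparing(Partition::topic).thenComparingInt(Partition::partition));

        int numSplits = Math.min(desiredNumSplits, sorted.size());
        List<List<Partition>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        // The i-th partition in sorted order goes to split (i mod numSplits).
        for (int i = 0; i < sorted.size(); i++) {
            splits.get(i % numSplits).add(sorted.get(i));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Partition> partitions = List.of(
                new Partition("orders", 2), new Partition("clicks", 1),
                new Partition("orders", 0), new Partition("clicks", 0),
                new Partition("orders", 1));
        List<List<Partition>> splits = assign(partitions, 2);
        for (int i = 0; i < splits.size(); i++) {
            System.out.println("split " + i + ": " + splits.get(i));
        }
    }
}
```

With two desired splits, split 0 gets clicks-0, orders-0 and orders-2, and split 1 gets clicks-1 and orders-1. Because the mapping depends on the whole sorted list, adding a partition can move existing ones: adding clicks-2 here would shift orders-0 from split 0 to split 1, which is one plausible reason the split list is fixed at construction time rather than recomputed.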