Thank you, Alexey. I appreciate your responses.

On Tue, Aug 11, 2020 at 10:57 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hi Adam,
>
> 1) Correct. The current KafkaIO.Read implementation is based on Beam's
> “UnboundedSource”, which requires a fixed number of splits at DAG
> construction time.
> 2) Correct.
>
> Dynamic discovery of topics and partitions is a long story in Beam. Since
> you are interested in this, it would be worth taking a look at these
> discussions [1][2]. One way to get it in Beam is to use SplittableDoFn [3]
> instead of the UnboundedSource API. As I mentioned before, there is
> ongoing work to make KafkaIO read with SDF [4], which should, in the
> future, allow new partitions/topics to be discovered at runtime.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
> [2] https://issues.apache.org/jira/browse/BEAM-727
> [3] https://beam.apache.org/blog/splittable-do-fn/
> [4] https://issues.apache.org/jira/browse/BEAM-9977
>
> On 11 Aug 2020, at 15:01, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> Hello Alexey
>
> Thank you for replying to my questions. A number of my colleagues have
> been musing about the idea of dynamically changing the partition count of
> Apache Kafka's input topics for Beam jobs at runtime (we intend to use
> the Google Dataflow runner for our jobs). I have been hesitant to endorse
> such an operation because my understanding of Beam at this point in time
> is that dynamically scaling the topic partition count up will not be
> automatically detected by the Beam job, such that the new partitions will
> go unassigned until the job is restarted.
>
> This, of course, ignores the impact on the state stores, particularly
> data-locality issues. My understanding here (again) is that Beam stores
> keyed state in alignment with the Kafka partitions, and so changing the
> partition count would significantly affect the distribution of state
> (which is my primary reason to oppose this operation).
>
> In sum, if you (or anyone else reading this email!) could refute or
> support these statements, I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The
> job needs to be restarted to pick new partitions up (which is in line with
> many other stream processors, and not something I would consider a defect).
> 2) A job's state pertaining to a Kafka source (such as materializing a
> stream) is divided along the Kafka partition boundaries.
>
> Thanks!
>
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Hi Adam,
>>
>> 1) Yes, correct. However, there is ongoing work to support this at
>> runtime, including topic/partition discovery.
>>
>> 2) Yes, but if a worker fails, its task (reading from a specific
>> partition, in the case of KafkaIO) will be assigned to a different one.
>> How that happens depends on the underlying data processing engine.
>>
>> 3) In general, yes, but some specific things, like storing the
>> checkpoints for unbounded sources, can differ in terms of
>> implementation. The Beam model should be applied the same way across
>> runners, but the implementation can vary. This is actually why Beam
>> runners exist: they apply the Beam model to different data processing
>> engines and make it uniform for Beam users.
>>
>> 4) Please see 3)
>>
>> I hope this sheds some light =) Please let us know if you have more
>> questions.
>>
>> Regards,
>> Alexey
>>
>> On 6 Aug 2020, at 18:57, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>
>> Hi Folks
>>
>> When processing events from Kafka, it seems that, from my reading, the
>> distribution of partitions maps directly to the worker via the concept of
>> 'splits':
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> From the code:
>>
>> > The partitions are evenly distributed among the splits.
>> > The number of splits returned is
>> > {@code min(desiredNumSplits, totalNumPartitions)}, though better not to
>> > depend on the exact count.
>> >
>> > <p>It is important to assign the partitions deterministically so that
>> > we can support resuming a split from last checkpoint. The Kafka
>> > partitions are sorted by {@code <topic, partition>} and then assigned
>> > to splits in round-robin order.
>>
>> I'm not intimately familiar with Beam's execution model, but my reading
>> of this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time.
>> 2) This implies that once started, the worker count cannot be changed, as
>> the partitions are not redistributed.
>> 3) Any state is tied to the split, which is in turn tied to the worker.
>> This means that outside of, say, a global window
>> <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
>> materialized Kafka state is "localized" to a worker.
>>
>> Follow-up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache
>> Spark as a runner than, say, Dataflow.
>>
>> If anyone could confirm or refute my 3 statements and 1 question, it would
>> go a long way towards validating my understanding of Beam's current
>> relationship to scaling and partitioned data locality with Kafka.
>>
>> Thanks
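The KafkaUnboundedSource Javadoc quoted in the thread describes the split assignment only in prose. The following is a minimal, stdlib-only Java sketch of that rule, offered as an illustration under stated assumptions. It is not Beam's actual implementation.

The rule it follows has three parts:
- sort partitions by `<topic, partition>`;
- create `min(desiredNumSplits, totalNumPartitions)` splits;
- deal the partitions out round-robin.

The `SplitAssignmentSketch` class, its `assign` method, and the `TopicPartition` record are hypothetical stand-ins. In particular, the record is not Kafka's own `TopicPartition` class. Records require Java 16 or later. The last lines of `main` mimic statement 1) from the thread: the assignment is computed once, so a partition added afterwards belongs to no split.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SplitAssignmentSketch {

    // Hypothetical stand-in for a Kafka (topic, partition) pair; not Kafka's own class.
    record TopicPartition(String topic, int partition) {}

    // Sketch of the rule described in the quoted Javadoc: sort by <topic, partition>,
    // create min(desiredNumSplits, totalNumPartitions) splits, deal partitions round-robin.
    static List<List<TopicPartition>> assign(List<TopicPartition> partitions, int desiredNumSplits) {
        if (desiredNumSplits < 1) {
            throw new IllegalArgumentException("desiredNumSplits must be >= 1");
        }
        List<TopicPartition> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparing(TopicPartition::topic)
                .thenComparingInt(TopicPartition::partition));

        int numSplits = Math.min(desiredNumSplits, sorted.size());
        List<List<TopicPartition>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        // Because of the sort, the same partition set always yields the same splits,
        // regardless of the order in which the partitions were listed.
        for (int i = 0; i < sorted.size(); i++) {
            splits.get(i % numSplits).add(sorted.get(i));
        }
        return splits;
    }

    public static void main(String[] args) {
        // A hypothetical topic "events" with 5 partitions, read with 3 desired splits.
        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < 5; p++) {
            partitions.add(new TopicPartition("events", p));
        }
        List<List<TopicPartition>> splits = assign(partitions, 3);
        for (int i = 0; i < splits.size(); i++) {
            System.out.println("split " + i + " -> " + splits.get(i));
        }

        // Simulate scaling the topic to 6 partitions after the splits were built.
        // Nothing here recomputes the assignment, so the new partition is in no split.
        TopicPartition added = new TopicPartition("events", 5);
        boolean covered = splits.stream().anyMatch(split -> split.contains(added));
        System.out.println("events-5 assigned: " + covered); // prints "events-5 assigned: false"
    }
}
```

With 5 partitions and 3 desired splits, this sketch puts partitions 0 and 3 in split 0, partitions 1 and 4 in split 1, and partition 2 in split 2.

The deterministic sort is what makes the assignment reproducible, which is the property the Javadoc relies on for resuming a split from its checkpoint. In this sketch, a new partition is only picked up by an explicit fresh call to `assign`. That is loosely analogous to the restart described in the thread, not a claim about how any particular runner behaves.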