Hi Adam,

1) Correct. The current KafkaIO.Read implementation is based on Beam's
“UnboundedSource” API, which requires a fixed number of splits at DAG
construction time.
2) Correct.
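For illustration, here is a simplified, dependency-free Java sketch of the
assignment described in the KafkaUnboundedSource javadoc quoted further down.
Partitions are sorted by <topic, partition> and dealt round-robin into
min(desiredNumSplits, totalNumPartitions) splits, once, when the pipeline is
constructed. The class name, the local TopicPartition record and the assign()
helper are hypothetical and are not Beam's actual code. The point is only that
the split list is a fixed snapshot.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SplitAssignmentSketch {

  // Hypothetical stand-in for Kafka's TopicPartition, so the sketch has no dependencies.
  record TopicPartition(String topic, int partition) {}

  // Sorts partitions by <topic, partition> and deals them round-robin into
  // min(desiredNumSplits, totalNumPartitions) splits. This runs once: a partition
  // created after this call is not contained in any split.
  static List<List<TopicPartition>> assign(List<TopicPartition> partitions, int desiredNumSplits) {
    if (desiredNumSplits < 1) {
      throw new IllegalArgumentException("desiredNumSplits must be >= 1");
    }
    List<TopicPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparing(TopicPartition::topic)
        .thenComparingInt(TopicPartition::partition));
    int numSplits = Math.min(desiredNumSplits, sorted.size());
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    // Deterministic round-robin: the same input always yields the same splits,
    // which is what lets a split resume from its last checkpoint.
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % numSplits).add(sorted.get(i));
    }
    return splits;
  }

  public static void main(String[] args) {
    // Partitions known at pipeline construction time (deliberately unsorted).
    List<TopicPartition> atConstruction = List.of(
        new TopicPartition("orders", 2), new TopicPartition("orders", 0),
        new TopicPartition("orders", 1), new TopicPartition("orders", 3));
    List<List<TopicPartition>> splits = assign(atConstruction, 3);
    for (int i = 0; i < splits.size(); i++) {
      System.out.println("split " + i + ": " + splits.get(i));
    }
  }
}
```

With four partitions and three desired splits, the first split gets orders-0
and orders-3 and the other two get one partition each. A partition added to
the topic afterwards appears in none of the splits, so nothing reads it until
the job is restarted and the splits are recomputed.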

Dynamic discovery of topics and partitions has a long history in Beam. Since
you are interested in this, it would be worth taking a look at these
discussions [1][2]. One way to support it in Beam is to use SplittableDoFn
(SDF) [3] instead of the UnboundedSource API. As I mentioned before, there is
ongoing work to make KafkaIO read with SDF [4], which should make it possible
to discover new partitions/topics at runtime in the future.
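To make the contrast with the fixed split list concrete, here is a rough,
dependency-free sketch of the general idea behind runtime discovery: instead
of fixing the partition list once, a watcher periodically re-lists partitions
and emits only those it has not seen before, each of which could then become
a new unit of work to read. PartitionWatcher and the Supplier standing in for
a Kafka metadata call are hypothetical names for illustration only. This is
not the KafkaIO or SDF API, and not how [4] actually implements it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class PartitionDiscoverySketch {

  // Hypothetical helper: remembers which partitions it has already handed out
  // and, on each poll, returns only the ones that appeared since the last poll.
  static final class PartitionWatcher {
    // Assumed stand-in for a real metadata lookup (e.g. listing a topic's partitions).
    private final Supplier<List<String>> listPartitions;
    private final Set<String> known = new HashSet<>();

    PartitionWatcher(Supplier<List<String>> listPartitions) {
      this.listPartitions = listPartitions;
    }

    List<String> poll() {
      List<String> fresh = new ArrayList<>();
      for (String p : listPartitions.get()) {
        if (known.add(p)) { // add() returns true only for partitions not seen before
          fresh.add(p);
        }
      }
      return fresh;
    }
  }

  public static void main(String[] args) {
    List<String> metadata = new ArrayList<>(List.of("orders-0", "orders-1"));
    PartitionWatcher watcher = new PartitionWatcher(() -> List.copyOf(metadata));
    System.out.println(watcher.poll()); // first poll: every partition that exists now
    metadata.add("orders-2");           // the topic is scaled up while the job runs
    System.out.println(watcher.poll()); // later poll: only the newly added partition
  }
}
```

The design choice here is simply that discovery becomes a repeated operation
over the life of the job rather than a one-time step at construction, which is
what the UnboundedSource split model does not allow.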

[1] https://issues.apache.org/jira/browse/BEAM-5786
[2] https://issues.apache.org/jira/browse/BEAM-727 
[3] https://beam.apache.org/blog/splittable-do-fn/
[4] https://issues.apache.org/jira/browse/BEAM-9977

> On 11 Aug 2020, at 15:01, Adam Bellemare <adam.bellem...@gmail.com> wrote:
> 
> Hello Alexey
> 
> Thank you for replying to my questions. A number of my colleagues have been 
> musing about the idea of dynamically changing the partition count of Apache 
> Kafka's input topics for Beam jobs at runtime (we intend to use the 
> Google Dataflow runner for our jobs). I have been hesitant to endorse such an 
> operation because my understanding of Beam at this point in time is that 
> dynamically scaling the topic partition count up will not be automatically 
> detected by the Beam job, such that these partitions will go unassigned until 
> the job is restarted. 
> 
> This, of course, ignores the impact on the state stores, particularly 
> data-locality issues. My understanding here (again) is that Beam stores keyed 
> state in alignment with the Kafka partitions, so changing the partition 
> count would affect the distribution of state significantly (which is my 
> primary reason to oppose this operation).
> 
> In sum, if you (or anyone else reading this email!) could refute or support 
> these statements I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The job 
> needs to be restarted to pick new partitions up (which is in line with many 
> other stream processors, and not something I would consider a defect).
> 2) A job's state pertaining to a Kafka source (such as materializing a 
> stream) is divided along the Kafka partition boundaries.
> 
> Thanks!
> 
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hi Adam,
> 
> 1) Yes, correct. However, there is ongoing work to do this at runtime and to 
> support topic/partition discovery. 
> 
> 2) Yes, but if a worker fails, its task (reading from a specific partition, in 
> the case of KafkaIO) will be assigned to a different worker. How this happens 
> depends on the underlying data processing engine.
> 
> 3) In general, yes, but some specific things, like storing the checkpoints 
> for unbounded sources, could be implemented differently. The Beam model 
> should be applied in the same way by different runners, but the 
> implementation can vary. This is actually why Beam runners exist: they apply 
> the Beam model on different data processing engines and make it unified for 
> Beam users.
> 
> 4) Please see 3).
> 
> I hope this sheds some light =) Please let us know if you have more 
> questions.
> 
> Regards,
> Alexey
> 
>> On 6 Aug 2020, at 18:57, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>> 
>> Hi Folks
>> 
>> When processing events from Kafka, it seems, from my reading, that the 
>> distribution of partitions maps directly to the workers via the concept of 
>> 'splits':
>> 
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>> 
>> From the code:
>> 
>> > The partitions are evenly distributed among the splits. The number of
>> > splits returned is {@code min(desiredNumSplits, totalNumPartitions)},
>> > though better not to depend on the exact count.
>> 
>> > <p>It is important to assign the partitions deterministically so that we
>> > can support resuming a split from last checkpoint. The Kafka partitions
>> > are sorted by {@code <topic, partition>} and then assigned to splits in
>> > round-robin order.
>> 
>> I'm not intimately familiar with Beam's execution model, but my reading of 
>> this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time.
>> 2) This implies that, once started, the worker count cannot be changed, as 
>> the partitions are not redistributed.
>> 3) Any state is tied to the split, which is in turn tied to the worker. This 
>> means that outside of, say, a global window 
>> <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
>> materialized Kafka state is "localized" to a worker.
>> 
>> Follow-up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache 
>> Spark as a runner than, say, Dataflow. 
>> 
>> If anyone could confirm or refute my 3 statements and 1 question, it would 
>> go a long way towards validating my understanding of Beam's current 
>> relationship to scaling and partitioned data locality with Kafka. 
>> 
>> Thanks
>> 
> 
