Hi Adam,

1) Yes, correct. However, there is ongoing work to do this at runtime and to
support discovery of new topics/partitions.

2) Yes, but if a worker fails, its task (reading from a specific partition, in
the case of KafkaIO) will be reassigned to a different worker. How that happens
depends on the underlying data processing engine.

3) In general, yes, but some specific things, like storing checkpoints for
unbounded sources, may be implemented differently. The Beam model should be
applied the same way across runners, but the implementation can vary. This is
actually why Beam runners exist: they apply the Beam model on top of different
data processing engines and make it uniform for Beam users.

4) Please see 3).
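For illustration, here is a minimal, dependency-free Java sketch (Java 16+, for records) of the deterministic assignment described in the KafkaUnboundedSource javadoc quoted below: sort the partitions by <topic, partition>, then deal them round-robin into min(desiredNumSplits, totalNumPartitions) splits. The `TopicPartition` record and the `assign` method are hypothetical stand-ins, not Beam's or Kafka's actual API, and this is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SplitAssignmentSketch {

  // Hypothetical stand-in for Kafka's TopicPartition, kept dependency-free.
  record TopicPartition(String topic, int partition) {}

  // Hypothetical helper: sorts partitions by <topic, partition> and assigns them
  // round-robin to min(desiredNumSplits, totalNumPartitions) splits.
  static List<List<TopicPartition>> assign(List<TopicPartition> partitions, int desiredNumSplits) {
    if (desiredNumSplits < 1) {
      throw new IllegalArgumentException("desiredNumSplits must be >= 1");
    }
    // Sorting first makes the result independent of the input order.
    List<TopicPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparing(TopicPartition::topic)
        .thenComparingInt(TopicPartition::partition));

    int numSplits = Math.min(desiredNumSplits, sorted.size());
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    // Round-robin: the i-th partition in sorted order goes to split i % numSplits.
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % numSplits).add(sorted.get(i));
    }
    return splits;
  }

  public static void main(String[] args) {
    List<TopicPartition> parts = List.of(
        new TopicPartition("orders", 2),
        new TopicPartition("clicks", 0),
        new TopicPartition("orders", 0),
        new TopicPartition("orders", 1),
        new TopicPartition("clicks", 1));
    List<List<TopicPartition>> splits = assign(parts, 2);
    for (int i = 0; i < splits.size(); i++) {
      System.out.println("split " + i + ": " + splits.get(i));
    }
  }
}
```

With two desired splits, split 0 gets clicks-0, orders-0 and orders-2, and split 1 gets clicks-1 and orders-1, regardless of the input order. Because the result depends only on the sorted partition list and the split count, recomputing it yields the same mapping, which is what lets a split resume from its last checkpoint. In this sketch the mapping is computed once from the initial partition list, so nothing is rebalanced unless the assignment is recomputed.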

I hope this sheds some light =) Please let us know if you have more
questions.

Regards,
Alexey

> On 6 Aug 2020, at 18:57, Adam Bellemare <adam.bellem...@gmail.com> wrote:
> 
> Hi Folks
> 
> When processing events from Kafka, it seems, from my reading, that the
> distribution of partitions maps directly to workers via the concept of
> 'splits':
> 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
> 
> From the code:
> 
> > The partitions are evenly distributed among the splits. The number of
> > splits returned is {@code min(desiredNumSplits, totalNumPartitions)},
> > though better not to depend on the exact count.
> 
> > <p>It is important to assign the partitions deterministically so that we
> > can support resuming a split from last checkpoint. The Kafka partitions
> > are sorted by {@code <topic, partition>} and then assigned to splits in
> > round-robin order.
> 
> I'm not intimately familiar with Beam's execution model, but my reading of 
> this code suggests that:
> 1) Beam allocates partitions to workers once, at creation time
> 2) This implies that, once started, the worker count cannot be changed, as
> the partitions are not redistributed
> 3) Any state is tied to the split, which is in turn tied to the worker. This
> means that outside of, say, a global window
> <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
> materialized Kafka state is "localized" to a worker.
> 
> Follow-up Q:
> 4) Is this independent of the runner? I am much more familiar with Apache
> Spark as a runner than, say, Dataflow.
> 
> If anyone could confirm or refute my 3 statements and 1 question, it would go
> a long way towards validating my understanding of Beam's current relationship
> to scaling and partitioned data locality with Kafka.
> 
> Thanks
> 
