Thank you, Alexey, I appreciate your responses.

On Tue, Aug 11, 2020 at 10:57 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Hi Adam,
>
> 1) Correct. The current KafkaIO.Read implementation is based on Beam's
> “UnboundedSource”, which requires a fixed number of splits at DAG
> construction time.
> 2) Correct.
>
> Dynamic topic and partition discovery is a long story in Beam. Since you
> are interested in this, it would be worth taking a look at these
> discussions [1][2]. One way to have it in Beam is to use SplittableDoFn
> [3] instead of the UnboundedSource API. As I mentioned before, there is
> ongoing work to make KafkaIO read with SDF [4], which should eventually
> allow new partitions/topics to be discovered at runtime.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
> [2] https://issues.apache.org/jira/browse/BEAM-727
> [3] https://beam.apache.org/blog/splittable-do-fn/
> [4] https://issues.apache.org/jira/browse/BEAM-9977
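>
> To illustrate the idea (a rough sketch of mine using the plain Kafka
> consumer API, not the actual SDF-based KafkaIO; the topic name and broker
> address are made up), runtime discovery amounts to periodically
> re-listing a topic's partitions and picking up new ones, a hook that
> UnboundedSource simply does not have once its splits are fixed:
>
> import java.util.HashSet;
> import java.util.Properties;
> import java.util.Set;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class DiscoverySketch {
>   public static void main(String[] args) throws InterruptedException {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092"); // assumed broker
>     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
>         props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
>     Set<TopicPartition> known = new HashSet<>();
>     while (true) {
>       // partitionsFor() returns the topic's current partition list, so a
>       // periodic check like this notices partitions added at runtime.
>       for (PartitionInfo p : consumer.partitionsFor("my-topic")) {
>         TopicPartition tp = new TopicPartition(p.topic(), p.partition());
>         if (known.add(tp)) {
>           System.out.println("discovered " + tp);
>         }
>       }
>       Thread.sleep(60_000); // assumed 1-minute discovery interval
>     }
>   }
> }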
>
> On 11 Aug 2020, at 15:01, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> Hello Alexey
>
> Thank you for replying to my questions. A number of my colleagues have
> been musing about the idea of dynamically changing the partition count of
> Apache Kafka's input topics for Beam jobs at runtime (we intend to use
> the Google Dataflow runner for our jobs). I have been hesitant to endorse
> such an operation because my understanding of Beam at this point is that
> scaling up the topic partition count will not be automatically detected
> by the Beam job, so the new partitions will go unassigned until the job
> is restarted.
>
> This, of course, ignores the impact on the state stores, particularly
> data-locality issues. My understanding here (again) is that Beam stores
> keyed state in alignment with the Kafka partitions, so changing the
> partition count would significantly affect the distribution of state
> (which is my primary reason to oppose this operation).
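>
> To illustrate the concern with a toy sketch of my own (it uses a
> stand-in hash; Kafka's real default partitioner uses murmur2 on the key
> bytes modulo the partition count), changing the partition count remaps
> most keys, so a key's accumulated state stays with its old partition
> while new events for that key land on a different one:
>
> import java.nio.charset.StandardCharsets;
> import java.util.Arrays;
> import java.util.List;
>
> public class RepartitionSketch {
>   // Stand-in for Kafka's default partitioner: hash the key bytes and
>   // take the result modulo the number of partitions.
>   static int partitionFor(String key, int numPartitions) {
>     int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
>     return (hash & Integer.MAX_VALUE) % numPartitions;
>   }
>
>   public static void main(String[] args) {
>     // Compare each key's partition before (4 partitions) and after (8).
>     for (String key : List.of("user-1", "user-2", "user-3", "user-4")) {
>       System.out.printf("%s: p%d -> p%d%n",
>           key, partitionFor(key, 4), partitionFor(key, 8));
>     }
>   }
> }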
>
> In sum, if you (or anyone else reading this email!) could refute or
> support these statements, I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The
> job needs to be restarted to pick new partitions up (which is in line with
> many other stream processors, and not something I would consider a defect)
> 2) A job's state pertaining to a Kafka source (such as materializing a
> stream) is divided along the Kafka partition boundaries.
>
> Thanks!
>
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Hi Adam,
>>
>> 1) Yes, correct. Though there is ongoing work to do this at runtime and
>> to support topic/partition discovery.
>>
>> 2) Yes, but if a worker fails, its task (reading from a specific
>> partition, in the case of KafkaIO) will be assigned to a different one.
>> How? That depends on the underlying data processing engine.
>>
>> 3) In general, yes, but some specific things, like storing the
>> checkpoints for unbounded sources, could differ in their implementation.
>> The Beam model should be applied in the same way across different
>> runners, but the implementation can vary. This is actually why Beam
>> runners exist: they apply the Beam model to different data processing
>> engines and make it uniform for Beam users.
>>
>> 4) Please see 3).
>>
>> I hope this sheds some light =) Please let us know if you have more
>> questions.
>>
>> Regards,
>> Alexey
>>
>> On 6 Aug 2020, at 18:57, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>
>> Hi Folks
>>
>> When processing events from Kafka, it seems, from my reading, that the
>> distribution of partitions maps directly to workers via the concept of
>> 'splits':
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> From the code:
>>
>> > The partitions are evenly distributed among the splits. The number of
>> splits returned is {@code
>> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend
>> on the exact count.
>>
>> > <p>It is important to assign the partitions deterministically so that
>> we can support resuming a
>> > split from last checkpoint. The Kafka partitions are sorted by {@code
>> <topic, partition>} and then
>> > assigned to splits in round-robin order.
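>>
>> To check my reading of that Javadoc, here is a standalone toy
>> re-implementation of mine (not Beam's code) of the sort-then-round-robin
>> assignment; note that a partition created after the splits are computed
>> never appears in any split:
>>
>> import java.util.ArrayList;
>> import java.util.Comparator;
>> import java.util.List;
>>
>> public class SplitSketch {
>>   record TopicPartition(String topic, int partition) {}
>>
>>   // Sort partitions by <topic, partition>, then deal them out
>>   // round-robin; the split count is min(desired, totalPartitions).
>>   static List<List<TopicPartition>> assign(
>>       List<TopicPartition> partitions, int desiredNumSplits) {
>>     List<TopicPartition> sorted = new ArrayList<>(partitions);
>>     sorted.sort(Comparator.comparing(TopicPartition::topic)
>>         .thenComparingInt(TopicPartition::partition));
>>     int numSplits = Math.min(desiredNumSplits, sorted.size());
>>     List<List<TopicPartition>> splits = new ArrayList<>();
>>     for (int i = 0; i < numSplits; i++) {
>>       splits.add(new ArrayList<>());
>>     }
>>     for (int i = 0; i < sorted.size(); i++) {
>>       splits.get(i % numSplits).add(sorted.get(i));
>>     }
>>     return splits;
>>   }
>>
>>   public static void main(String[] args) {
>>     // The assignment is computed once, from the partitions that exist
>>     // at construction time; a partition 4 added later maps to nothing.
>>     List<TopicPartition> atLaunch = List.of(
>>         new TopicPartition("orders", 0), new TopicPartition("orders", 1),
>>         new TopicPartition("orders", 2), new TopicPartition("orders", 3));
>>     System.out.println(assign(atLaunch, 2));
>>   }
>> }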
>>
>> I'm not intimately familiar with Beam's execution model, but my reading
>> of the KafkaIO code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time
>> 2) This implies that once started, the worker count cannot be changed as
>> the partitions are not redistributed
>> 3) Any state is tied to the split, which is in turn tied to the worker.
>> This means outside of, say, a global window
>> <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
>> materialized kafka state is "localized" to a worker.
>>
>> Follow up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache
>> Spark as a runner than, say, Dataflow.
>>
>> If anyone could confirm or refute my 3 statements and answer my 1
>> question, it would go a long way towards validating my understanding of
>> Beam's current relationship to scaling and partitioned data locality
>> with Kafka.
>>
>> Thanks
>>
>>
>>
>
