Hi Marco,

I've tried to answer your questions, and I've also CC’ed Boyuan Zhang, the 
original author of the SDF-based Read implementation for KafkaIO.

I’d also recommend taking a look at the discussion in the related PR [1], which 
may give more detail on some of the internal design decisions.

Please see my answers inline.

On 1 Sep 2021, at 18:13, Marco Robles <[email protected]> wrote:
> 
> I am taking KafkaIO as an example for the PulsarIO connector, during the 
> development of the new IO, I got some questions on KafkaIO implementation. I 
> was wondering if anyone has some experience with KafkaIO SDF implementation 
> that might help me.
> 
> - What was taken into consideration to implement the KafkaSourceDescriptor 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
>  which is used as input for the SDF in Kafka?

IIRC, this class represents a Kafka topic partition, which ReadFromKafkaDoFn 
then uses to actually read the data. So, we can have a 
PCollection<KafkaSourceDescriptor> and read the partitions in parallel.

> - In the ReadFromKafkaDoFn 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>  class, you have to implement a getSize in order to estimate how much work it 
> will take. What approach do you take in order to get an estimate with an 
> unbounded approach like kafka?

It is quite tricky to do with unbounded sources, so we try to estimate the 
size as the number of records remaining after the current offset in the topic 
partition multiplied by the average record size, based on collected statistics 
(if any).
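
To make the idea concrete, here is a minimal sketch of that kind of estimate in 
plain Java, without any Beam dependencies. The class and method names 
(BacklogSizeEstimator, observe, estimateSize) are hypothetical and this is not 
the actual ReadFromKafkaDoFn code; it only illustrates "remaining records times 
average record size", assuming the latest end offset of the partition is known.

```java
// Hypothetical helper, NOT part of Beam or KafkaIO: illustrates the estimate only.
final class BacklogSizeEstimator {
    private long totalBytes = 0;   // bytes of all records observed so far
    private long recordCount = 0;  // number of records observed so far

    /** Updates the running statistics after one record has been read. */
    void observe(long recordSizeBytes) {
        totalBytes += recordSizeBytes;
        recordCount++;
    }

    /** Average record size in bytes, or 0 if no statistics were collected yet. */
    double averageRecordSize() {
        return recordCount == 0 ? 0.0 : (double) totalBytes / recordCount;
    }

    /**
     * Estimated remaining size in bytes: the number of records between the
     * current position and the latest known end offset, multiplied by the
     * average record size.
     */
    double estimateSize(long currentOffset, long endOffset) {
        long remainingRecords = Math.max(0, endOffset - currentOffset);
        return remainingRecords * averageRecordSize();
    }

    public static void main(String[] args) {
        BacklogSizeEstimator estimator = new BacklogSizeEstimator();
        estimator.observe(100);
        estimator.observe(300);
        // 10 records remaining, average size 200 bytes
        System.out.println(estimator.estimateSize(40, 50)); // prints 2000.0
    }
}
```

Without any collected statistics the sketch returns 0; a connector may prefer 
to fall back to the plain record count in that case.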

> - For the SDF implementation, I suppose it will need a Source Interface 
> implementation 
> <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-source-interface>
>  and a Reader subclass 
> <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass>?
>  The documentation is kind of confusing in that part when you are working 
> with SDF, Should it be treated as Unbounded for the source/reading part?

Well, it’s actually the opposite. There are two ways to implement Read in 
Beam:
- based on the Source interface that you mentioned (the deprecated one);
- based on Splittable DoFn [2], which is the approach one should use for new IO 
connectors (especially for unbounded sources).
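
For intuition about the second option, below is a toy model in plain Java of 
the "claim an offset before emitting it, then checkpoint" loop that an 
SDF-based read is built around. None of these types are Beam classes: 
ToyOffsetTracker and ToySdfRead are hypothetical names, and a real connector 
would use Beam's own restriction tracker and DoFn annotations instead. The 
sketch assumes an unbounded partition is modelled as an offset range with an 
open-ended upper bound.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a restriction tracker over offsets; NOT a Beam class.
final class ToyOffsetTracker {
    private long to;           // exclusive end of the restriction; shrinks on checkpoint
    private long lastClaimed;  // last offset that was successfully claimed

    ToyOffsetTracker(long from, long to) {
        this.to = to;
        this.lastClaimed = from - 1;
    }

    /** Returns true if the offset is still inside the restriction and may be processed. */
    boolean tryClaim(long offset) {
        if (offset >= to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /** Ends the restriction after the last claimed offset; returns where the residual starts. */
    long checkpoint() {
        to = lastClaimed + 1;
        return to;
    }
}

final class ToySdfRead {
    /** Emits at most maxRecords offsets, then checkpoints so the work could be resumed later. */
    static long processBundle(ToyOffsetTracker tracker, long startOffset,
                              int maxRecords, List<Long> out) {
        long offset = startOffset;
        for (int i = 0; i < maxRecords && tracker.tryClaim(offset); i++) {
            out.add(offset); // only emit offsets that were claimed first
            offset++;
        }
        return tracker.checkpoint();
    }

    public static void main(String[] args) {
        // Open-ended upper bound models an unbounded topic partition.
        ToyOffsetTracker tracker = new ToyOffsetTracker(0, Long.MAX_VALUE);
        List<Long> out = new ArrayList<>();
        long resumeFrom = processBundle(tracker, 0, 3, out);
        System.out.println(out + " resume from " + resumeFrom); // prints [0, 1, 2] resume from 3
    }
}
```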



[1] https://github.com/apache/beam/pull/11749
[2] https://beam.apache.org/documentation/io/developing-io-overview/


—
Alexey
