Hi Marco,

I tried to answer your questions, and I also CC’ed Boyuan Zhang as the initial author of the SDF-based Read implementation for KafkaIO.
I’d also recommend taking a look at the related PR discussion [1], which may give more detail on some of the internal decisions. Please see my answers inline.

On 1 Sep 2021, at 18:13, Marco Robles <[email protected]> wrote:
>
> I am taking KafkaIO as an example for the PulsarIO connector, during the
> development of the new IO, I got some questions on KafkaIO implementation. I
> was wondering if anyone has some experience with KafkaIO SDF implementation
> that might help me.
>
> - What was taken into consideration to implement the KafkaSourceDescriptor
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
> which is used as input for the SDF in Kafka?

IIRC, this class represents a Kafka topic partition, which ReadFromKafkaDoFn then uses to actually read the data. So we can have a PCollection<KafkaSourceDescriptor> and read the partitions in parallel.

> - In the ReadFromKafkaDoFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> class, you have to implement a getSize in order to estimate how much work it
> will take. What approach do you take in order to get an estimate with an
> unbounded approach like kafka?

This is quite tricky to do for unbounded sources, so we try to estimate the size from the number of records from the current offset in the topic partition and the average record size, based on collected statistics (if any).

> - For the SDF implementation, I suppose it will need a Source Interface
> implementation
> <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-source-interface>
> and a Reader subclass
> <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass>?
> The documentation is kind of confusing in that part when you are working
> with SDF, Should it be treated as Unbounded for the source/reading part?

Well, it’s actually the opposite. There are two kinds of Read implementation in Beam:
- based on the Source interface that you mentioned (the deprecated one);
- based on Splittable DoFn [2], which is the approach one should use for new IO connectors (especially for unbounded sources).

[1] https://github.com/apache/beam/pull/11749
[2] https://beam.apache.org/documentation/io/developing-io-overview/

--
Alexey
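
P.S. To make the getSize answer above a bit more concrete, here is a minimal sketch of the "remaining records times average record size" idea in plain Java. It is only an illustration under my own assumptions, not the actual KafkaIO code: the class BacklogSizeEstimator and its methods observe and estimateBytes are hypothetical names, and I assume the caller already knows the current offset and the latest offset of the partition.

```java
// Hypothetical helper, NOT part of Beam or KafkaIO: estimates how many bytes
// are left to read in one topic partition.
final class BacklogSizeEstimator {
    private long recordCount = 0;        // records observed so far
    private double avgRecordBytes = 0.0; // running mean of record size in bytes

    // Fold one observed record size into the running mean.
    void observe(long recordBytes) {
        recordCount++;
        avgRecordBytes += (recordBytes - avgRecordBytes) / recordCount;
    }

    // Remaining records (latest offset minus current offset, never negative)
    // multiplied by the mean record size seen so far. With no statistics
    // collected yet, the mean is 0 and so is the estimate.
    double estimateBytes(long currentOffset, long latestOffset) {
        long remainingRecords = Math.max(0L, latestOffset - currentOffset);
        return remainingRecords * avgRecordBytes;
    }
}
```

In a PulsarIO SDF, something like this could back the @GetSize method, with observe called for each record that is read. How you obtain the latest position for a Pulsar topic is a separate question that this sketch leaves open.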
