I have been working on getting unbounded SDFs running within Beam over portability, so if you are interested in writing an SDF-based KafkaIO implementation, I would be interested.
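To make the idea concrete, here is a very rough sketch (my own illustration, not the actual KafkaIO code and not tied to a particular Beam release) of the shape an SDF-based reader could take: one OffsetRange restriction per TopicPartition, polled with a plain KafkaConsumer and resumed via ProcessContinuation. The class name and the start-offset handling are placeholders; a real implementation would also need watermark estimation, offset commit handling, and the dynamic topic/partition discovery discussed below.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.beam.sdk.values.KV;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.joda.time.Duration;

    // Illustrative sketch only: one offset-range restriction per TopicPartition.
    @DoFn.UnboundedPerElement
    class ReadFromKafkaPartitionFn extends DoFn<TopicPartition, KV<byte[], byte[]>> {

      private final Map<String, Object> consumerConfig;

      ReadFromKafkaPartitionFn(Map<String, Object> consumerConfig) {
        this.consumerConfig = consumerConfig;
      }

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element TopicPartition partition) {
        // Placeholder: a real implementation would resolve the start offset
        // (earliest/latest/committed) per partition instead of hard-coding 0.
        return new OffsetRange(0L, Long.MAX_VALUE);
      }

      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(range);
      }

      @ProcessElement
      public ProcessContinuation processElement(
          @Element TopicPartition partition,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<KV<byte[], byte[]>> out) {
        try (KafkaConsumer<byte[], byte[]> consumer =
            new KafkaConsumer<>(
                consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
          consumer.assign(Collections.singletonList(partition));
          // On resume, the restriction's "from" is the residual's start offset.
          consumer.seek(partition, tracker.currentRestriction().getFrom());
          ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
          for (ConsumerRecord<byte[], byte[]> record : records) {
            if (!tracker.tryClaim(record.offset())) {
              // The runner split off the rest of the range; stop processing here.
              return ProcessContinuation.stop();
            }
            out.output(KV.of(record.key(), record.value()));
          }
        }
        // Come back later for more records on this partition.
        return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
      }
    }

Creating a fresh consumer per bundle is obviously not what you would ship; it just keeps the sketch self-contained.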
On Mon, Feb 24, 2020 at 7:34 AM Alexey Romanenko <[email protected]> wrote:

> Hi Maulik,
>
> For the moment, KafkaIO doesn’t support reading from dynamic topics
> (adding/removing topics/partitions) without a pipeline restart. In a few words,
> KafkaIO uses the unbounded source API, which requires a fixed number of splits
> and doesn’t deal well with watermarks for empty splits. One potential solution
> could be to rewrite KafkaIO Read using SDF, but afaik nobody has worked on this.
>
> There is an open Jira about that [1] with more details if you are interested.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
>
> On 24 Feb 2020, at 11:27, Maulik Soneji <[email protected]> wrote:
>
> Hello All,
>
> I am currently using Beam version 2.19.0 to read data from Kafka using KafkaIO.
> Kafka version: 0.11.0.0
> My use case is to read all topics from the Kafka cluster.
> Below is the code that reads data from Kafka using KafkaIO:
>
> KafkaIO.<byte[], byte[]>read()
>     .withBootstrapServers(brokers)
>     .withTopic(topic)
>     .withKeyDeserializer(ByteArrayDeserializer.class)
>     .withValueDeserializer(ByteArrayDeserializer.class)
>     .updateConsumerProperties(props)
>     .commitOffsetsInFinalize();
>
> If I provide the topic as a regex like topic-.*, the request fails with:
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> org.apache.kafka.common.errors.InvalidTopicException: Topic topic-.* is invalid
>
> Looking at the code, I saw that there is a call to fetch partition information
> for the topics in KafkaUnboundedSource here
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L66>.
> Because partitions are fetched only for the topic mentioned in the builder, the
> regex is treated as a topic name and partition information is fetched for it,
> even though it is a regex and not a topic. My requirement is to read from all
> topics in the Kafka cluster, and any newly added topics should be picked up
> dynamically without any process restart. Can someone please share details about
> how I can read data from multiple topics by using a regex? Thanks in advance.
>
> Thanks and regards,
> Maulik
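As an interim workaround for the regex error (a sketch, not something KafkaIO provides out of the box): you can expand the pattern yourself at pipeline construction time by listing topics with a plain KafkaConsumer and passing the concrete names to KafkaIO via withTopics(). This only sees topics that exist when the pipeline is launched, so, as Alexey said, newly created topics still require a restart. The helper name expandTopicRegex below is mine; brokers and props are assumed to be the same values as in your snippet.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Hypothetical helper: list all topics on the cluster and keep the ones
    // matching the given pattern. Runs once, at pipeline construction time.
    static List<String> expandTopicRegex(String bootstrapServers, Pattern topicPattern) {
      Map<String, Object> config = new HashMap<>();
      config.put("bootstrap.servers", bootstrapServers);
      List<String> matching = new ArrayList<>();
      try (KafkaConsumer<byte[], byte[]> consumer =
          new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
        for (String topic : consumer.listTopics().keySet()) {
          if (topicPattern.matcher(topic).matches()) {
            matching.add(topic);
          }
        }
      }
      return matching;
    }

    // At construction time, pass the expanded list instead of a regex:
    List<String> topics = expandTopicRegex(brokers, Pattern.compile("topic-.*"));

    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(brokers)
        .withTopics(topics)                       // concrete topic names, no regex
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .updateConsumerProperties(props)
        .commitOffsetsInFinalize();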
