Great! I’m interested in having support for adding/removing topics/partitions in KafkaIO. If there is no easier solution than SDF, then we should consider developing it. In the meantime, since, afaik, SDF is not yet fully supported by all runners, and the current KafkaIO implementation, based on UnboundedSource, is very mature and used by many users, we would need to create a new SDF-based Read implementation alongside it, as a supplementary one.
On 24 Feb 2020, at 18:15, Luke Cwik <[email protected]> wrote:
>
> I have been working on getting unbounded SDFs working within Beam over
> portability, so if you are interested in writing an SDF KafkaIO
> implementation, I would be interested.
>
> On Mon, Feb 24, 2020 at 7:34 AM Alexey Romanenko <[email protected]> wrote:
>
> Hi Maulik,
>
> For the moment, KafkaIO doesn’t support reading from dynamic topics
> (adding/removing topics/partitions) without a pipeline restart. In a few
> words, KafkaIO uses the unbounded source API, which requires a fixed number
> of splits, and it doesn’t deal well with watermarks for empty splits. One
> potential solution could be to rewrite KafkaIO Read using SDF, but afaik
> nobody has worked on this.
>
> There is an open Jira about that [1] with more details if you are interested.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
>
>> On 24 Feb 2020, at 11:27, Maulik Soneji <[email protected]> wrote:
>>
>> Hello All,
>>
>> I am currently using Beam version 2.19.0 to read data from Kafka using
>> KafkaIO.
>> Kafka version: 0.11.0.0
>> My use case is to read all topics from the Kafka cluster.
>> Below is the code that reads data from Kafka using KafkaIO.
>> KafkaIO.<byte[], byte[]>read()
>>     .withBootstrapServers(brokers)
>>     .withTopic(topic)
>>     .withKeyDeserializer(ByteArrayDeserializer.class)
>>     .withValueDeserializer(ByteArrayDeserializer.class)
>>     .updateConsumerProperties(props)
>>     .commitOffsetsInFinalize();
>>
>> If I provide the topic as a regex like topic-.*, the request fails with:
>>
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> org.apache.kafka.common.errors.InvalidTopicException: Topic topic-.* is
>> invalid
>>
>> By looking at the code, I saw that there is a call to fetch partition
>> information for the topics at KafkaUnboundedSource here
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L66>.
>> Because partitions are fetched only for the topic mentioned in the builder,
>> the regex is treated as a topic name, and partition information is fetched
>> for it even though it is a regex, not a topic.
>>
>> My requirement is to read from all topics in the Kafka cluster, and if new
>> topics are added, they should be picked up dynamically without any process
>> restart.
>>
>> Can someone please share details on how I can read data from multiple
>> topics using a regex?
>> Thanks in advance.
>>
>> Thanks and regards,
>> Maulik
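[Editor's note: a construction-time workaround exists that does not cover the dynamic part of the question: list the cluster's topics yourself (e.g. via KafkaConsumer#listTopics()), filter the names with the regex, and pass the matches to KafkaIO's withTopics(...). New topics still require a pipeline restart to be picked up. Below is a minimal sketch of the filtering step only; the class name TopicRegexWorkaround and the method matchingTopics are hypothetical, not part of Beam or Kafka.]

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicRegexWorkaround {

    // Given the full set of topic names in the cluster (e.g. the key set of
    // KafkaConsumer#listTopics()), keep only the names fully matching the regex.
    static List<String> matchingTopics(Set<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return allTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for the topic names a real cluster would report.
        Set<String> all = new TreeSet<>(List.of("topic-a", "topic-b", "other"));
        // The resulting list can be passed to KafkaIO's withTopics(...)
        // instead of a single withTopic(...) call.
        System.out.println(matchingTopics(all, "topic-.*")); // prints [topic-a, topic-b]
    }
}
```

This keeps the regex out of KafkaIO itself, so KafkaUnboundedSource only ever sees concrete topic names when it fetches partition information.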
