I have been working on getting unbounded SDFs (Splittable DoFns) working
within Beam over portability, so if you are interested in writing an
SDF-based KafkaIO implementation, I would be interested.
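To make the idea concrete, here is a simplified, hypothetical model in plain Java of the claim/checkpoint contract that an unbounded SDF relies on when reading one Kafka partition. `PartitionRestriction` and `UnboundedOffsetTracker` are illustrative names, not Beam's `OffsetRangeTracker` or any real KafkaIO class. The restriction is an offset range whose end is `Long.MAX_VALUE`, which stands in for an unbounded partition.

```java
import java.util.Optional;

/** Hypothetical offset range [from, to) for one partition; to == Long.MAX_VALUE means unbounded. */
final class PartitionRestriction {
    final long from; // first offset still owned by this restriction (inclusive)
    final long to;   // end of the range (exclusive)

    PartitionRestriction(long from, long to) {
        if (from > to) throw new IllegalArgumentException("from > to");
        this.from = from;
        this.to = to;
    }

    @Override
    public String toString() {
        return "[" + from + ", " + (to == Long.MAX_VALUE ? "inf" : Long.toString(to)) + ")";
    }
}

/** Hypothetical tracker mimicking the SDF claim/checkpoint contract (not Beam's class). */
final class UnboundedOffsetTracker {
    private PartitionRestriction range;
    private long lastClaimed;

    UnboundedOffsetTracker(PartitionRestriction range) {
        this.range = range;
        this.lastClaimed = range.from - 1; // nothing claimed yet
    }

    /** Claims an offset; returns false once the offset lies outside the (possibly shrunk) range. */
    boolean tryClaim(long offset) {
        if (offset <= lastClaimed) throw new IllegalArgumentException("offsets must increase");
        if (offset >= range.to) return false;
        lastClaimed = offset;
        return true;
    }

    /**
     * Checkpoint: keep the claimed offsets as the primary restriction and return the
     * unclaimed remainder as a residual that can be resumed later.
     */
    Optional<PartitionRestriction> checkpoint() {
        long splitAt = lastClaimed + 1;
        if (splitAt >= range.to) return Optional.empty();
        PartitionRestriction residual = new PartitionRestriction(splitAt, range.to);
        range = new PartitionRestriction(range.from, splitAt);
        return Optional.of(residual);
    }

    PartitionRestriction currentRestriction() {
        return range;
    }
}
```

In principle, because the residual can be resumed later, a read built this way does not need its set of partitions fixed up front: a newly discovered partition could simply become a new element with its own restriction.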

On Mon, Feb 24, 2020 at 7:34 AM Alexey Romanenko <[email protected]>
wrote:

> Hi Maulik,
>
> For the moment, KafkaIO doesn’t support reading from dynamic topics
> (adding/removing topics or partitions) without a pipeline restart. In short,
> KafkaIO uses the UnboundedSource API, which requires a fixed number of
> splits, and it doesn’t deal well with watermarks for empty splits. One
> potential solution could be to rewrite KafkaIO Read using SDF, but AFAIK
> nobody has worked on this yet.
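The fixed-splits limitation can be modelled in plain Java. Splits are computed once from a snapshot of the partitions, here with a simple round-robin assignment. The round-robin scheme and the `FixedSplits` and `assignRoundRobin` names are assumptions for illustration. A partition that appears later belongs to no split until the pipeline is restarted and split again.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: partitions are assigned to a fixed set of splits exactly once. */
final class FixedSplits {
    /** Distributes partition names (e.g. "topic-a-0") round-robin over at most desiredSplits splits. */
    static List<List<String>> assignRoundRobin(List<String> partitions, int desiredSplits) {
        if (desiredSplits <= 0) throw new IllegalArgumentException("desiredSplits must be > 0");
        int numSplits = Math.min(desiredSplits, partitions.size());
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            splits.get(i % numSplits).add(partitions.get(i));
        }
        return splits; // this snapshot never changes while the pipeline runs
    }
}
```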
>
> There is an open Jira about that [1] with more details if you are
> interested.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
>
> On 24 Feb 2020, at 11:27, Maulik Soneji <[email protected]> wrote:
>
> Hello All,
>
> I am currently using Beam version 2.19.0 to read data from Kafka using
> KafkaIO (Kafka version: 0.11.0.0).
> My use case is to read all topics from the Kafka cluster.
> Below is the code that reads data from Kafka using KafkaIO:
>
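> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> // brokers (String), topic (String) and props (Map<String, Object>) are defined elsewhere.
> KafkaIO.Read<byte[], byte[]> read = KafkaIO.<byte[], byte[]>read()
>         .withBootstrapServers(brokers)
>         .withTopic(topic)
>         .withKeyDeserializer(ByteArrayDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .updateConsumerProperties(props)
>         .commitOffsetsInFinalize();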
>
> If I provide the topic as a regex like topic-.*, the request fails with:
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> org.apache.kafka.common.errors.InvalidTopicException: Topic topic-.*  is 
> invalid
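KafkaIO uses the string given to `withTopic` as a literal topic name. One possible workaround on 2.19.0 is to resolve the regex yourself at pipeline construction time and pass the concrete names to `KafkaIO.Read.withTopics(List<String>)`. Below is a minimal sketch of the matching step. `TopicResolver` is a hypothetical helper. In practice, the topic names would come from something like `KafkaConsumer.listTopics().keySet()`. This approach still only sees topics that exist when the pipeline is launched.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: resolves a topic regex to the concrete topic names that currently exist. */
final class TopicResolver {
    static List<String> matchingTopics(Collection<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return allTopics.stream()
                .filter(t -> pattern.matcher(t).matches()) // whole-name match, not substring
                .sorted()
                .collect(Collectors.toList());
    }
}
```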
>
> By looking at the code, I saw that KafkaUnboundedSource fetches partition
> information for the topics here
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L66>.
> Because partitions are fetched only for the topic named in the builder, the
> regex is treated as a literal topic name, and KafkaIO tries to fetch
> partition information for it even though it is a pattern, not a topic. My
> requirement is to read from all topics in the Kafka cluster, and any newly
> added topics should be picked up dynamically without a process restart.
> Can someone please share details about how I can read data from multiple
> topics by using a regex? Thanks in advance.
> Thanks and regards, Maulik
>
>
>

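For the dynamic part of the requirement, the topic list has to be re-read periodically, and each new match has to become new work. An SDF-based read, or something built on Beam's `Watch` transform, could do this. The sketch below shows only the discovery step, in plain Java. `TopicWatcher` is hypothetical, and the `Supplier` stands in for a call that lists the cluster's topics. Deleted topics are not handled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;

/** Hypothetical poller that reports matching topics the first time they are seen. */
final class TopicWatcher {
    private final Supplier<? extends Collection<String>> listTopics; // e.g. wraps a cluster listing call
    private final Pattern pattern;
    private final Set<String> seen = new HashSet<>();

    TopicWatcher(Supplier<? extends Collection<String>> listTopics, String regex) {
        this.listTopics = listTopics;
        this.pattern = Pattern.compile(regex);
    }

    /** Returns matching topics not reported by any earlier poll, sorted by name. */
    List<String> poll() {
        List<String> fresh = new ArrayList<>();
        for (String topic : listTopics.get()) {
            // seen.add returns false for topics that were already reported
            if (pattern.matcher(topic).matches() && seen.add(topic)) {
                fresh.add(topic);
            }
        }
        Collections.sort(fresh);
        return fresh;
    }
}
```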