Hello All,

I am currently using Beam version 2.19.0 to read data from Kafka using
KafkaIO.
Kafka version: 0.11.0.0
My use case is to read from all topics in the Kafka cluster.
Below is the code that reads data from Kafka using KafkaIO:

KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(brokers)
        .withTopic(topic)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .updateConsumerProperties(props)
        .commitOffsetsInFinalize();

If I provide the topic as a regex such as topic-.*, the request fails with:

Exception in thread "main"
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
org.apache.kafka.common.errors.InvalidTopicException: Topic topic-.*  is invalid

Looking at the code, I saw that there is a call to fetch partition
information for the topics in KafkaUnboundedSource here
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L66>.
Because partitions are fetched only for the topic named in the builder, the
regex is treated as a literal topic name, and the lookup of partition
information for it fails because no topic with that name exists.

My requirement is to read from all topics in the Kafka cluster, and any new
topics that are added should be picked up dynamically, without a process
restart.

Can someone please share details on how I can read data from multiple
topics using a regex? Thanks in advance.
Thanks and regards, Maulik
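
For reference, the only workaround I can sketch so far is a static one:
resolve the regex against the list of topic names myself before building the
pipeline, then pass the resulting list to KafkaIO's withTopics(List<String>)
instead of withTopic(topic). This is a minimal sketch, not something I have
run against a cluster. The class name TopicRegexFilter, the helper
matchingTopics, and the sample topic names are all made up for illustration.
In a real job I assume the names would come from
KafkaConsumer#listTopics().keySet().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

public class TopicRegexFilter {

    // Hypothetical helper: returns the topic names that fully match the
    // regex, sorted so the result is deterministic.
    static List<String> matchingTopics(Collection<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String name : allTopics) {
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        Collections.sort(matched);
        return matched;
    }

    public static void main(String[] args) {
        // Made-up topic names. Assumption: in a real job these would be read
        // from the cluster, e.g. via KafkaConsumer#listTopics().keySet().
        List<String> allTopics =
            Arrays.asList("topic-b", "other", "topic-a", "__consumer_offsets");

        List<String> topics = matchingTopics(allTopics, "topic-.*");
        System.out.println(topics); // prints [topic-a, topic-b]

        // Assumption: the list would then replace withTopic(topic) in the
        // builder above, i.e. .withTopics(topics).
    }
}
```

This only expands the regex once, at pipeline construction time, so topics
created later are still not picked up without a restart, which is the part I
am really asking about.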
