Yes, that was one of the points in that Jira discussion. My bigger concern, though, is the state of SDF support across the different runners.
> On 26 Feb 2020, at 17:41, Reuven Lax <[email protected]> wrote:
>
> You can of course read a dynamic set of topics today, but what you can't do
> is have that set of topics change during execution of the pipeline. For that
> you'd need SDF.
>
> On Wed, Feb 26, 2020 at 8:35 AM Alexey Romanenko <[email protected]> wrote:
> Great!
>
> I'm interested in having support for adding/removing new topics/partitions
> in KafkaIO. If there is no easier solution than SDF, then we should
> consider developing it.
>
> In the meantime, since, afaik, SDF is not yet completely supported by all runners
> and the current KafkaIO implementation, based on UnboundedSource, is
> very mature and used by many users, we will need to create a new SDF-based
> Read implementation alongside it, as a supplementary one.
>
>> On 24 Feb 2020, at 18:15, Luke Cwik <[email protected]> wrote:
>>
>> I have been working on getting unbounded SDFs working within Beam over
>> portability, so if you are interested in writing an SDF KafkaIO
>> implementation, I would be interested.
>>
>> On Mon, Feb 24, 2020 at 7:34 AM Alexey Romanenko <[email protected]> wrote:
>> Hi Maulik,
>>
>> For the moment, KafkaIO doesn't support reading from dynamic topics
>> (add/remove topics/partitions) without a pipeline restart. In a few words,
>> KafkaIO uses the unbounded source API, which requires a fixed number of splits and
>> doesn't deal well with watermarks for empty splits. One potential
>> solution could be to rewrite KafkaIO Read using SDF, but afaik nobody has worked
>> on this.
>>
>> There is an open Jira about that [1] with more details if you are
>> interested.
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-5786
>>
>>> On 24 Feb 2020, at 11:27, Maulik Soneji <[email protected]> wrote:
>>>
>>> Hello All,
>>>
>>> I am currently using Beam version 2.19.0 to read data from Kafka using
>>> KafkaIO.
>>> Kafka version: 0.11.0.0
>>> My use case is to read all topics from the Kafka cluster.
>>> Below is the code that reads data from Kafka using KafkaIO:
>>>
>>> KafkaIO.<byte[], byte[]>read()
>>>     .withBootstrapServers(brokers)
>>>     .withTopic(topic)
>>>     .withKeyDeserializer(ByteArrayDeserializer.class)
>>>     .withValueDeserializer(ByteArrayDeserializer.class)
>>>     .updateConsumerProperties(props)
>>>     .commitOffsetsInFinalize();
>>>
>>> If I provide the topic as a regex like topic-.*, the request fails with:
>>>
>>> Exception in thread "main"
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> org.apache.kafka.common.errors.InvalidTopicException: Topic topic-.* is
>>> invalid
>>>
>>> By looking at the code, I saw that there is a call to fetch partition
>>> information for the topics in KafkaUnboundedSource here
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L66>.
>>> Because partitions are fetched only for the topic mentioned in the
>>> builder, the regex is treated as a topic name and partition information
>>> is requested for it, even though it is a regex and not an actual topic.
>>>
>>> My requirement is to read from all topics in the Kafka cluster, and if
>>> new topics are added, they should be picked up dynamically without any
>>> process restart.
>>>
>>> Can someone please share details about how I can read data from multiple
>>> topics by using a regex?
>>>
>>> Thanks in advance.
>>>
>>> Thanks and regards,
>>> Maulik
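
For reference, below is a minimal sketch (not from the thread; the class name, the buildRead helper and the topic-.* pattern are illustrative) of the workaround Reuven describes above: resolve the regex against the broker's current topic list at pipeline construction time with a plain KafkaConsumer, and pass the resolved list to KafkaIO via withTopics(). The set of topics is fixed for the lifetime of the pipeline; picking up topics created after submission would still require the SDF-based read discussed in this thread.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicsResolvedAtConstruction {

  static KafkaIO.Read<byte[], byte[]> buildRead(String brokers, Map<String, Object> props) {
    // Hypothetical pattern; adjust to the topics you actually want to read.
    Pattern topicPattern = Pattern.compile("topic-.*");

    // Resolve the regex against the broker's topic list before the pipeline starts.
    Map<String, Object> consumerConfig = new HashMap<>(props);
    consumerConfig.put("bootstrap.servers", brokers);

    List<String> topics;
    try (KafkaConsumer<byte[], byte[]> consumer =
        new KafkaConsumer<>(consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      // listTopics() returns every topic visible to this consumer, keyed by name.
      topics = consumer.listTopics().keySet().stream()
          .filter(t -> topicPattern.matcher(t).matches())
          .collect(Collectors.toList());
    }

    // Same read as in the original snippet, but with the resolved (fixed) topic list.
    return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(brokers)
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .updateConsumerProperties(props)
        .commitOffsetsInFinalize();
  }
}

Note that this only moves the topic discovery to construction time; topics created while the pipeline is running are not picked up, which is exactly the limitation the SDF rewrite is meant to remove.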
