Yes, that point came up in the Jira discussion. My bigger concern is SDF 
support across the different runners.
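
For reference, here is a minimal sketch of the static case Reuven describes 
below: resolve the regex against the known topic names once, at pipeline 
construction time, and hand the resulting list to KafkaIO. The names 
TopicFilter and matchTopics are hypothetical, and the topic names are 
hard-coded for illustration. In a real pipeline I would expect them to come 
from something like KafkaConsumer#listTopics(), and the result to be passed 
to KafkaIO.Read#withTopics(List<String>). Topics created after the pipeline 
starts are still not picked up this way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Beam or Kafka.
class TopicFilter {

    // Returns the topic names that fully match the regex, sorted for stable output.
    static List<String> matchTopics(Collection<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        Collections.sort(matched);
        return matched;
    }

    public static void main(String[] args) {
        // Assumption: in practice this list would be fetched from the cluster
        // before the pipeline is built.
        List<String> allTopics = Arrays.asList("topic-b", "audit", "topic-a");
        List<String> topics = matchTopics(allTopics, "topic-.*");
        // The resulting list is what would be passed to withTopics(...).
        System.out.println(topics);
    }
}
```

The set of topics is fixed once the pipeline is submitted, so this only 
covers the "read many topics" part of the question, not the dynamic part.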

> On 26 Feb 2020, at 17:41, Reuven Lax <[email protected]> wrote:
> 
> You can of course read a dynamic set of topics today, but what you can't do 
> is have that set of topics change during execution of the pipeline. For that 
> you'd need SDF.
> 
> On Wed, Feb 26, 2020 at 8:35 AM Alexey Romanenko <[email protected]> wrote:
> Great! 
> 
> I’m interested in supporting the addition and removal of topics/partitions 
> in KafkaIO. If there is no easier solution than SDF, then we should 
> consider developing it.
>  
> In the meantime, since, afaik, SDF is not yet fully supported by all 
> runners, and the current KafkaIO implementation, based on UnboundedSource, 
> is very mature and widely used, we will need to create the new SDF-based 
> Read implementation alongside it, as a supplementary one. 
> 
> On 24 Feb 2020, at 18:15, Luke Cwik <[email protected]> wrote:
>> 
>> I have been working on getting unbounded SDFs working within Beam over 
>> portability so if you are interested in writing an SDF KafkaIO 
>> implementation, I would be interested.
>> 
>> On Mon, Feb 24, 2020 at 7:34 AM Alexey Romanenko <[email protected]> wrote:
>> Hi Maulik,
>> 
>> For the moment, KafkaIO doesn’t support reading from dynamic topics 
>> (adding/removing topics/partitions) without a pipeline restart. In short, 
>> KafkaIO uses the unbounded source API, which requires a fixed number of 
>> splits and doesn’t deal well with watermarks for empty splits. One 
>> potential solution could be to rewrite KafkaIO Read using SDF, but afaik 
>> nobody has worked on this. 
>> 
>> There is an open Jira about that [1] with more details if you are 
>> interested. 
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-5786
>> 
>>> On 24 Feb 2020, at 11:27, Maulik Soneji <[email protected]> wrote:
>>> 
>>> Hello All,
>>> 
>>> I am currently using Beam version 2.19.0 to read data from Kafka using 
>>> KafkaIO. 
>>> Kafka version: 0.11.0.0
>>> My use case is to read all topics from the Kafka cluster.
>>> Below is the code that reads data from Kafka using KafkaIO.
>>> KafkaIO.<byte[], byte[]>read()
>>>         .withBootstrapServers(brokers)
>>>         .withTopic(topic)
>>>         .withKeyDeserializer(ByteArrayDeserializer.class)
>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>         .updateConsumerProperties(props)
>>>         .commitOffsetsInFinalize();
>>> If I provide the topic as a regex like topic-.*, the request fails with:
>>> Exception in thread "main" 
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
>>> org.apache.kafka.common.errors.InvalidTopicException: Topic topic-.*  is 
>>> invalid
>>> Looking at the code, I saw that there is a call to fetch partition 
>>> information for the topics
>>> in KafkaUnboundedSource here 
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L66>.
>>> Because partitions are fetched only for the topic given in the 
>>> builder, the regex is treated as a literal topic name 
>>> and partition information is requested for it, even though it is not a 
>>> topic but a regex.
>>> 
>>> My requirement is to read from all topics in the Kafka cluster, and 
>>> if new topics are added, 
>>> they should be picked up dynamically without any process restart.
>>> 
>>> Can someone please share details about how I can read data from multiple 
>>> topics by using a regex?
>>> Thanks in advance.
>>> 
>>> Thanks and regards,
>>> Maulik
>> 
> 
