Ok thanks! I should have seen this. Sorry.

--
Christophe
On Wed, Feb 7, 2018 at 10:27 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi Christophe,
>
> Yes, you can achieve writing to different topics per message using the
> `KeyedSerializationSchema` provided to the Kafka producer.
> The schema interface has a `getTargetTopic` method which allows you to
> override the default target topic for a given record.
> I agree that the method is somewhat odd to be part of the serialization
> schema, so I have also been thinking about moving it elsewhere (maybe as
> part of the partitioner).
>
> If you want to route a record to some topic depending on which topic it
> came from on the consumer side, you'll have to wrap the source topic
> information within the records so that it is available to the producer.
> You can access that in the `KeyedDeserializationSchema#deserialize`
> method, which exposes information about which topic and partition each
> record came from.
>
> Cheers,
> Gordon
>
> On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjo...@gmail.com) wrote:
>
> Hi Gordon, or anyone else reading this,
>
> Still on this idea that I consume a Kafka topic pattern.
>
> I then want to sink the result of the processing into a set of topics
> depending on where the original message came from (i.e. if it comes from
> origin-topic-1 I will serialize the result into destination-topic-1, if
> from topic-2 into topic-2, etc.). However, the KafkaProducer works on a
> fixed topic. You can provide a partitioning function
> (FlinkKafkaPartitioner) but not a "topic" function that would let you
> decide which topic to send the message to, a bit like a BucketingSink
> decides the bucket or an ElasticsearchSinkFunction lets you choose the
> index.
>
> Am I missing something? The reason I'm asking is that some of the sink
> constructors talk about "defaultTopicId" and some about "topicId", as if
> in some cases there were an ability to override the topic. Is there a
> feature that allows me to do that?
>
> If not, do you think this would be a worthwhile addition?
>
> Thanks again,
> --
> Christophe
>
> On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Christophe,
>>
>> You can set the parallelism of the FlinkKafkaConsumer independently of
>> the total number of Kafka partitions (across all subscribed streams,
>> including newly created streams that match a subscribed pattern).
>>
>> The consumer deterministically assigns each partition to a single
>> consumer subtask, in a round-robin fashion.
>> E.g. if the parallelism of your FlinkKafkaConsumer is 2 and there are 6
>> partitions, each consumer subtask will be assigned 3 partitions.
>>
>> As for topic pattern subscription, FlinkKafkaConsumers starting from
>> version 1.4.0 support this feature. You can take a look at [1] on how
>> to do that.
>>
>> Hope this helps!
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>
>> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com) wrote:
>>
>> Hi,
>>
>> If I'm sourcing from a KafkaConsumer, do I have to explicitly set the
>> Flink job parallelism to the number of partitions, or will it adjust
>> automatically accordingly? In other words, if I don't call
>> setParallelism, will I get 1 or the number of partitions?
>>
>> The reason I'm asking is that I'm listening to a topic pattern, not a
>> single topic, and the number of actual topics (and so partitions) behind
>> the pattern can change, so it is not possible to know ahead of time how
>> many partitions I will get.
>>
>> Thanks!
>> --
>> Christophe
>>
>
> --
> Christophe
>

--
Christophe
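
[Editor's note] For readers finding this thread later, here is a minimal sketch of the approach Gordon describes, written against the Flink 1.4 Kafka 0.11 connector API (FlinkKafkaConsumer011 / FlinkKafkaProducer011). The topic names, the "origin-" to "destination-" renaming rule, and the TopicAwareDeserializer / TopicRoutingSerializer class names are illustrative assumptions, not something from the thread. The deserializer wraps each record with the topic it was read from; the serializer's getTargetTopic then derives the destination topic from that source topic, overriding the producer's default topic per record.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class TopicRoutingExample {

    // Wraps each record as (sourceTopic, payload) so the producer side can
    // later decide the destination topic from the origin topic.
    public static class TopicAwareDeserializer
            implements KeyedDeserializationSchema<Tuple2<String, String>> {

        @Override
        public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
                                                  String topic, int partition, long offset) {
            return Tuple2.of(topic, new String(message, StandardCharsets.UTF_8));
        }

        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
        }
    }

    // Routes each record to a destination topic derived from its source topic,
    // e.g. "origin-topic-1" -> "destination-topic-1" (the mapping is illustrative).
    public static class TopicRoutingSerializer
            implements KeyedSerializationSchema<Tuple2<String, String>> {

        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            return null; // no message key in this sketch
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            return element.f1.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(Tuple2<String, String> element) {
            // Overrides the producer's default topic on a per-record basis.
            return element.f0.replace("origin-", "destination-");
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "localhost:9092");
        consumerProps.setProperty("group.id", "topic-routing-example");
        // Periodically discover new topics/partitions matching the pattern (Flink 1.4+).
        consumerProps.setProperty("flink.partition-discovery.interval-millis", "10000");

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaConsumer011<Tuple2<String, String>> consumer =
                new FlinkKafkaConsumer011<>(
                        Pattern.compile("origin-topic-[0-9]+"),
                        new TopicAwareDeserializer(),
                        consumerProps);

        FlinkKafkaProducer011<Tuple2<String, String>> producer =
                new FlinkKafkaProducer011<>(
                        "fallback-topic",           // defaultTopicId
                        new TopicRoutingSerializer(),
                        producerProps);

        env.addSource(consumer)
           // ... processing ...
           .addSink(producer);

        env.execute("route records to per-source destination topics");
    }
}

As Gordon notes, getTargetTopic overrides the default target topic per record; the "defaultTopicId" passed to the producer constructor is what gets used when no override is returned.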