Hi Christophe,

Yes, you can write to a different topic per message using the 
`KeyedSerializationSchema` provided to the Kafka producer.
The schema interface has a `getTargetTopic` method which allows you to override 
the default target topic for a given record.
I agree that the method is somewhat odd to be part of the serialization schema, 
so I have also been thinking about moving that elsewhere (maybe as part of the 
partitioner).
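
As a rough illustration of the `getTargetTopic` override, here is a minimal sketch. To keep it compilable without Flink on the classpath, it declares a local stand-in interface (`KeyedSerializationSchemaSketch`, a name made up for this example) with the same three methods as Flink's `KeyedSerializationSchema`; in a real job you would implement the Flink interface instead. `RoutedEvent` and `RoutingSchema` are hypothetical names as well.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in with the same three methods as Flink's KeyedSerializationSchema,
// declared here only so the sketch compiles without Flink on the classpath.
interface KeyedSerializationSchemaSketch<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Hypothetical record type: a payload plus the topic it should be written to.
final class RoutedEvent {
    final String targetTopic; // null means "use the producer's default topic"
    final String payload;

    RoutedEvent(String targetTopic, String payload) {
        this.targetTopic = targetTopic;
        this.payload = payload;
    }
}

// Hypothetical schema that routes each record to the topic it carries.
final class RoutingSchema implements KeyedSerializationSchemaSketch<RoutedEvent> {
    @Override
    public byte[] serializeKey(RoutedEvent element) {
        return null; // no key for this record
    }

    @Override
    public byte[] serializeValue(RoutedEvent element) {
        return element.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(RoutedEvent element) {
        // Returning null falls back to the default topic given to the producer.
        return element.targetTopic;
    }
}
```

The topic passed to the producer constructor then only acts as the default for records whose `getTargetTopic` returns null.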

If you want to route a record to some topic depending on which topic it came 
from on the consumer side, you’ll have to wrap the source topic information 
within the records so that it is available to the producer.
You can access that in the `KeyedDeserializationSchema#deserialize` method, 
which exposes information about which topic and partition each record came from.
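
A minimal sketch of that wrapping step, under a few assumptions: the interface below is a local stand-in that mirrors only the `deserialize` method of `KeyedDeserializationSchema` (the real interface declares further methods such as `isEndOfStream` and `getProducedType`), and `TopicTagged`, `TopicTaggingSchema` and the origin-to-destination naming rule are hypothetical, taken from the example in your question.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the deserialize method of Flink's
// KeyedDeserializationSchema, so the sketch compiles without Flink.
interface KeyedDeserializationSketch<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

// Hypothetical wrapper that carries the source topic along with the value.
final class TopicTagged {
    final String sourceTopic;
    final String value;

    TopicTagged(String sourceTopic, String value) {
        this.sourceTopic = sourceTopic;
        this.value = value;
    }
}

final class TopicTaggingSchema implements KeyedDeserializationSketch<TopicTagged> {
    @Override
    public TopicTagged deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // Keep the topic the record came from so the producer side can use it later.
        return new TopicTagged(topic, new String(message, StandardCharsets.UTF_8));
    }

    // Assumed naming rule: origin-topic-N is written to destination-topic-N.
    static String destinationFor(TopicTagged record) {
        return record.sourceTopic.replaceFirst("^origin-", "destination-");
    }
}
```

On the producer side, `getTargetTopic` could then return something like `destinationFor(record)`.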

Cheers,
Gordon

On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjo...@gmail.com) wrote:

Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern. 

I then want to sink the result of the processing into a set of topics 
depending on where the original message came from (i.e. if it comes from 
origin-topic-1 I will serialize the result to destination-topic-1, if from 
topic-2 to topic-2, etc.). However, the KafkaProducer works on a fixed 
topic. You can provide a partitioning function (FlinkKafkaPartitioner) but not 
a "topic" function that would decide which topic to send the message to, 
a bit like a BucketingSink decides the bucket or an 
ElasticsearchSinkFunction lets you choose the index. 

Am I missing something? The reason I'm asking is that some of the sink 
constructors refer to "defaultTopicId" and some to "topicId", as if in some 
cases there were a way to override the topic. Is there a feature 
that allows me to do that?

If not do you think this would be a worthwhile addition?

Thanks again,
--
Christophe

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Christophe,

You can set the parallelism of the FlinkKafkaConsumer independently of the 
total number of Kafka partitions (across all subscribed streams, including 
newly created streams that match a subscribed pattern).

The consumer deterministically assigns each partition to a single consumer 
subtask, in a round-robin fashion.
E.g. if the parallelism of your FlinkKafkaConsumer is 2 and there are 6 
partitions, each consumer subtask will be assigned 3 partitions.
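
To make the arithmetic concrete, here is a simplified sketch of a plain round-robin assignment. It is not the connector's actual code (the real assignment may, for example, start at a different subtask for each topic), and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinAssignmentSketch {
    // Subtask index that owns the given partition under plain round-robin.
    static int subtaskFor(int partition, int parallelism) {
        return partition % parallelism;
    }

    // Groups partition ids 0..numPartitions-1 by the subtask that reads them.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(subtaskFor(p, parallelism)).add(p);
        }
        return bySubtask;
    }
}
```

With 6 partitions and parallelism 2, `assign(6, 2)` gives each subtask 3 partitions. If the parallelism is higher than the number of partitions, some subtasks simply get none.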

As for topic pattern subscription, FlinkKafkaConsumers starting from version 
1.4.0 support this feature. You can take a look at [1] on how to do that.
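
As a rough sketch of what the pattern-based setup involves: the consumer is given a `java.util.regex.Pattern` instead of a topic name, and discovery of new topics and partitions is enabled through the `flink.partition-discovery.interval-millis` property. The snippet below uses plain Java only, so that it runs without Flink; the broker address, group id, interval and topic names are placeholders, and `matching` merely illustrates which topic names a pattern would select.

```java
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternSketch {
    // Properties that would be handed to the consumer; all values are placeholders.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // Check for new topics/partitions matching the pattern every 10 seconds.
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        return props;
    }

    // Returns the topic names that fully match the subscription pattern.
    static List<String> matching(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }
}
```

In a real job, the pattern and the properties would go to the consumer constructor described in [1].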

Hope this helps!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com) wrote:

Hi,

If I'm sourcing from a KafkaConsumer, do I have to explicitly set the Flink job 
parallelism to the number of partitions, or will it adjust automatically? 
In other words, if I don't call setParallelism, will I get 1 or the 
number of partitions?

The reason I'm asking is that I'm listening to a topic pattern, not a single 
topic, and the number of actual topics (and so partitions) behind the pattern can 
change, so it is not possible to know ahead of time how many partitions I will 
get.

Thanks!
--
Christophe