Ok thanks! I should have seen this. Sorry.

--
Christophe

On Wed, Feb 7, 2018 at 10:27 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Christophe,
>
> Yes, you can write to a different topic per message using the
> `KeyedSerializationSchema` provided to the Kafka producer.
> The schema interface has a `getTargetTopic` method which allows you to
> override the default target topic for a given record.
> I agree that the method is somewhat odd to be part of the serialization
> schema, so I have also been thinking about moving that elsewhere (maybe as
> part of the partitioner).
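
A minimal sketch of the per-record topic selection described above. So that it compiles without Flink on the classpath, `KeyedSchema` is a local stand-in that only mirrors the three methods of `KeyedSerializationSchema`; `RoutedRecord`, `RoutingSchema`, `resolveTopic` and the topic names are hypothetical. The fallback in `resolveTopic` reflects the assumption that a null target topic means "use the producer's default topic".

```java
import java.nio.charset.StandardCharsets;

final class TopicRoutingSketch {

    // Local stand-in mirroring the method shape of Flink's KeyedSerializationSchema<T>.
    interface KeyedSchema<T> {
        byte[] serializeKey(T element);
        byte[] serializeValue(T element);
        String getTargetTopic(T element);
    }

    // Hypothetical record type that carries the topic it should be written to.
    static final class RoutedRecord {
        final String targetTopic; // null means "no preference"
        final String payload;

        RoutedRecord(String targetTopic, String payload) {
            this.targetTopic = targetTopic;
            this.payload = payload;
        }
    }

    // Schema that picks the topic per record instead of using one fixed topic.
    static final class RoutingSchema implements KeyedSchema<RoutedRecord> {
        @Override
        public byte[] serializeKey(RoutedRecord record) {
            return null; // this sketch writes records without a key
        }

        @Override
        public byte[] serializeValue(RoutedRecord record) {
            return record.payload.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(RoutedRecord record) {
            return record.targetTopic;
        }
    }

    // Assumed producer-side behaviour: use the schema's topic, else the default one.
    static <T> String resolveTopic(KeyedSchema<T> schema, T record, String defaultTopic) {
        String target = schema.getTargetTopic(record);
        return target != null ? target : defaultTopic;
    }

    public static void main(String[] args) {
        RoutingSchema schema = new RoutingSchema();
        System.out.println(resolveTopic(schema, new RoutedRecord("destination-topic-1", "a"), "fallback-topic"));
        System.out.println(resolveTopic(schema, new RoutedRecord(null, "b"), "fallback-topic"));
    }
}
```

In a real job the schema would implement Flink's own interface and be passed to the Kafka producer constructor together with the default topic.
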
>
> If you want to route a record to some topic depending on which topic it
> came from on the consumer side, you’ll have to wrap the source topic
> information within the records so that it is available to the producer.
> You can access that in the `KeyedDeserializationSchema#deserialize`
> method, which exposes information about which topic and partition each
> record came from.
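
A rough sketch of carrying the source topic along with each record. The static `deserialize` method below only copies the parameter list of `KeyedDeserializationSchema#deserialize` (key, value, topic, partition, offset) and is not the Flink interface itself; `TaggedRecord`, `destinationFor` and the "origin-" to "destination-" naming rule are hypothetical, taken from the topic names in the question.

```java
import java.nio.charset.StandardCharsets;

final class SourceTopicTaggingSketch {

    // Hypothetical wrapper that keeps the source topic next to the decoded value.
    static final class TaggedRecord {
        final String sourceTopic;
        final String value;

        TaggedRecord(String sourceTopic, String value) {
            this.sourceTopic = sourceTopic;
            this.value = value;
        }
    }

    // Same parameters as KeyedDeserializationSchema#deserialize; the topic is
    // stored in the record so that it is still available at the sink.
    static TaggedRecord deserialize(byte[] messageKey, byte[] message,
                                    String topic, int partition, long offset) {
        return new TaggedRecord(topic, new String(message, StandardCharsets.UTF_8));
    }

    // Hypothetical naming rule: origin-X is written to destination-X.
    // Returns null when the rule does not apply, leaving the choice to a default.
    static String destinationFor(TaggedRecord record) {
        String prefix = "origin-";
        if (record.sourceTopic != null && record.sourceTopic.startsWith(prefix)) {
            return "destination-" + record.sourceTopic.substring(prefix.length());
        }
        return null;
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        TaggedRecord record = deserialize(null, value, "origin-topic-1", 0, 42L);
        System.out.println(record.sourceTopic + " -> " + destinationFor(record));
    }
}
```

On the producer side, `getTargetTopic` could then return the result of a rule like `destinationFor`.
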
>
> Cheers,
> Gordon
>
> On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjo...@gmail.com)
> wrote:
>
> Hi Gordon, or anyone else reading this,
>
> Still on this idea that I consume a Kafka topic pattern.
>
> I then want to sink the result of the processing into a set of topics
> depending on where the original message came from (i.e. if it comes
> from origin-topic-1 I will serialize the result to destination-topic-1, if
> from topic-2 to topic-2, etc.). However, the KafkaProducer works on a
> fixed topic. You can provide a partitioning function
> (FlinkKafkaPartitioner) but not a "topic" function that would let you
> decide which topic to send the message to, a bit like a BucketingSink
> decides the bucket or an ElasticsearchSinkFunction lets you choose the
> index.
>
> Am I missing something? The reason I'm asking is that some of the sink
> constructors mention "defaultTopicId" and some "topicId", as
> if in some cases there were a way to override the topic. Is
> there a feature that allows me to do that?
>
> If not, do you think this would be a worthwhile addition?
>
> Thanks again,
> --
> Christophe
>
> On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Christophe,
>>
>> You can set the parallelism of the FlinkKafkaConsumer independently of
>> the total number of Kafka partitions (across all subscribed streams,
>> including newly created streams that match a subscribed pattern).
>>
>> The consumer deterministically assigns each partition to a single
>> consumer subtask, in a round-robin fashion.
>> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there are 6
>> partitions, each consumer subtask will be assigned 3 partitions.
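
The arithmetic in that example can be checked with a small sketch. This is a plain modulo round-robin written for illustration, not the connector's actual assignment code (which may, for instance, offset the starting subtask per topic); `assign` and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinAssignmentSketch {

    // Returns, for each subtask index, the list of partition indices it would read.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        // Deterministic: a given partition index always maps to the same subtask.
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 6 partitions over 2 subtasks: each subtask ends up with 3 partitions.
        System.out.println(assign(6, 2));
    }
}
```

If there are fewer partitions than subtasks, the same rule leaves the extra subtasks with nothing to read.
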
>>
>> As for topic pattern subscription, FlinkKafkaConsumers starting from
>> version 1.4.0 support this feature. You can take a look at [1] on how to do
>> that.
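
For pattern subscription the consumer is given a `java.util.regex.Pattern` instead of a fixed topic name. The sketch below only illustrates which topic names such a pattern would select, assuming the whole topic name has to match; `matchingTopics`, the pattern and the topic names are hypothetical and no Kafka or Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TopicPatternSketch {

    // Keeps the topic names that the pattern matches in full.
    static List<String> matchingTopics(Pattern pattern, List<String> topicNames) {
        List<String> matched = new ArrayList<>();
        for (String name : topicNames) {
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("origin-topic-[0-9]+");
        List<String> existing = Arrays.asList(
                "origin-topic-1", "origin-topic-22", "other-topic", "origin-topic-x");
        System.out.println(matchingTopics(pattern, existing));
    }
}
```
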
>>
>> Hope this helps!
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>
>> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com)
>> wrote:
>>
>> Hi,
>>
>> If I'm sourcing from a KafkaConsumer, do I have to explicitly set the
>> Flink job parallelism to the number of partitions, or will it adjust
>> automatically? In other words, if I don't call setParallelism,
>> will I get 1 or the number of partitions?
>>
>> The reason I'm asking is that I'm listening to a topic pattern, not a
>> single topic, and the number of actual topics (and so partitions) behind the
>> pattern can change, so it is not possible to know ahead of time how many
>> partitions I will get.
>>
>> Thanks!
>> --
>> Christophe
>>
>>
>
>
> --
> Christophe
>
>


-- 
Christophe
