Hello,

Yes, that would definitely do the trick, with an extra mapper after keyBy to 
remove the tuple so that it stays seamless. It’s less hacky than what I was 
thinking of, thanks!
However, is there any plan to add rich partitioners in a future release? That 
would avoid adding overhead and “intermediate” technical info to the stream 
payload.
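
A minimal, JDK-only sketch of that pipeline, assuming each record has already been tagged with its partition id: grouping by the id stands in for keyBy, and strip() plays the role of the extra mapper that removes the id before the sink. Tagged, TagThenStrip and strip() are hypothetical names; in the real job the tag would be Flink's Tuple2 and the grouping would be done by keyBy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Tuple2<PartitionId, Data> emitted by the tagging mapper.
final class Tagged<T> {
    final int partitionId;
    final T payload;

    Tagged(int partitionId, T payload) {
        this.partitionId = partitionId;
        this.payload = payload;
    }
}

final class TagThenStrip {
    // Simulates keyBy on the partition id (records sharing an id are grouped together),
    // followed by the extra mapper that drops the id so the sink only sees the payload.
    static <T> Map<Integer, List<T>> route(List<Tagged<T>> tagged) {
        Map<Integer, List<T>> byPartition = new LinkedHashMap<>();
        for (Tagged<T> t : tagged) {
            byPartition.computeIfAbsent(t.partitionId, k -> new ArrayList<>())
                       .add(strip(t)); // the "remove the tuple" step
        }
        return byPartition;
    }

    // The extra mapper after keyBy: unwraps the original record.
    static <T> T strip(Tagged<T> t) {
        return t.payload;
    }
}
```
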
Best,
Arnaud

From: Robert Metzger <rmetz...@apache.org>
Sent: Friday, May 29, 2020 13:10
To: LINZ, Arnaud <al...@bouyguestelecom.fr>
Cc: user <user@flink.apache.org>
Subject: Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

Hi Arnaud,

Maybe I don't fully understand the constraints, but what about
stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());

The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with 
open() and close() where you can handle the connection with Kudu's partitioning 
service.
The map will output a Tuple2<PartitionId, Data> (or something nicer :) ), then 
Flink shuffles your data correctly, and the sinks will process the data 
correctly partitioned.
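
Robert's suggestion can be sketched without the Flink or Kudu dependencies as below. This is a hedged, JDK-only simulation: FakePartitionLookup, PartitionTagged and GetPartitionMapper are hypothetical names, and the hash-based placeholder rule stands in for the real Kudu lookup. In an actual job, GetPartitionMapper would extend Flink's RichMapFunction and emit a Tuple2; the point shown here is only the lifecycle: one connection per task, opened in open() and released in close().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a connection-backed lookup such as Kudu's KuduPartitioner:
// expensive to create, so it should be opened once per parallel task, not once per record.
final class FakePartitionLookup implements AutoCloseable {
    private final int numKuduPartitions;
    private boolean closed = false;

    FakePartitionLookup(int numKuduPartitions) {
        this.numKuduPartitions = numKuduPartitions;
    }

    int partitionOf(String key) {
        if (closed) throw new IllegalStateException("lookup already closed");
        // Placeholder rule; the real lookup would ask Kudu which partition owns the row.
        return Math.floorMod(key.hashCode(), numKuduPartitions);
    }

    boolean isClosed() { return closed; }

    @Override
    public void close() { closed = true; }
}

// A record plus the partition id computed for it (the Tuple2<PartitionId, Data> above).
final class PartitionTagged {
    final int partitionId;
    final String record;

    PartitionTagged(int partitionId, String record) {
        this.partitionId = partitionId;
        this.record = record;
    }
}

// Mirrors the lifecycle of a RichMapFunction: open() once, map() per record, close() once.
final class GetPartitionMapper {
    FakePartitionLookup lookup;
    int connectionsOpened = 0;

    void open() {
        lookup = new FakePartitionLookup(4);
        connectionsOpened++;
    }

    PartitionTagged map(String record) {
        return new PartitionTagged(lookup.partitionOf(record), record);
    }

    void close() {
        if (lookup != null) lookup.close();
    }

    // Drives one simulated task: open once, map every record, always close.
    static List<PartitionTagged> runTask(GetPartitionMapper m, List<String> records) {
        m.open();
        try {
            List<PartitionTagged> out = new ArrayList<>();
            for (String r : records) out.add(m.map(r));
            return out;
        } finally {
            m.close();
        }
    }
}
```

Note that Flink's keyBy hashes the key into a key group before choosing a subtask, so records with the same partition id land on the same sink subtask, but id k is not necessarily routed to subtask k.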

I hope that this is what you were looking for!

Best,
Robert

On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

Hello,

I would like to improve the performance of my Apache Kudu sink by using the new 
“KuduPartitioner” from the Kudu API to align Flink stream partitions with Kudu 
partitions and thus reduce network shuffling.

For that, I would like to implement something like

stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));

where KuduFlinkPartitioner would be an implementation of 
org.apache.flink.api.common.functions.Partitioner that internally uses 
the KuduPartitioner client tool from the Kudu API.
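
A rough sketch of the mapping half of such a partitioner, assuming the Kudu partition index of each record is already known; obtaining that index is exactly the open()/close() problem described next. SimplePartitioner is a hypothetical interface that only mirrors the signature of Flink's Partitioner#partition(K key, int numPartitions) so the sketch compiles without Flink, and KuduIndexPartitioner is likewise a hypothetical name.

```java
// Hypothetical interface mirroring the signature of
// org.apache.flink.api.common.functions.Partitioner#partition, so this compiles without Flink.
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical partitioner whose key is an already-known Kudu partition index.
// It maps Kudu partitions onto the available sink channels; when there are more Kudu
// partitions than channels, several Kudu partitions share one channel.
final class KuduIndexPartitioner implements SimplePartitioner<Integer> {
    @Override
    public int partition(Integer kuduPartitionIndex, int numPartitions) {
        return Math.floorMod(kuduPartitionIndex, numPartitions);
    }
}
```

With this direct modulo mapping, Kudu partition k always goes to channel k mod numPartitions, which keeps the routing predictable as long as the sink parallelism is fixed.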

However, for that KuduPartitioner to work, it needs to open (and, at the end, close) 
a connection to the Kudu table, which obviously cannot be done for each record. 
But there is no “AbstractRichPartitioner” with open() and close() methods that I 
can use for that (the way I do in the sink, for instance).

What is the best way to implement this ?

I considered ThreadLocals initialized on the first call to 
int partition(K key, int numPartitions); but then I could not close() them 
cleanly, since I am not notified when the job terminates.
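
The ThreadLocal idea can be sketched as below, with JDK types only. ExpensiveLookup and ThreadLocalPartitioner are hypothetical names, and the hash rule is a placeholder for the real Kudu lookup. The sketch shows both sides of the trade-off: the lookup is created once per thread on the first partition() call, but nothing ever closes it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connection-backed Kudu partition lookup.
final class ExpensiveLookup {
    static final AtomicInteger CREATED = new AtomicInteger();
    private final int numKuduPartitions;

    ExpensiveLookup(int numKuduPartitions) {
        this.numKuduPartitions = numKuduPartitions;
        CREATED.incrementAndGet(); // in reality: open a connection to the Kudu table
    }

    int partitionOf(String key) {
        return Math.floorMod(key.hashCode(), numKuduPartitions);
    }
}

// Mirrors the signature of Partitioner#partition(K key, int numPartitions).
final class ThreadLocalPartitioner {
    // One lookup per thread, created lazily on that thread's first partition() call.
    private static final ThreadLocal<ExpensiveLookup> LOOKUP =
            ThreadLocal.withInitial(() -> new ExpensiveLookup(8));

    int partition(String key, int numPartitions) {
        int kuduPartition = LOOKUP.get().partitionOf(key);
        return Math.floorMod(kuduPartition, numPartitions);
    }
    // Drawback: there is no close() hook, so the lookup is never released
    // when the job terminates.
}
```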

I also considered putting those static ThreadLocals inside an “identity mapper” that 
would be called just before the partitioning, with something like:

stream.map(richIdentityConnectionManagerMapper).partitionCustom(new 
KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));

with the Kudu connections initialized in the mapper's open(), closed in the 
mapper's close(), and used in the partitioner's partition().

However, it looks like an ugly hack that breaks every coding principle, but as 
long as the mapper and the partitioner run on the same thread, I think it 
should work.
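
A JDK-only sketch of that hack, assuming the mapper and the partitioner run on the same thread: the identity mapper owns the lookup's lifecycle through a static ThreadLocal, and the partitioner only reads it. SharedLookup, ConnectionHolder, ConnectionManagerMapper and HolderPartitioner are hypothetical names, and the hash rule is a placeholder for the real Kudu lookup. The explicit null check makes the thread assumption fail loudly instead of silently.

```java
// Hypothetical stand-in for the Kudu-backed partition lookup.
final class SharedLookup {
    private final int numKuduPartitions;
    private boolean closed = false;

    SharedLookup(int numKuduPartitions) {
        this.numKuduPartitions = numKuduPartitions;
    }

    int partitionOf(String key) {
        if (closed) throw new IllegalStateException("lookup already closed");
        return Math.floorMod(key.hashCode(), numKuduPartitions);
    }

    void close() { closed = true; }

    boolean isClosed() { return closed; }
}

// The static ThreadLocal shared by the mapper and the partitioner.
final class ConnectionHolder {
    static final ThreadLocal<SharedLookup> CURRENT = new ThreadLocal<>();
}

// Identity mapper whose only job is to manage the lookup's lifecycle on its thread.
final class ConnectionManagerMapper {
    void open() { ConnectionHolder.CURRENT.set(new SharedLookup(8)); }

    String map(String record) { return record; } // identity

    void close() {
        SharedLookup lookup = ConnectionHolder.CURRENT.get();
        if (lookup != null) lookup.close();
        ConnectionHolder.CURRENT.remove();
    }
}

// Partitioner that only reads the lookup; it works only on the mapper's thread.
final class HolderPartitioner {
    int partition(String key, int numPartitions) {
        SharedLookup lookup = ConnectionHolder.CURRENT.get();
        if (lookup == null) {
            throw new IllegalStateException("no lookup on this thread; was the mapper opened here?");
        }
        return Math.floorMod(lookup.partitionOf(key), numPartitions);
    }
}
```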

Is there a better way to do this ?

Best regards,

Arnaud

________________________________

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.
