Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9
Hi Salva,

I already answered on SO [1], but I'll replicate it here: with Flink 1.9, you can no longer dynamically broadcast to all channels. Your StreamPartitioner has to statically specify whether it's a broadcast with isBroadcast; in that case, selectChannel is never invoked. Do you have a specific use case where you'd need to switch dynamically?

Best,

Arvid

[1] https://stackoverflow.com/questions/59485064/migrating-custom-dynamic-partitioner-from-flink-1-7-to-flink-1-9

On Sat, Jan 4, 2020 at 7:00 AM Salva Alcántara wrote:

> Thanks Chesnay! Just to be clear, this is how my current code looks:
>
> ```
> unionChannel = broadcastChannel.broadcast().union(singleChannel)
>
> result = new DataStream<>(
>     unionChannel.getExecutionEnvironment(),
>     new PartitionTransformation<>(unionChannel.getTransformation(),
>         new MyDynamicPartitioner())
> )
> ```
>
> The problem when migrating to Flink 1.9 is that MyDynamicPartitioner cannot
> handle broadcast elements, as explained in the question description. So,
> based on your reply, I guess I could do something like this:
>
> ```
> resultSingleChannel = new DataStream<>(
>     singleChannel.getExecutionEnvironment(),
>     new PartitionTransformation<>(singleChannel.getTransformation(),
>         new MyDynamicPartitioner())
> )
>
> result = broadcastChannel.broadcast().union(resultSingleChannel)
> ```
>
> I will give it a try and see if it works.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
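The static-broadcast contract Arvid describes can be sketched in plain Java. This is an illustrative model, not the actual Flink `StreamPartitioner` API (the interface and class names below are made up): broadcast-ness is a fixed property declared up front, so per-record channel selection never has to return "all channels" and is simply skipped for broadcast partitioners.

```java
import java.util.Random;

// Illustrative sketch, not the real Flink API: broadcast-ness is declared
// statically via isBroadcast(), so selectChannel() only ever has to return
// a single channel and is never called for broadcast partitioners.
public class StaticBroadcastSketch {
    interface ChannelSelector {
        boolean isBroadcast();                        // fixed, decided up front
        int selectChannel(int key, int numChannels);  // only called when not broadcast
    }

    static class KeyedOrRandomSelector implements ChannelSelector {
        private final Random random = new Random();

        @Override
        public boolean isBroadcast() {
            return false; // this partitioner never broadcasts
        }

        @Override
        public int selectChannel(int key, int numChannels) {
            if (key == -1) {
                return random.nextInt(numChannels);  // random partition
            }
            return Math.floorMod(key, numChannels);  // keyed partition
        }
    }

    public static void main(String[] args) {
        KeyedOrRandomSelector selector = new KeyedOrRandomSelector();
        System.out.println(selector.isBroadcast());       // false
        System.out.println(selector.selectChannel(7, 4)); // 3
    }
}
```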
Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9
Thanks Chesnay! Just to be clear, this is how my current code looks:

```
unionChannel = broadcastChannel.broadcast().union(singleChannel)

result = new DataStream<>(
    unionChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(unionChannel.getTransformation(),
        new MyDynamicPartitioner())
)
```

The problem when migrating to Flink 1.9 is that MyDynamicPartitioner cannot handle broadcast elements, as explained in the question description. So, based on your reply, I guess I could do something like this:

```
resultSingleChannel = new DataStream<>(
    singleChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(singleChannel.getTransformation(),
        new MyDynamicPartitioner())
)

result = broadcastChannel.broadcast().union(resultSingleChannel)
```

I will give it a try and see if it works.
Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9
You should be able to implement this on the DataStream API level using DataStream#broadcast and #union, like this:

```java
input = ...
singleChannel = input.filter(x -> !x.isBroadCastPartitioning);
broadcastChannel = input.filter(x -> x.isBroadCastPartitioning);
result = broadcastChannel.broadcast().union(singleChannel);
// apply operations on result
```

On 26/12/2019 08:20, Salva Alcántara wrote:

I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink 1.9. The original partitioner implemented the `selectChannels` method within the `StreamPartitioner` interface like this:

```java
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
                            int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```

I am not sure how to migrate this to Flink 1.9, since the `StreamPartitioner` interface has changed, as illustrated below:

```java
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
         * It is illegal to call this method for broadcast channel selectors,
         * and this method can remain not implemented in that case (for
         * example by throwing UnsupportedOperationException).
         */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    //return returnChannels;
    return returnChannels[0];
}
```

Note that `selectChannels` has been replaced with `selectChannel`, so it is no longer possible to return multiple output channels, as the original code did for broadcast elements. As a matter of fact, `selectChannel` should not be invoked at all in that case. Any thoughts on how to tackle this?
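Chesnay's filter-based split can be modeled at the plain-collection level. This is an illustrative sketch (the `Event` type and method names are made up, and plain lists stand in for Flink's DataStream): records flagged for broadcast go to one side, and everything else stays on the custom-partitioning path, mirroring the two `filter()` calls above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the filter-based split (plain Java, not Flink's
// DataStream API): one pass per side, selecting by the per-record flag.
public class SplitSketch {
    static class Event {
        final int key;
        final boolean isBroadCastPartitioning;

        Event(int key, boolean isBroadCastPartitioning) {
            this.key = key;
            this.isBroadCastPartitioning = isBroadCastPartitioning;
        }
    }

    // Equivalent of input.filter(x -> x.isBroadCastPartitioning)
    static List<Event> broadcastChannel(List<Event> input) {
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            if (e.isBroadCastPartitioning) {
                out.add(e);
            }
        }
        return out;
    }

    // Equivalent of input.filter(x -> !x.isBroadCastPartitioning)
    static List<Event> singleChannel(List<Event> input) {
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            if (!e.isBroadCastPartitioning) {
                out.add(e);
            }
        }
        return out;
    }
}
```

Together the two sides cover every input record exactly once, which is what makes the later `union` safe.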
Migrate custom partitioner from Flink 1.7 to Flink 1.9
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink 1.9. The original partitioner implemented the `selectChannels` method within the `StreamPartitioner` interface like this:

```java
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
                            int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```

I am not sure how to migrate this to Flink 1.9, since the `StreamPartitioner` interface has changed, as illustrated below:

```java
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
         * It is illegal to call this method for broadcast channel selectors,
         * and this method can remain not implemented in that case (for
         * example by throwing UnsupportedOperationException).
         */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    //return returnChannels;
    return returnChannels[0];
}
```

Note that `selectChannels` has been replaced with `selectChannel`, so it is no longer possible to return multiple output channels, as the original code did for broadcast elements. As a matter of fact, `selectChannel` should not be invoked at all in that case. Any thoughts on how to tackle this?
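One way to satisfy the new single-channel contract, following the inline comment in the snippet above, is to fail fast if a broadcast record ever reaches `selectChannel`. The sketch below is a hypothetical plain-Java reduction of that logic (not the real `StreamPartitioner` subclass; the flag parameter stands in for `value.f0.isBroadCastPartitioning()`):

```java
import java.util.Random;

// Hypothetical sketch of the migrated selection logic (plain Java): in
// Flink 1.9 broadcast records must be routed statically, so if one reaches
// this method we throw rather than try to return multiple channels.
public class SelectChannelSketch {
    private static final Random RANDOM = new Random();

    static int selectChannel(boolean isBroadcastRecord, int partitionKey, int numberOfChannels) {
        if (isBroadcastRecord) {
            throw new UnsupportedOperationException(
                "Broadcast elements must be routed via broadcast(), not selectChannel()");
        }
        if (partitionKey == -1) {
            return RANDOM.nextInt(numberOfChannels);       // random partition
        }
        return Math.floorMod(partitionKey, numberOfChannels); // keyed partition
    }
}
```

This pairs naturally with the filter/union restructuring discussed in the replies: once the broadcast side is split off, the exception branch should never fire.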