You should be able to implement this on the DataStream API level using DataStream#broadcast and #union like this:

input = ...

singleChannel = input.filter(x -> !x.isBroadCastPartitioning);

broadcastChannel = input.filter(x -> x.isBroadCastPartitioning);

result = broadcastChannel.broadcast().union(singleChannel)

// apply operations on result


On 26/12/2019 08:20, Salva Alcántara wrote:
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:

```java
     // Original: working for Flink 1.7
     //@Override
     public int[] selectChannels(SerializationDelegate<StreamRecord&lt;T>>
streamRecordSerializationDelegate,
                                 int numberOfOutputChannels) {
         T value =
streamRecordSerializationDelegate.getInstance().getValue();
         if (value.f0.isBroadCastPartitioning()) {
             // send to all channels
             int[] channels = new int[numberOfOutputChannels];
             for (int i = 0; i < numberOfOutputChannels; ++i) {
                 channels[i] = i;
             }
             return channels;
         } else if (value.f0.getPartitionKey() == -1) {
             // random partition
             returnChannels[0] = random.nextInt(numberOfOutputChannels);
         } else {
             returnChannels[0] =
partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
         }
         return returnChannels;
     }

```

I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:


```java
     // New: required by Flink 1.9
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord&lt;T>>
streamRecordSerializationDelegate) {
         T value =
streamRecordSerializationDelegate.getInstance().getValue();
         if (value.f0.isBroadCastPartitioning()) {
             /*
             It is illegal to call this method for broadcast channel
selectors and this method can remain not
             implemented in that case (for example by throwing
UnsupportedOperationException).
             */
         } else if (value.f0.getPartitionKey() == -1) {
             // random partition
             returnChannels[0] = random.nextInt(numberOfChannels);
         } else {
             returnChannels[0] =
partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
         }
         //return returnChannels;
         return returnChannels[0];
     }
```

Note that `selectChannels` has been replaced with `selectChannel`. So, it is
no longer possible to return multiple output channels as originally done
above for the case of broadcasted elements. As a matter of fact,
`selectChannel` should not be invoked for this particular case. Any thoughts
on how to tackle this?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply via email to