Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-08 Thread Arvid Heise
Hi Salva,

I already answered on SO [1], but I'll replicate it here:

Since Flink 1.9, you cannot dynamically broadcast to all channels anymore.
Your StreamPartitioner has to declare statically, via isBroadcast, whether
it is a broadcast partitioner; in that case, selectChannel is never invoked.
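The contract described here can be modeled in a few lines of plain Java. This is a hedged sketch with made-up types, not Flink's real ChannelSelector/StreamPartitioner classes: a partitioner declares broadcast statically, and its selectChannel is then never consulted.

```java
import java.util.Random;

// Illustrative stand-ins for the Flink 1.9 contract; these are NOT Flink's
// actual interfaces, just a minimal model of the behavior described above.
interface ChannelSelectorLike<T> {
    boolean isBroadcast();       // declared statically, once per partitioner
    int selectChannel(T record); // never called when isBroadcast() is true
}

class BroadcastSelector<T> implements ChannelSelectorLike<T> {
    @Override public boolean isBroadcast() { return true; }
    @Override public int selectChannel(T record) {
        // Flink 1.9 considers this call illegal for broadcast selectors.
        throw new UnsupportedOperationException("broadcast selector has no single channel");
    }
}

class RandomSelector<T> implements ChannelSelectorLike<T> {
    private final Random random = new Random();
    private final int numberOfChannels;
    RandomSelector(int numberOfChannels) { this.numberOfChannels = numberOfChannels; }
    @Override public boolean isBroadcast() { return false; }
    @Override public int selectChannel(T record) { return random.nextInt(numberOfChannels); }
}

public class PartitionerContractDemo {
    public static void main(String[] args) {
        ChannelSelectorLike<String> single = new RandomSelector<>(4);
        int channel = single.selectChannel("some-event");
        System.out.println(channel >= 0 && channel < 4); // true
        System.out.println(new BroadcastSelector<String>().isBroadcast()); // true
    }
}
```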

Do you have a specific use case where you'd need to switch dynamically?

Best,

Arvid

[1]
https://stackoverflow.com/questions/59485064/migrating-custom-dynamic-partitioner-from-flink-1-7-to-flink-1-9

On Sat, Jan 4, 2020 at 7:00 AM Salva Alcántara wrote:


Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-03 Thread Salva Alcántara
Thanks Chesnay! Just to be clear, this is how my current code looks:

```
unionChannel = broadcastChannel.broadcast().union(singleChannel);

result = new DataStream<>(
    unionChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(
        unionChannel.getTransformation(),
        new MyDynamicPartitioner()));
```

The problem when migrating to Flink 1.9 is that MyDynamicPartitioner cannot
handle broadcast elements, as explained in the question description. So,
based on your reply, I guess I could do something like this:

```
resultSingleChannel = new DataStream<>(
    singleChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(
        singleChannel.getTransformation(),
        new MyDynamicPartitioner()));

result = broadcastChannel.broadcast().union(resultSingleChannel);
```

I will give it a try and see if it works.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-03 Thread Chesnay Schepler
You should be able to implement this on the DataStream API level using 
DataStream#broadcast and #union like this:


```
input = ...

singleChannel = input.filter(x -> !x.isBroadCastPartitioning());
broadcastChannel = input.filter(x -> x.isBroadCastPartitioning());

result = broadcastChannel.broadcast().union(singleChannel);

// apply operations on result
```
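The split-by-flag idea in this snippet can be illustrated with plain `java.util.stream` (the `Event` record and its `broadcast` flag are hypothetical stand-ins for the elements discussed in the thread, not Flink types):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical element type: a boolean flag marks broadcast elements.
record Event(boolean broadcast, int key) {}

public class SplitDemo {
    public static void main(String[] args) {
        // Partition one input into a broadcast channel and a single channel,
        // mirroring the two filter() calls on the DataStream above.
        Map<Boolean, List<Event>> split = Stream.of(
                new Event(true, -1), new Event(false, 7), new Event(false, -1))
            .collect(Collectors.partitioningBy(Event::broadcast));
        System.out.println(split.get(true).size());  // 1 broadcast element
        System.out.println(split.get(false).size()); // 2 single-channel elements
    }
}
```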


On 26/12/2019 08:20, Salva Alcántara wrote:





Migrate custom partitioner from Flink 1.7 to Flink 1.9

2019-12-25 Thread Salva Alcántara
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:

```java
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
        int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```

I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:


```java
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
         * It is illegal to call this method for broadcast channel selectors,
         * and this method can remain not implemented in that case (for
         * example by throwing UnsupportedOperationException).
         */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    return returnChannels[0];
}
```

Note that `selectChannels` has been replaced with `selectChannel`. So, it is
no longer possible to return multiple output channels as originally done
above for the case of broadcasted elements. As a matter of fact,
`selectChannel` should not be invoked for this particular case. Any thoughts
on how to tackle this?
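The migrated method can be distilled into a testable plain-Java sketch. `Math.floorMod` stands in for the custom `partitioner.partition` call, whose implementation is not shown in the thread, and the broadcast branch throws, since Flink 1.9 never routes broadcast elements through `selectChannel`:

```java
import java.util.Random;

// Sketch of the migrated selection logic, decoupled from Flink classes.
public class SelectChannelDemo {
    static final Random RANDOM = new Random();

    static int selectChannel(boolean broadcast, int partitionKey, int numberOfChannels) {
        if (broadcast) {
            // In Flink 1.9, broadcast is declared via isBroadcast() and this
            // method must never be reached for broadcast elements.
            throw new UnsupportedOperationException(
                "selectChannel must not be called for broadcast elements");
        }
        if (partitionKey == -1) {
            // random partition
            return RANDOM.nextInt(numberOfChannels);
        }
        // Stand-in for partitioner.partition(key, numberOfChannels).
        return Math.floorMod(partitionKey, numberOfChannels);
    }

    public static void main(String[] args) {
        System.out.println(selectChannel(false, 10, 4)); // 2
        int r = selectChannel(false, -1, 4);
        System.out.println(r >= 0 && r < 4); // true
    }
}
```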




