Hi there,
It does not seem possible to use a custom partitioner with a KeyedStream
without modifying the KeyedStream class itself, since it overrides
setConnectionType as follows:
    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException(
            "Cannot override partitioning for KeyedStream.");
    }
In some situations, for example when the number of keys is small and close to
the number of partitions, using
    keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
may result in collisions in the partition indexes (and hence empty
partitions) assigned by the HashPartitioner that the KeyedStream constructor
imposes:
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
            TypeInformation<KEY> keyType) {
        super(dataStream.getExecutionEnvironment(),
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new HashPartitioner<>(keySelector)));
        this.keySelector = keySelector;
        this.keyType = keyType;
    }
This is due to the characteristics of the underlying hash function (the same
would hold for any hash function):
    returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
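To make the collision effect concrete, here is a minimal standalone sketch (not Flink code) that counts how many output channels stay empty when a handful of keys is routed with "hash modulo number of channels". The class name PartitionCollisionSketch, the method countEmptyChannels and the mix function are all hypothetical names made up for this illustration. In particular, mix is only a generic integer mixer standing in for MathUtils.murmurHash, so the concrete counts it produces need not match what Flink would give.

```java
import java.util.function.IntUnaryOperator;

public class PartitionCollisionSketch {

    // Stand-in integer mixer. This is an assumption for illustration only,
    // NOT Flink's MathUtils.murmurHash.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Routes every key to hash(key.hashCode()) mod numChannels and returns
    // the number of channels that received no key at all.
    public static int countEmptyChannels(int[] keyHashCodes, int numChannels,
                                         IntUnaryOperator hash) {
        boolean[] used = new boolean[numChannels];
        for (int code : keyHashCodes) {
            // floorMod keeps the index non-negative even if the hash is negative.
            used[Math.floorMod(hash.applyAsInt(code), numChannels)] = true;
        }
        int empty = 0;
        for (boolean u : used) {
            if (!u) {
                empty++;
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3};   // four keys, identified by their hashCode
        int channels = 4;            // four downstream partitions

        // A hand-written assignment (key i -> channel i) leaves nothing empty.
        System.out.println("identity routing, empty channels: "
            + countEmptyChannels(keys, channels, k -> k));

        // A hash-based assignment may map several keys to the same channel.
        System.out.println("hashed routing, empty channels:   "
            + countEmptyChannels(keys, channels, PartitionCollisionSketch::mix));
    }
}
```

With as many keys as channels, a one-to-one assignment such as the identity routing above uses every channel, whereas a hash-based assignment gives no such guarantee. That is the situation where a custom partitioner would help.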
Is there a particular reason to force the KeyedStream to use a HashPartitioner?
Thanks in advance and best regards.