Steven Zhen Wu created FLINK-35384:
--------------------------------------
Summary: Expose metrics group to custom Partitioner
Key: FLINK-35384
URL: https://issues.apache.org/jira/browse/FLINK-35384
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Affects Versions: 1.9.4
Reporter: Steven Zhen Wu
I am trying to implement a custom range partitioner in the Flink Iceberg sink.
I want to publish some counter metrics for certain scenarios, similar to the
network metrics exposed in `TaskIOMetricGroup`.
We can implement the range partitioner using the public interface from
`DataStream`.
```java
// From DataStream<T>
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
```
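As an illustration, a range partitioner that plugs into `partitionCustom` might look like the sketch below. It assumes long keys and a fixed, sorted array of range boundaries. The class name `LongRangePartitioner` is hypothetical, and the sketch declares a minimal stand-in for Flink's `Partitioner` interface so it compiles without Flink on the classpath.

```java
import java.io.Serializable;
import java.util.Arrays;

public class RangePartitionerSketch {

    // Stand-in mirroring org.apache.flink.api.common.functions.Partitioner,
    // declared locally so this sketch compiles without Flink.
    @FunctionalInterface
    interface Partitioner<K> extends Serializable {
        int partition(K key, int numPartitions);
    }

    // Hypothetical range partitioner: keys below boundaries[0] map to range 0,
    // keys in [boundaries[i - 1], boundaries[i]) map to range i, and keys at or
    // above the last boundary map to the last range.
    static final class LongRangePartitioner implements Partitioner<Long> {
        private final long[] boundaries; // sorted ascending

        LongRangePartitioner(long[] boundaries) {
            this.boundaries = boundaries.clone();
            Arrays.sort(this.boundaries);
        }

        @Override
        public int partition(Long key, int numPartitions) {
            int idx = Arrays.binarySearch(boundaries, key);
            // binarySearch returns -(insertionPoint) - 1 when the key is absent;
            // an exact hit on boundary i starts range i + 1.
            int range = idx >= 0 ? idx + 1 : -idx - 1;
            // Fold ranges onto the available downstream subtasks.
            return range % numPartitions;
        }
    }
}
```

With Flink, the same class would implement `org.apache.flink.api.common.functions.Partitioner<Long>` and be passed as `stream.partitionCustom(new LongRangePartitioner(bounds), keySelector)`. Note that such a partitioner has no way to report, for example, how many keys fall outside the expected ranges; that is the kind of counter this issue asks for.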
We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that
`CustomPartitionerWrapper` extends. `CustomPartitionerWrapper` wraps the
public `Partitioner` interface, which is where we would implement the custom
range partitioner.
The `Partitioner` interface is a functional interface today. We can add a new
default `setup` method without breaking backward compatibility, because
`partition` remains the only abstract method.
```java
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    // Proposed addition: a no-op default, so existing implementations and lambdas still compile
    default void setup(TaskIOMetricGroup metrics) {}

    int partition(K key, int numPartitions);
}
```
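A minimal sketch of why the default method preserves compatibility, using stand-in `Counter` and `MetricGroup` types in place of Flink's (the real `TaskIOMetricGroup` is a `MetricGroup`, which provides `counter(String)`). Because `setup` has a default body, `partition` stays the only abstract method, so an existing lambda still compiles, while a partitioner that needs metrics overrides `setup`. `BoundedRangePartitioner` and the metric name `outOfRangeKeys` are hypothetical.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

public class PartitionerSetupSketch {

    // Stand-ins for Flink's Counter and MetricGroup.
    interface Counter {
        void inc();
        long getCount();
    }

    interface MetricGroup {
        Counter counter(String name);
    }

    // Proposed shape: setup() has a default body, so this is still a functional interface.
    @FunctionalInterface
    interface Partitioner<K> extends Serializable {
        default void setup(MetricGroup metrics) {} // no-op for existing implementations

        int partition(K key, int numPartitions);
    }

    static final class SimpleCounter implements Counter {
        private final AtomicLong count = new AtomicLong();

        public void inc() { count.incrementAndGet(); }

        public long getCount() { return count.get(); }
    }

    // Hypothetical partitioner that counts keys falling outside [0, maxKey].
    static final class BoundedRangePartitioner implements Partitioner<Integer> {
        private final int maxKey;
        private transient Counter outOfRange; // registered once in setup()

        BoundedRangePartitioner(int maxKey) {
            this.maxKey = maxKey;
        }

        @Override
        public void setup(MetricGroup metrics) {
            outOfRange = metrics.counter("outOfRangeKeys");
        }

        @Override
        public int partition(Integer key, int numPartitions) {
            if (key < 0 || key > maxKey) {
                outOfRange.inc(); // the scenario we want to surface as a metric
                return Math.floorMod(key, numPartitions);
            }
            // Spread [0, maxKey] evenly over numPartitions contiguous ranges.
            return (int) ((long) key * numPartitions / (maxKey + 1L));
        }
    }
}
```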
I know that changing a public interface requires a FLIP. I will write one if the
community agrees with this feature request.
Personally, I think `numPartitions` should be passed in the `setup` method too.
But that would be a breaking change that is NOT worth the benefit right now.
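```java
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    // numPartitions is supplied once at setup time instead of on every call
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}

    // Breaking change: numPartitions is dropped from the per-record call
    int partition(K key);
}
```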
That would be similar to the `StreamPartitioner#setup()` method, which we would
need to modify to pass the metrics group.
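```java
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>,
                Serializable {

    protected int numberOfChannels;

    // Proposed: the metrics parameter is new, so ChannelSelector#setup needs the same change
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }

    // ... remaining members unchanged
}
```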
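The plumbing could look roughly like the simplified model below, in which `CustomPartitionerWrapper` forwards the metric group from its own `setup` to the user partitioner and then calls `partition` per record. All types here are stand-ins rather than Flink's classes: `MetricGroup` is an empty placeholder for `TaskIOMetricGroup`, the local `KeySelector` drops the checked exception that Flink's `getKey` declares, and the `ChannelSelector` and serialization-delegate layers are omitted.

```java
import java.io.Serializable;

public class WrapperPlumbingSketch {

    // Placeholder for TaskIOMetricGroup; only its identity matters in this sketch.
    interface MetricGroup {}

    @FunctionalInterface
    interface Partitioner<K> extends Serializable {
        default void setup(MetricGroup metrics) {}

        int partition(K key, int numPartitions);
    }

    // Simplified stand-in for Flink's KeySelector (no checked exception).
    @FunctionalInterface
    interface KeySelector<T, K> extends Serializable {
        K getKey(T value);
    }

    // Simplified StreamPartitioner whose setup() also receives the metric group.
    abstract static class StreamPartitioner<T> implements Serializable {
        protected int numberOfChannels;

        public void setup(int numberOfChannels, MetricGroup metrics) {
            this.numberOfChannels = numberOfChannels;
        }

        public abstract int selectChannel(T record);
    }

    // Simplified CustomPartitionerWrapper that forwards setup() to the user partitioner.
    static final class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
        private final Partitioner<K> partitioner;
        private final KeySelector<T, K> keySelector;

        CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
            this.partitioner = partitioner;
            this.keySelector = keySelector;
        }

        @Override
        public void setup(int numberOfChannels, MetricGroup metrics) {
            super.setup(numberOfChannels, metrics);
            partitioner.setup(metrics); // the new hook: user code can register metrics here
        }

        @Override
        public int selectChannel(T record) {
            return partitioner.partition(keySelector.getKey(record), numberOfChannels);
        }
    }
}
```

Keeping the forwarding inside the internal wrapper means only the `@Public` `Partitioner` interface gains a method; user code that never overrides `setup` is unaffected.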