[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated FLINK-35384:
-----------------------------------
    Description: 
I am trying to implement a custom range partitioner in the Flink Iceberg sink. I want to publish some counter metrics for certain scenarios. This is like the network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from `DataStream`.
{code}
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
{code}

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the public `Partitioner` interface, where we can implement the custom range partitioner.

The `Partitioner` interface is a functional interface today. We can add a new default `setup` method without breaking backward compatibility.
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    // proposed addition
    default void setup(TaskIOMetricGroup metrics) {}

    int partition(K key, int numPartitions);
}
{code}

I know that changing a public interface requires a FLIP process. I will do that if the community agrees with this feature request.

Personally, I think `numPartitions` should be passed in the `setup` method too. But changing the `partition` signature is a breaking change that is NOT worth the benefit right now.
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}

    int partition(K key);
}
{code}

That would be similar to the `StreamPartitioner#setup()` method, which we would need to modify to pass the metrics group.
{code}
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
    // remaining members omitted
}
{code}

> Expose metrics group to custom Partitioner
> ------------------------------------------
>
>                 Key: FLINK-35384
>                 URL: https://issues.apache.org/jira/browse/FLINK-35384
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>    Affects Versions: 1.9.4
>            Reporter: Steven Zhen Wu
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
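
The first proposal (a no-op default `setup` method on `Partitioner`) can be illustrated with a small self-contained sketch. It is only a sketch under stated assumptions: `Partitioner` below is a local stand-in with the same shape as the proposed interface, not Flink's class, and `MetricsStandIn`, `RangePartitioner`, and the counter name `outOfRangeKeys` are hypothetical. Nothing here uses the real `TaskIOMetricGroup` API.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metrics group; NOT Flink's TaskIOMetricGroup.
interface MetricsStandIn {
    AtomicLong counter(String name);
}

// Local stand-in mirroring the proposed shape: partition(...) remains the only
// abstract method, so the interface is still a valid functional interface.
@FunctionalInterface
interface Partitioner<K> extends Serializable {
    default void setup(MetricsStandIn metrics) {}

    int partition(K key, int numPartitions);
}

// Hypothetical range partitioner that counts keys above the last boundary.
final class RangePartitioner implements Partitioner<Integer> {
    private final int[] upperBounds; // sorted, inclusive upper bound per range
    private transient AtomicLong outOfRange;

    RangePartitioner(int... upperBounds) {
        this.upperBounds = upperBounds.clone();
    }

    @Override
    public void setup(MetricsStandIn metrics) {
        outOfRange = metrics.counter("outOfRangeKeys");
    }

    @Override
    public int partition(Integer key, int numPartitions) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (key <= upperBounds[i]) {
                return Math.min(i, numPartitions - 1);
            }
        }
        // Key is above every boundary: count it and send it to the last partition.
        if (outOfRange != null) {
            outOfRange.incrementAndGet();
        }
        return numPartitions - 1;
    }
}

final class PartitionerSetupSketch {
    public static void main(String[] args) {
        Map<String, AtomicLong> registry = new HashMap<>();
        MetricsStandIn metrics = name -> registry.computeIfAbsent(name, n -> new AtomicLong());

        RangePartitioner ranges = new RangePartitioner(10, 20);
        ranges.setup(metrics);
        System.out.println(ranges.partition(5, 3));  // first range
        System.out.println(ranges.partition(99, 3)); // above all bounds
        System.out.println(registry.get("outOfRangeKeys").get());

        // An existing lambda-based partitioner still compiles and inherits the no-op setup.
        Partitioner<String> legacy = (key, parts) -> Math.floorMod(key.hashCode(), parts);
        legacy.setup(metrics);
        System.out.println(legacy.partition("a", 4));
    }
}
```

The lambda at the end is the point of the default method: implementations written against the current single-method interface keep working, and only partitioners that want metrics override `setup`.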