[ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847116#comment-17847116
 ] 

Steven Zhen Wu commented on FLINK-35384:
----------------------------------------

One potential risk of this type of API is that it is not extensible. If we 
want to pass another argument to the partitioner later, we need to either 
break compatibility or add a new method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}
{code}
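
To make the context idea concrete, here is a minimal, self-contained sketch of how a custom range partitioner might use it. This is only an illustration under assumptions, not Flink code: `MetricsStandIn` is a hypothetical stand-in for `TaskIOMetricGroup`, the nested `Partitioner` is a local copy of the proposed shape, and `RangePartitioner`, the counter name `keysAboveLastBoundary` and `demo()` are made up for the example.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class ContextPartitionerSketch {

    /** Hypothetical stand-in for TaskIOMetricGroup: only named counters are modeled. */
    interface MetricsStandIn {
        AtomicLong counter(String name);
    }

    /** Local copy of the context-style partitioner shape proposed above (not the Flink type). */
    @FunctionalInterface
    interface Partitioner<K> extends Serializable {
        int partition(K key, int numPartitions);

        // Default no-op keeps existing lambda-based partitioners source compatible.
        default void init(Context context) {}

        // New accessors can be added here later without changing init's signature.
        interface Context {
            int numberOfChannels();
            MetricsStandIn metrics();
        }
    }

    /** Hypothetical range partitioner that counts keys at or above the last boundary. */
    static final class RangePartitioner implements Partitioner<Integer> {
        private final int[] boundaries;         // sorted, exclusive upper bounds
        private transient AtomicLong overflow;  // assigned in init, not serialized

        RangePartitioner(int... boundaries) {
            this.boundaries = boundaries.clone();
        }

        @Override
        public void init(Context context) {
            overflow = context.metrics().counter("keysAboveLastBoundary");
        }

        @Override
        public int partition(Integer key, int numPartitions) {
            for (int i = 0; i < boundaries.length; i++) {
                if (key < boundaries[i]) {
                    return Math.min(i, numPartitions - 1);
                }
            }
            // Key is beyond every boundary: count it if init was called.
            if (overflow != null) {
                overflow.incrementAndGet();
            }
            return Math.min(boundaries.length, numPartitions - 1);
        }
    }

    /** Wires the pieces together the way a runtime wrapper might; returns a summary string. */
    static String demo() {
        Map<String, AtomicLong> counters = new HashMap<>();
        MetricsStandIn metrics = name -> counters.computeIfAbsent(name, n -> new AtomicLong());
        RangePartitioner partitioner = new RangePartitioner(10, 20);
        partitioner.init(new Partitioner.Context() {
            @Override public int numberOfChannels() { return 3; }
            @Override public MetricsStandIn metrics() { return metrics; }
        });
        StringBuilder out = new StringBuilder();
        for (int key : new int[] {5, 15, 25, 99}) {
            out.append(partitioner.partition(key, 3)).append(',');
        }
        return out + "overflow=" + counters.get("keysAboveLastBoundary").get();
    }
}
```

The point of the sketch is that a later addition to `Context` (for example another accessor) would not change the `init` signature, so existing implementations would keep compiling.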



> Expose TaskIOMetricGroup to custom Partitioner
> ----------------------------------------------
>
>                 Key: FLINK-35384
>                 URL: https://issues.apache.org/jira/browse/FLINK-35384
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>    Affects Versions: 1.9.4
>            Reporter: Steven Zhen Wu
>            Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink and want to publish some counter metrics for certain scenarios, similar 
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`:
> {code}
>     public <K> DataStream<T> partitionCustom(
>             Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(TaskIOMetricGroup metrics) {}
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a change to a public interface requires a FLIP process. I will do that 
> if the community agrees with this feature request.
> Personally, I think `numPartitions` should be passed in the `setup` method 
> too, but that is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we 
> would need to modify to pass in the metrics group.
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
