Aljoscha Krettek created FLINK-7552: ---------------------------------------
Summary: Extend SinkFunction interface with SinkContext Key: FLINK-7552 URL: https://issues.apache.org/jira/browse/FLINK-7552 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 1.4.0 Now that we require Java 8 we can extend the {{SinkFunction}} interface without breaking backwards compatibility. I'm proposing this: {code} /** * Interface for implementing user defined sink functionality. * * @param <IN> Input type parameter. */ @Public public interface SinkFunction<IN> extends Function, Serializable { /** * Function for standard sink behaviour. This function is called for every record. * * @param value The input record. * @throws Exception * @deprecated Use {@link #invoke(SinkContext, Object)}. */ @Deprecated default void invoke(IN value) throws Exception { } /** * Writes the given value to the sink. This function is called for every record. * * @param context Additional context about the input record. * @param value The input record. * @throws Exception */ default void invoke(SinkContext context, IN value) throws Exception { invoke(value); } /** * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about * an input record. * * @param <T> The type of elements accepted by the sink. */ @Public // Interface might be extended in the future with additional methods. interface SinkContext<T> { /** * Returns the timestamp of the current input record. */ long timestamp(); } } {code} For now, this only allows access to the element timestamp. This would allow us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to timestamps. -- This message was sent by Atlassian JIRA (v6.4.14#64029)