Aljoscha Krettek created FLINK-7552:
---------------------------------------

             Summary: Extend SinkFunction interface with SinkContext
                 Key: FLINK-7552
                 URL: https://issues.apache.org/jira/browse/FLINK-7552
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
            Reporter: Aljoscha Krettek
            Assignee: Aljoscha Krettek
             Fix For: 1.4.0


Now that we require Java 8 we can extend the {{SinkFunction}} interface without 
breaking backwards compatibility. I'm proposing this:

{code}
/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

        /**
         * Function for standard sink behaviour. This function is called for 
every record.
         *
         * @param value The input record.
         * @throws Exception
         * @deprecated Use {@link #invoke(SinkContext, Object)}.
         */
        @Deprecated
        default void invoke(IN value) throws Exception {
        }

        /**
         * Writes the given value to the sink. This function is called for 
every record.
         *
         * @param context Additional context about the input record.
         * @param value The input record.
         * @throws Exception
         */
        default void invoke(SinkContext context, IN value) throws Exception {
                invoke(value);
        }

        /**
         * Context that {@link SinkFunction SinkFunctions } can use for getting 
additional data about
         * an input record.
         *
         * @param <T> The type of elements accepted by the sink.
         */
        @Public // Interface might be extended in the future with additional 
methods.
        interface SinkContext<T> {

                /**
                 * Returns the timestamp of the current input record.
                 */
                long timestamp();
        }
}
{code}

For now, this only allows access to the element timestamp. This would allow us 
to fix the abomination that is {{FlinkKafkaProducer010}}, which is a hybrid 
{{SinkFunction}}/{{StreamOperator}} only because it needs access to timestamps.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to