[FLINK-6988] Make TwoPhaseCommitSinkFunction work with Context
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49cef0c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49cef0c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49cef0c0 Branch: refs/heads/master Commit: 49cef0c0c24c15b668381ca590b87a62a14f75b5 Parents: 9a3621b Author: Aljoscha Krettek <[email protected]> Authored: Mon Sep 25 16:16:34 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Oct 9 18:58:36 2017 +0200 ---------------------------------------------------------------------- .../functions/sink/TwoPhaseCommitSinkFunction.java | 14 +++++++++++--- .../sink/TwoPhaseCommitSinkFunctionTest.java | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/49cef0c0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 6040979..2dfa292 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -107,7 +107,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> /** * Write value within a transaction. */ - protected abstract void invoke(TXN transaction, IN value) throws Exception; + protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception; /** * Method that starts a new transaction. @@ -159,9 +159,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> // ------ entry points for above methods implementing {@CheckPointedFunction} and {@CheckpointListener} ------ + + /** + * This should not be implemented by subclasses. + */ + @Override + public final void invoke(IN value) throws Exception {} + @Override - public final void invoke(IN value) throws Exception { - invoke(currentTransaction, value); + public final void invoke( + IN value, Context context) throws Exception { + invoke(currentTransaction, value, context); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/49cef0c0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 4715c39..3043512 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -136,7 +136,7 @@ public class TwoPhaseCommitSinkFunctionTest { } @Override - protected void invoke(FileTransaction transaction, String value) throws Exception { + protected void invoke(FileTransaction transaction, String value, Context context) throws Exception { transaction.writer.write(value); }
