This is an automated email from the ASF dual-hosted git repository. yuanmei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b00d158 [FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSinkFunction after finish (#16768) b00d158 is described below commit b00d158e80ee9b5cb2287ed5b1b4d1baab3dc740 Author: Yuan Mei <yuanmei.w...@gmail.com> AuthorDate: Fri Aug 13 23:57:28 2021 +0800 [FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSinkFunction after finish (#16768) --- .../functions/sink/TwoPhaseCommitSinkFunction.java | 49 ++++++++++++++++++---- .../sink/TwoPhaseCommitSinkFunctionTest.java | 33 +++++++++++++++ 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 1c06b0d..32f3904 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -90,6 +90,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor; + /** + * Current Transaction Holder, including three states: 1. Normal Transaction: created when a new + * snapshot is taken during normal task running 2. Empty Transaction: created when a new + * snapshot is taken after the task is finished. At this point, there is no need to initiate + * real transactions due to no more input data. 3. null: After task/function is closed. + */ private TransactionHolder<TXN> currentTransactionHolder; /** Specifies the maximum time a transaction should remain open. */ @@ -107,6 +113,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS */ private double transactionTimeoutWarningRatio = -1; + /** Whether this sink function as well as its task is finished. */ + private boolean finished = false; + /** * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities * for using this constructor are {@link TypeInformation#of(Class)}, {@link @@ -230,11 +239,16 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS @Override public final void invoke(IN value, Context context) throws Exception { - invoke(currentTransactionHolder.handle, value, context); + TXN currentTransaction = currentTransaction(); + checkNotNull( + currentTransaction, + "Two phase commit sink function with null transaction should not be invoked! "); + invoke(currentTransaction, value, context); } @Override public final void finish() throws Exception { + finished = true; finishProcessing(currentTransaction()); } @@ -332,11 +346,18 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS context.getCheckpointId(), currentTransactionHolder); - preCommit(currentTransactionHolder.handle); - pendingCommitTransactions.put(checkpointId, currentTransactionHolder); - LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions); + if (!currentTransactionHolder.equals(TransactionHolder.empty())) { + preCommit(currentTransactionHolder.handle); + pendingCommitTransactions.put(checkpointId, currentTransactionHolder); + LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions); + } - currentTransactionHolder = beginTransactionInternal(); + // no need to start new transactions after sink function is closed (no more input data) + if (!finished) { + currentTransactionHolder = beginTransactionInternal(); + } else { + currentTransactionHolder = TransactionHolder.empty(); + } LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder); state.clear(); @@ -383,6 +404,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS { TXN transaction = operatorState.getPendingTransaction().handle; + + checkNotNull(transaction, "Pending transaction is not expected to be null"); + recoverAndAbort(transaction); handledTransactions.add(transaction); LOG.info( @@ -460,10 +484,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS public void close() throws Exception { super.close(); - if (currentTransactionHolder != null) { - abort(currentTransactionHolder.handle); - currentTransactionHolder = null; + TXN currentTransaction = currentTransaction(); + if (currentTransaction != null) { + abort(currentTransaction); } + + currentTransactionHolder = null; } /** @@ -607,6 +633,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS private final TXN handle; + private static final TransactionHolder<?> EMPTY = new TransactionHolder<>(null, -1); + /** * The system time when {@link #handle} was created. Used to determine if the current * transaction has exceeded its timeout specified by {@link #transactionTimeout}. @@ -623,6 +651,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS return clock.millis() - transactionStartTime; } + @SuppressWarnings("unchecked") + public static <TXN> TransactionHolder<TXN> empty() { + return (TransactionHolder<TXN>) EMPTY; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 898c08b..4dd6cb6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -117,6 +117,39 @@ public class TwoPhaseCommitSinkFunctionTest { } @Test + public void testNoTransactionAfterSinkFunctionFinish() throws Exception { + harness.open(); + harness.processElement("42", 0); + harness.snapshot(0, 1); + harness.processElement("43", 2); + harness.snapshot(1, 3); + harness.processElement("44", 4); + + // do not expect new input after finish() + sinkFunction.finish(); + + harness.snapshot(2, 5); + harness.notifyOfCompletedCheckpoint(1); + + // make sure the previous empty transaction will not be pre-committed + harness.snapshot(3, 6); + + try { + harness.processElement("45", 7); + fail( + "TwoPhaseCommitSinkFunctionTest should not process any more input data after finish!"); + } catch (NullPointerException e) { + // expected and do nothing here + } + + // Checkpoint2 has not complete + assertExactlyOnce(Arrays.asList("42", "43")); + + // transaction for checkpoint2 + assertEquals(1, tmpDirectory.listFiles().size()); + } + + @Test public void testNotifyOfCompletedCheckpoint() throws Exception { harness.open(); harness.processElement("42", 0);