This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b00d158  [FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSinkFunction after finish (#16768)
b00d158 is described below

commit b00d158e80ee9b5cb2287ed5b1b4d1baab3dc740
Author: Yuan Mei <yuanmei.w...@gmail.com>
AuthorDate: Fri Aug 13 23:57:28 2021 +0800

    [FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSinkFunction after finish (#16768)
---
 .../functions/sink/TwoPhaseCommitSinkFunction.java | 49 ++++++++++++++++++----
 .../sink/TwoPhaseCommitSinkFunctionTest.java       | 33 +++++++++++++++
 2 files changed, 74 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 1c06b0d..32f3904 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -90,6 +90,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
 
     private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
 
+    /**
+     * Current Transaction Holder, including three states: 1. Normal Transaction: created when a new
+     * snapshot is taken during normal task running 2. Empty Transaction: created when a new
+     * snapshot is taken after the task is finished. At this point, there is no need to initiate
+     * real transactions due to no more input data. 3. null: After task/function is closed.
+     */
     private TransactionHolder<TXN> currentTransactionHolder;
 
     /** Specifies the maximum time a transaction should remain open. */
@@ -107,6 +113,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
      */
     private double transactionTimeoutWarningRatio = -1;
 
+    /** Whether this sink function as well as its task is finished. */
+    private boolean finished = false;
+
     /**
      * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities
      * for using this constructor are {@link TypeInformation#of(Class)}, {@link
@@ -230,11 +239,16 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
 
     @Override
     public final void invoke(IN value, Context context) throws Exception {
-        invoke(currentTransactionHolder.handle, value, context);
+        TXN currentTransaction = currentTransaction();
+        checkNotNull(
+                currentTransaction,
+                "Two phase commit sink function with null transaction should not be invoked! ");
+        invoke(currentTransaction, value, context);
     }
 
     @Override
     public final void finish() throws Exception {
+        finished = true;
         finishProcessing(currentTransaction());
     }
 
@@ -332,11 +346,18 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
                 context.getCheckpointId(),
                 currentTransactionHolder);
 
-        preCommit(currentTransactionHolder.handle);
-        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
-        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
+        if (!currentTransactionHolder.equals(TransactionHolder.empty())) {
+            preCommit(currentTransactionHolder.handle);
+            pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
+            LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
+        }
 
-        currentTransactionHolder = beginTransactionInternal();
+        // no need to start new transactions after sink function is closed (no more input data)
+        if (!finished) {
+            currentTransactionHolder = beginTransactionInternal();
+        } else {
+            currentTransactionHolder = TransactionHolder.empty();
+        }
         LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
 
         state.clear();
@@ -383,6 +404,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
 
                 {
                     TXN transaction = operatorState.getPendingTransaction().handle;
+
+                    checkNotNull(transaction, "Pending transaction is not expected to be null");
+
                     recoverAndAbort(transaction);
                     handledTransactions.add(transaction);
                     LOG.info(
@@ -460,10 +484,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
     public void close() throws Exception {
         super.close();
 
-        if (currentTransactionHolder != null) {
-            abort(currentTransactionHolder.handle);
-            currentTransactionHolder = null;
+        TXN currentTransaction = currentTransaction();
+        if (currentTransaction != null) {
+            abort(currentTransaction);
         }
+
+        currentTransactionHolder = null;
     }
 
     /**
@@ -607,6 +633,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
 
         private final TXN handle;
 
+        private static final TransactionHolder<?> EMPTY = new TransactionHolder<>(null, -1);
+
         /**
          * The system time when {@link #handle} was created. Used to determine if the current
          * transaction has exceeded its timeout specified by {@link #transactionTimeout}.
@@ -623,6 +651,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
             return clock.millis() - transactionStartTime;
         }
 
+        @SuppressWarnings("unchecked")
+        public static <TXN> TransactionHolder<TXN> empty() {
+            return (TransactionHolder<TXN>) EMPTY;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 898c08b..4dd6cb6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -117,6 +117,39 @@ public class TwoPhaseCommitSinkFunctionTest {
     }
 
     @Test
+    public void testNoTransactionAfterSinkFunctionFinish() throws Exception {
+        harness.open();
+        harness.processElement("42", 0);
+        harness.snapshot(0, 1);
+        harness.processElement("43", 2);
+        harness.snapshot(1, 3);
+        harness.processElement("44", 4);
+
+        // do not expect new input after finish()
+        sinkFunction.finish();
+
+        harness.snapshot(2, 5);
+        harness.notifyOfCompletedCheckpoint(1);
+
+        // make sure the previous empty transaction will not be pre-committed
+        harness.snapshot(3, 6);
+
+        try {
+            harness.processElement("45", 7);
+            fail(
+                    "TwoPhaseCommitSinkFunctionTest should not process any more input data after finish!");
+        } catch (NullPointerException e) {
+            // expected and do nothing here
+        }
+
+        // Checkpoint2 has not complete
+        assertExactlyOnce(Arrays.asList("42", "43"));
+
+        // transaction for checkpoint2
+        assertEquals(1, tmpDirectory.listFiles().size());
+    }
+
+    @Test
     public void testNotifyOfCompletedCheckpoint() throws Exception {
         harness.open();
         harness.processElement("42", 0);

Reply via email to