Repository: flink
Updated Branches:
  refs/heads/master 7fb7e0b97 -> 250adfd2b


[FLINK-7784] [kafka011-producer] Make TwoPhaseCommitSinkFunction aware of 
transaction timeouts.

TwoPhaseCommitSinkFunction allows configuring a transaction timeout. The
timeout can be used to log warnings if the transaction's age is approaching
the timeout, and to swallow exceptions thrown while committing recovered
transactions that have likely already timed out and are therefore
irrecoverable. This commit also integrates these changes into the
FlinkKafkaProducer011.

This closes #4910.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cdf2ff7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cdf2ff7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cdf2ff7

Branch: refs/heads/master
Commit: 8cdf2ff7e5817acc0c239ce31c098daf33d326b7
Parents: 7fb7e0b
Author: gyao <[email protected]>
Authored: Thu Oct 26 19:17:55 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 2 12:32:14 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java |  49 ++-
 ...linkKafkaProducer011StateSerializerTest.java |  20 +-
 .../sink/TwoPhaseCommitSinkFunction.java        | 314 +++++++++++++++----
 .../functions/sink/TransactionHolderTest.java   |  43 +++
 .../sink/TwoPhaseCommitSinkFunctionTest.java    | 283 +++++++++++++----
 5 files changed, 589 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 5f557d2..e83966c 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
@@ -49,6 +50,7 @@ import org.apache.flink.util.SerializableObject;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -91,6 +93,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * will use {@link Semantic#AT_LEAST_ONCE} semantic. Before using {@link 
Semantic#EXACTLY_ONCE} please refer to Flink's
  * Kafka connector documentation.
  */
+@PublicEvolving
 public class FlinkKafkaProducer011<IN>
                extends TwoPhaseCommitSinkFunction<IN, 
FlinkKafkaProducer011.KafkaTransactionState, 
FlinkKafkaProducer011.KafkaTransactionContext> {
 
@@ -446,13 +449,31 @@ public class FlinkKafkaProducer011<IN>
                        throw new 
IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be 
supplied in the producer config properties.");
                }
 
-               if 
(!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+               if 
(!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
                        long timeout = 
DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
                        checkState(timeout < Integer.MAX_VALUE && timeout > 0, 
"timeout does not fit into 32 bit integer");
                        
this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) 
timeout);
                        LOG.warn("Property [%s] not specified. Setting it to 
%s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
                }
 
+               // Enable transactionTimeoutWarnings to avoid silent data loss
+               // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
+               // The KafkaProducer may not throw an exception if the 
transaction failed to commit
+               if (semantic == Semantic.EXACTLY_ONCE) {
+                       final Object object = 
this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+                       final long transactionTimeout;
+                       if (object instanceof String && 
StringUtils.isNumeric((String) object)) {
+                               transactionTimeout = Long.parseLong((String) 
object);
+                       } else if (object instanceof Number) {
+                               transactionTimeout = ((Number) 
object).longValue();
+                       } else {
+                               throw new 
IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+                                       + " must be numeric, was " + object);
+                       }
+                       super.setTransactionTimeout(transactionTimeout);
+                       super.enableTransactionTimeoutWarnings(0.8);
+               }
+
                this.topicPartitionsMap = new HashMap<>();
        }
 
@@ -480,6 +501,22 @@ public class FlinkKafkaProducer011<IN>
                this.logFailuresOnly = logFailuresOnly;
        }
 
+       /**
+        * Disables the propagation of exceptions thrown when committing 
presumably timed out Kafka
+        * transactions during recovery of the job. If a Kafka transaction is 
timed out, a commit will
+        * never be successful. Hence, use this feature to avoid recovery loops 
of the Job. Exceptions
+        * will still be logged to inform the user that data loss might have 
occurred.
+        *
+        * <p>Note that we use {@link System#currentTimeMillis()} to track the 
age of a transaction.
+        * Moreover, only exceptions thrown during the recovery are caught, 
i.e., the producer will
+        * attempt at least one commit of the transaction before giving up.</p>
+        */
+       @Override
+       public FlinkKafkaProducer011<IN> 
ignoreFailuresAfterTransactionTimeout() {
+               super.ignoreFailuresAfterTransactionTimeout();
+               return this;
+       }
+
        // ----------------------------------- Utilities 
--------------------------
 
        /**
@@ -556,6 +593,7 @@ public class FlinkKafkaProducer011<IN>
 
        @Override
        public void close() throws Exception {
+               final KafkaTransactionState currentTransaction = 
currentTransaction();
                if (currentTransaction != null) {
                        // to avoid exceptions on aborting transactions with 
some pending records
                        flush(currentTransaction);
@@ -588,6 +626,7 @@ public class FlinkKafkaProducer011<IN>
                        case AT_LEAST_ONCE:
                        case NONE:
                                // Do not create new producer on each 
beginTransaction() if it is not necessary
+                               final KafkaTransactionState currentTransaction 
= currentTransaction();
                                if (currentTransaction != null && 
currentTransaction.producer != null) {
                                        return new 
KafkaTransactionState(currentTransaction.producer);
                                }
@@ -603,7 +642,7 @@ public class FlinkKafkaProducer011<IN>
                        String transactionalId = 
availableTransactionalIds.poll();
                        if (transactionalId == null) {
                                throw new Exception(
-                                       "Too many ongoing snapshots. Increase 
kafka producers pool size or decrease number of concurrent checktpoins.");
+                                       "Too many ongoing snapshots. Increase 
kafka producers pool size or decrease number of concurrent checkpoints.");
                        }
                        producer = initTransactionalProducer(transactionalId, 
true);
                        producer.initTransactions();
@@ -645,10 +684,9 @@ public class FlinkKafkaProducer011<IN>
        protected void recoverAndCommit(KafkaTransactionState transaction) {
                switch (semantic) {
                        case EXACTLY_ONCE:
-                               KafkaTransactionState kafkaTransaction = 
transaction;
                                FlinkKafkaProducer<byte[], byte[]> producer =
-                                       
initTransactionalProducer(kafkaTransaction.transactionalId, false);
-                               
producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
+                                       
initTransactionalProducer(transaction.transactionalId, false);
+                               
producer.resumeTransaction(transaction.producerId, transaction.epoch);
                                try {
                                        producer.commitTransaction();
                                        producer.close();
@@ -857,6 +895,7 @@ public class FlinkKafkaProducer011<IN>
        }
 
        int getTransactionCoordinatorId() {
+               final KafkaTransactionState currentTransaction = 
currentTransaction();
                if (currentTransaction == null || currentTransaction.producer 
== null) {
                        throw new IllegalArgumentException();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java
index c6a873b..162a35c 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder;
 
 import java.util.Collections;
 import java.util.Optional;
@@ -59,42 +60,43 @@ public class FlinkKafkaProducer011StateSerializerTest
        protected TwoPhaseCommitSinkFunction.State<
                FlinkKafkaProducer011.KafkaTransactionState,
                FlinkKafkaProducer011.KafkaTransactionContext>[] getTestData() {
+               //noinspection unchecked
                return new TwoPhaseCommitSinkFunction.State[] {
                        new TwoPhaseCommitSinkFunction.State<
                                FlinkKafkaProducer011.KafkaTransactionState,
                                FlinkKafkaProducer011.KafkaTransactionContext>(
-                                       new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
+                                       new TransactionHolder(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
                                        Collections.emptyList(),
                                        Optional.empty()),
                        new TwoPhaseCommitSinkFunction.State<
                                FlinkKafkaProducer011.KafkaTransactionState,
                                FlinkKafkaProducer011.KafkaTransactionContext>(
-                               new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
-                               Collections.singletonList(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)),
+                               new TransactionHolder(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 
2711),
+                               Collections.singletonList(new 
TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, 
(short) 42, null), 42)),
                                Optional.empty()),
                        new TwoPhaseCommitSinkFunction.State<
                                FlinkKafkaProducer011.KafkaTransactionState,
                                FlinkKafkaProducer011.KafkaTransactionContext>(
-                               new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
+                               new TransactionHolder(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
                                Collections.emptyList(),
                                Optional.of(new 
FlinkKafkaProducer011.KafkaTransactionContext(Collections.emptySet()))),
                        new TwoPhaseCommitSinkFunction.State<
                                FlinkKafkaProducer011.KafkaTransactionState,
                                FlinkKafkaProducer011.KafkaTransactionContext>(
-                               new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
+                               new TransactionHolder(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
                                Collections.emptyList(),
                                Optional.of(new 
FlinkKafkaProducer011.KafkaTransactionContext(Collections.singleton("hello")))),
                        new TwoPhaseCommitSinkFunction.State<
                                FlinkKafkaProducer011.KafkaTransactionState,
                                FlinkKafkaProducer011.KafkaTransactionContext>(
-                               new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
-                               Collections.singletonList(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)),
+                               new TransactionHolder(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                               Collections.singletonList(new 
TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, 
(short) 42, null), 0)),
                                Optional.of(new 
FlinkKafkaProducer011.KafkaTransactionContext(Collections.emptySet()))),
                        new TwoPhaseCommitSinkFunction.State<
                                FlinkKafkaProducer011.KafkaTransactionState,
                                FlinkKafkaProducer011.KafkaTransactionContext>(
-                               new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
-                               Collections.singletonList(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)),
+                               new TransactionHolder(new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                               Collections.singletonList(new 
TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, 
(short) 42, null), 0)),
                                Optional.of(new 
FlinkKafkaProducer011.KafkaTransactionContext(Collections.singleton("hello"))))
                };
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 952f298..2ffb6d5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -53,6 +54,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -74,16 +76,35 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
 
        private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-       protected final ListStateDescriptor<State<TXN, CONTEXT>> 
stateDescriptor;
+       protected final LinkedHashMap<Long, TransactionHolder<TXN>> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-       protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = 
new LinkedHashMap<>();
-
-       @Nullable
-       protected transient TXN currentTransaction;
        protected transient Optional<CONTEXT> userContext;
 
        protected transient ListState<State<TXN, CONTEXT>> state;
 
+       private final Clock clock;
+
+       private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
+
+       private TransactionHolder<TXN> currentTransactionHolder;
+
+       /**
+        * Specifies the maximum time a transaction should remain open.
+        */
+       private long transactionTimeout = Long.MAX_VALUE;
+
+       /**
+        * If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+        * propagated.
+        */
+       private boolean ignoreFailuresAfterTransactionTimeout;
+
+       /**
+        * If a transaction's elapsed time reaches this percentage of the 
transactionTimeout, a warning
+        * message will be logged. Value must be in range [0,1]. Negative value 
disables warnings.
+        */
+       private double transactionTimeoutWarningRatio = -1;
+
        /**
         * Use default {@link ListStateDescriptor} for internal state 
serialization. Helpful utilities for using this
         * constructor are {@link TypeInformation#of(Class)}, {@link 
org.apache.flink.api.common.typeinfo.TypeHint} and
@@ -100,11 +121,19 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        public TwoPhaseCommitSinkFunction(
                        TypeSerializer<TXN> transactionSerializer,
                        TypeSerializer<CONTEXT> contextSerializer) {
+               this(transactionSerializer, contextSerializer, 
Clock.systemUTC());
+       }
 
+       @VisibleForTesting
+       TwoPhaseCommitSinkFunction(
+               TypeSerializer<TXN> transactionSerializer,
+               TypeSerializer<CONTEXT> contextSerializer,
+               Clock clock) {
                this.stateDescriptor =
                        new ListStateDescriptor<>(
-                                       "state",
-                                       new 
StateSerializer<>(transactionSerializer, contextSerializer));
+                               "state",
+                               new StateSerializer<>(transactionSerializer, 
contextSerializer));
+               this.clock = clock;
        }
 
        protected Optional<CONTEXT> initializeUserContext() {
@@ -115,6 +144,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                return userContext;
        }
 
+       @Nullable
+       protected TXN currentTransaction() {
+               return currentTransactionHolder == null ? null : 
currentTransactionHolder.handle;
+       }
+
        // ------ methods that should be implemented in child class to support 
two phase commit algorithm ------
 
        /**
@@ -149,7 +183,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        /**
         * Invoked on recovered transactions after a failure. User 
implementation must ensure that this call will eventually
         * succeed. If it fails, Flink application will be restarted and it 
will be invoked again. If it does not succeed
-        * a data loss will occur. Transactions will be recovered in an order 
in which they were created.
+        * eventually, a data loss will occur. Transactions will be recovered 
in an order in which they were created.
         */
        protected void recoverAndCommit(TXN transaction) {
                commit(transaction);
@@ -182,7 +216,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        @Override
        public final void invoke(
                IN value, Context context) throws Exception {
-               invoke(currentTransaction, value, context);
+               invoke(currentTransactionHolder.handle, value, context);
        }
 
        @Override
@@ -220,13 +254,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                // ==> There should never be a case where we have no pending 
transaction here
                //
 
-               Iterator<Map.Entry<Long, TXN>> pendingTransactionIterator = 
pendingCommitTransactions.entrySet().iterator();
+               Iterator<Map.Entry<Long, TransactionHolder<TXN>>> 
pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
                checkState(pendingTransactionIterator.hasNext(), "checkpoint 
completed, but no transaction pending");
 
                while (pendingTransactionIterator.hasNext()) {
-                       Map.Entry<Long, TXN> entry = 
pendingTransactionIterator.next();
+                       Map.Entry<Long, TransactionHolder<TXN>> entry = 
pendingTransactionIterator.next();
                        Long pendingTransactionCheckpointId = entry.getKey();
-                       TXN pendingTransaction = entry.getValue();
+                       TransactionHolder<TXN> pendingTransaction = 
entry.getValue();
                        if (pendingTransactionCheckpointId > checkpointId) {
                                continue;
                        }
@@ -234,7 +268,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                        LOG.info("{} - checkpoint {} complete, committing 
transaction {} from checkpoint {}",
                                name(), checkpointId, pendingTransaction, 
pendingTransactionCheckpointId);
 
-                       commit(pendingTransaction);
+                       logWarningIfTimeoutAlmostReached(pendingTransaction);
+                       commit(pendingTransaction.handle);
 
                        LOG.debug("{} - committed checkpoint transaction {}", 
name(), pendingTransaction);
 
@@ -247,21 +282,21 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                // this is like the pre-commit of a 2-phase-commit transaction
                // we are ready to commit and remember the transaction
 
-               checkState(currentTransaction != null, "bug: no transaction 
object when performing state snapshot");
+               checkState(currentTransactionHolder != null, "bug: no 
transaction object when performing state snapshot");
 
                long checkpointId = context.getCheckpointId();
-               LOG.debug("{} - checkpoint {} triggered, flushing transaction 
'{}'", name(), context.getCheckpointId(), currentTransaction);
+               LOG.debug("{} - checkpoint {} triggered, flushing transaction 
'{}'", name(), context.getCheckpointId(), currentTransactionHolder);
 
-               preCommit(currentTransaction);
-               pendingCommitTransactions.put(checkpointId, currentTransaction);
+               preCommit(currentTransactionHolder.handle);
+               pendingCommitTransactions.put(checkpointId, 
currentTransactionHolder);
                LOG.debug("{} - stored pending transactions {}", name(), 
pendingCommitTransactions);
 
-               currentTransaction = beginTransaction();
-               LOG.debug("{} - started new transaction '{}'", name(), 
currentTransaction);
+               currentTransactionHolder = beginTransactionInternal();
+               LOG.debug("{} - started new transaction '{}'", name(), 
currentTransactionHolder);
 
                state.clear();
                state.add(new State<>(
-                       this.currentTransaction,
+                       this.currentTransactionHolder,
                        new ArrayList<>(pendingCommitTransactions.values()),
                        userContext));
        }
@@ -289,14 +324,14 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
 
                        for (State<TXN, CONTEXT> operatorState : state.get()) {
                                userContext = operatorState.getContext();
-                               List<TXN> recoveredTransactions = 
operatorState.getPendingCommitTransactions();
-                               for (TXN recoveredTransaction : 
recoveredTransactions) {
-                                       // If this fails, there is actually a 
data loss
-                                       recoverAndCommit(recoveredTransaction);
+                               List<TransactionHolder<TXN>> 
recoveredTransactions = operatorState.getPendingCommitTransactions();
+                               for (TransactionHolder<TXN> 
recoveredTransaction : recoveredTransactions) {
+                                       // If this fails to succeed eventually, 
there is actually data loss
+                                       
recoverAndCommitInternal(recoveredTransaction);
                                        LOG.info("{} committed recovered 
transaction {}", name(), recoveredTransaction);
                                }
 
-                               
recoverAndAbort(operatorState.getPendingTransaction());
+                               
recoverAndAbort(operatorState.getPendingTransaction().handle);
                                LOG.info("{} aborted recovered transaction {}", 
name(), operatorState.getPendingTransaction());
 
                                if (userContext.isPresent()) {
@@ -312,20 +347,104 @@ public abstract class TwoPhaseCommitSinkFunction<IN, 
TXN, CONTEXT>
                }
                this.pendingCommitTransactions.clear();
 
-               currentTransaction = beginTransaction();
-               LOG.debug("{} - started new transaction '{}'", name(), 
currentTransaction);
+               currentTransactionHolder = beginTransactionInternal();
+               LOG.debug("{} - started new transaction '{}'", name(), 
currentTransactionHolder);
+       }
+
+       /**
+        * This method must be the only place to call {@link 
#beginTransaction()} to ensure that the
+        * {@link TransactionHolder} is created at the same time.
+        */
+       private TransactionHolder<TXN> beginTransactionInternal() throws 
Exception {
+               return new TransactionHolder<>(beginTransaction(), 
clock.millis());
+       }
+
+       /**
+        * This method must be the only place to call {@link 
#recoverAndCommit(Object)} to ensure that
+        * the configuration parameters {@link #transactionTimeout} and
+        * {@link #ignoreFailuresAfterTransactionTimeout} are respected.
+        */
+       private void recoverAndCommitInternal(TransactionHolder<TXN> 
transactionHolder) {
+               try {
+                       logWarningIfTimeoutAlmostReached(transactionHolder);
+                       recoverAndCommit(transactionHolder.handle);
+               } catch (final Exception e) {
+                       final long elapsedTime = clock.millis() - 
transactionHolder.transactionStartTime;
+                       if (ignoreFailuresAfterTransactionTimeout && 
elapsedTime > transactionTimeout) {
+                               LOG.error("Error while committing transaction 
{}. " +
+                                               "Transaction has been open for 
longer than the transaction timeout ({})." +
+                                               "Commit will not be attempted 
again. Data loss might have occurred.",
+                                       transactionHolder.handle, 
transactionTimeout, e);
+                       } else {
+                               throw e;
+                       }
+               }
+       }
+
+       private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> 
transactionHolder) {
+               final long elapsedTime = transactionHolder.elapsedTime(clock);
+               if (transactionTimeoutWarningRatio >= 0 &&
+                       elapsedTime > transactionTimeout * 
transactionTimeoutWarningRatio) {
+                       LOG.warn("Transaction {} has been open for {} ms. " +
+                                       "This is close to or even exceeding the 
transaction timeout of {} ms.",
+                               transactionHolder.handle,
+                               elapsedTime,
+                               transactionTimeout);
+               }
        }
 
        @Override
        public void close() throws Exception {
                super.close();
 
-               if (currentTransaction != null) {
-                       abort(currentTransaction);
-                       currentTransaction = null;
+               if (currentTransactionHolder != null) {
+                       abort(currentTransactionHolder.handle);
+                       currentTransactionHolder = null;
                }
        }
 
+       /**
+        * Sets the transaction timeout. Setting only the transaction timeout 
has no effect in itself.
+        *
+        * @param transactionTimeout The transaction timeout in ms.
+        * @see #ignoreFailuresAfterTransactionTimeout()
+        * @see #enableTransactionTimeoutWarnings(double)
+        */
+       protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> 
setTransactionTimeout(long transactionTimeout) {
+               checkArgument(transactionTimeout >= 0, "transactionTimeout must 
not be negative");
+               this.transactionTimeout = transactionTimeout;
+               return this;
+       }
+
+       /**
+        * If called, the sink will only log but not propagate exceptions 
thrown in
+        * {@link #recoverAndCommit(Object)} if the transaction is older than a 
specified transaction
+        * timeout. The start time of a transaction is determined by {@link 
System#currentTimeMillis()}.
+        * By default, failures are propagated.
+        *
+        */
+       protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> 
ignoreFailuresAfterTransactionTimeout() {
+               this.ignoreFailuresAfterTransactionTimeout = true;
+               return this;
+       }
+
+       /**
+        * Enables logging of warnings if a transaction's elapsed time reaches 
a specified ratio of
+        * the <code>transactionTimeout</code>.
+        * If <code>warningRatio</code> is 0, a warning will be always logged 
when committing the
+        * transaction.
+        *
+        * @param warningRatio A value in the range [0,1].
+        * @return
+        */
+       protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> 
enableTransactionTimeoutWarnings(
+               double warningRatio) {
+               checkArgument(warningRatio >= 0 && warningRatio <= 1,
+                       "warningRatio must be in range [0,1]");
+               this.transactionTimeoutWarningRatio = warningRatio;
+               return this;
+       }
+
        private String name() {
                return String.format(
                        "%s %s/%s",
@@ -340,32 +459,32 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        @VisibleForTesting
        @Internal
        public static final class State<TXN, CONTEXT> {
-               protected TXN pendingTransaction;
-               protected List<TXN> pendingCommitTransactions = new 
ArrayList<>();
+               protected TransactionHolder<TXN> pendingTransaction;
+               protected List<TransactionHolder<TXN>> 
pendingCommitTransactions = new ArrayList<>();
                protected Optional<CONTEXT> context;
 
                public State() {
                }
 
-               public State(TXN pendingTransaction, List<TXN> 
pendingCommitTransactions, Optional<CONTEXT> context) {
+               public State(TransactionHolder<TXN> pendingTransaction, 
List<TransactionHolder<TXN>> pendingCommitTransactions, Optional<CONTEXT> 
context) {
                        this.context = requireNonNull(context, "context is 
null");
                        this.pendingTransaction = 
requireNonNull(pendingTransaction, "pendingTransaction is null");
                        this.pendingCommitTransactions = 
requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
                }
 
-               public TXN getPendingTransaction() {
+               public TransactionHolder<TXN> getPendingTransaction() {
                        return pendingTransaction;
                }
 
-               public void setPendingTransaction(TXN pendingTransaction) {
+               public void setPendingTransaction(TransactionHolder<TXN> 
pendingTransaction) {
                        this.pendingTransaction = pendingTransaction;
                }
 
-               public List<TXN> getPendingCommitTransactions() {
+               public List<TransactionHolder<TXN>> 
getPendingCommitTransactions() {
                        return pendingCommitTransactions;
                }
 
-               public void setPendingCommitTransactions(List<TXN> 
pendingCommitTransactions) {
+               public void 
setPendingCommitTransactions(List<TransactionHolder<TXN>> 
pendingCommitTransactions) {
                        this.pendingCommitTransactions = 
pendingCommitTransactions;
                }
 
@@ -407,6 +526,65 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        }
 
        /**
+        * Wraps a transaction handle together with its metadata (currently only 
the start time of the transaction).
+        */
+       @VisibleForTesting
+       @Internal
+       public static final class TransactionHolder<TXN> {
+
+               private final TXN handle;
+
+               /**
+                * The system time when {@link #handle} was created.
+                * Used to determine if the current transaction has exceeded 
its timeout specified by
+                * {@link TwoPhaseCommitSinkFunction#setTransactionTimeout(long)}.
+                */
+               private final long transactionStartTime;
+
+               @VisibleForTesting
+               public TransactionHolder(TXN handle, long transactionStartTime) 
{
+                       this.handle = handle;
+                       this.transactionStartTime = transactionStartTime;
+               }
+
+               long elapsedTime(Clock clock) {
+                       return clock.millis() - transactionStartTime;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       TransactionHolder<?> that = (TransactionHolder<?>) o;
+
+                       if (transactionStartTime != that.transactionStartTime) {
+                               return false;
+                       }
+                       return handle != null ? handle.equals(that.handle) : 
that.handle == null;
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = handle != null ? handle.hashCode() : 0;
+                       result = 31 * result + (int) (transactionStartTime ^ 
(transactionStartTime >>> 32));
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return "TransactionHolder{" +
+                               "handle=" + handle +
+                               ", transactionStartTime=" + 
transactionStartTime +
+                               '}';
+               }
+       }
+
+       /**
         * Custom {@link TypeSerializer} for the sink state.
         */
        @VisibleForTesting
@@ -443,12 +621,20 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
 
                @Override
                public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) {
-                       TXN copiedPendingTransaction = 
transactionSerializer.copy(from.getPendingTransaction());
-                       List<TXN> copiedPendingCommitTransactions = new 
ArrayList<>();
-                       for (TXN txn : from.getPendingCommitTransactions()) {
-                               
copiedPendingCommitTransactions.add(transactionSerializer.copy(txn));
+                       final TransactionHolder<TXN> pendingTransaction = 
from.getPendingTransaction();
+                       final TransactionHolder<TXN> copiedPendingTransaction = 
new TransactionHolder<>(
+                               
transactionSerializer.copy(pendingTransaction.handle),
+                               pendingTransaction.transactionStartTime);
+
+                       final List<TransactionHolder<TXN>> 
copiedPendingCommitTransactions = new ArrayList<>();
+                       for (TransactionHolder<TXN> txn : 
from.getPendingCommitTransactions()) {
+                               final TXN txnHandleCopy = 
transactionSerializer.copy(txn.handle);
+                               copiedPendingCommitTransactions.add(new 
TransactionHolder<>(
+                                       txnHandleCopy,
+                                       txn.transactionStartTime));
                        }
-                       Optional<CONTEXT> copiedContext = 
from.getContext().map(contextSerializer::copy);
+
+                       final Optional<CONTEXT> copiedContext = 
from.getContext().map(contextSerializer::copy);
                        return new State<>(copiedPendingTransaction, 
copiedPendingCommitTransactions, copiedContext);
                }
 
@@ -468,12 +654,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                public void serialize(
                                State<TXN, CONTEXT> record,
                                DataOutputView target) throws IOException {
-                       
transactionSerializer.serialize(record.getPendingTransaction(), target);
-                       List<TXN> pendingCommitTransactions = 
record.getPendingCommitTransactions();
+                       final TransactionHolder<TXN> pendingTransaction = 
record.getPendingTransaction();
+                       
transactionSerializer.serialize(pendingTransaction.handle, target);
+                       
target.writeLong(pendingTransaction.transactionStartTime);
+
+                       final List<TransactionHolder<TXN>> 
pendingCommitTransactions = record.getPendingCommitTransactions();
                        target.writeInt(pendingCommitTransactions.size());
-                       for (TXN pendingTxn : pendingCommitTransactions) {
-                               transactionSerializer.serialize(pendingTxn, 
target);
+                       for (TransactionHolder<TXN> pendingTxn : 
pendingCommitTransactions) {
+                               
transactionSerializer.serialize(pendingTxn.handle, target);
+                               
target.writeLong(pendingTxn.transactionStartTime);
                        }
+
                        Optional<CONTEXT> context = record.getContext();
                        if (context.isPresent()) {
                                target.writeBoolean(true);
@@ -485,17 +676,28 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
 
                @Override
                public State<TXN, CONTEXT> deserialize(DataInputView source) 
throws IOException {
-                       TXN pendingTxn = 
transactionSerializer.deserialize(source);
+                       TXN pendingTxnHandle = 
transactionSerializer.deserialize(source);
+                       final long pendingTxnStartTime = source.readLong();
+                       final TransactionHolder<TXN> pendingTxn = new 
TransactionHolder<>(
+                               pendingTxnHandle,
+                               pendingTxnStartTime);
+
                        int numPendingCommitTxns = source.readInt();
-                       List<TXN> pendingCommitTxns = new 
ArrayList<>(numPendingCommitTxns);
+                       List<TransactionHolder<TXN>> pendingCommitTxns = new 
ArrayList<>(numPendingCommitTxns);
                        for (int i = 0; i < numPendingCommitTxns; i++) {
-                               
pendingCommitTxns.add(transactionSerializer.deserialize(source));
+                               final TXN pendingCommitTxnHandle = 
transactionSerializer.deserialize(source);
+                               final long pendingCommitTxnStartTime = 
source.readLong();
+                               pendingCommitTxns.add(new TransactionHolder<>(
+                                       pendingCommitTxnHandle,
+                                       pendingCommitTxnStartTime));
                        }
+
                        Optional<CONTEXT> context = Optional.empty();
                        boolean hasContext = source.readBoolean();
                        if (hasContext) {
                                context = 
Optional.of(contextSerializer.deserialize(source));
                        }
+
                        return new State<>(pendingTxn, pendingCommitTxns, 
context);
                }
 
@@ -509,14 +711,20 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
                @Override
                public void copy(
                                DataInputView source, DataOutputView target) 
throws IOException {
-                       TXN pendingTxn = 
transactionSerializer.deserialize(source);
-                       transactionSerializer.serialize(pendingTxn, target);
+                       TXN pendingTxnHandle = 
transactionSerializer.deserialize(source);
+                       transactionSerializer.serialize(pendingTxnHandle, 
target);
+                       final long pendingTxnStartTime = source.readLong();
+                       target.writeLong(pendingTxnStartTime);
+
                        int numPendingCommitTxns = source.readInt();
                        target.writeInt(numPendingCommitTxns);
                        for (int i = 0; i < numPendingCommitTxns; i++) {
-                               TXN pendingCommitTxn = 
transactionSerializer.deserialize(source);
-                               
transactionSerializer.serialize(pendingCommitTxn, target);
+                               TXN pendingCommitTxnHandle = 
transactionSerializer.deserialize(source);
+                               
transactionSerializer.serialize(pendingCommitTxnHandle, target);
+                               final long pendingCommitTxnStartTime = 
source.readLong();
+                               target.writeLong(pendingCommitTxnStartTime);
                        }
+
                        boolean hasContext = source.readBoolean();
                        target.writeBoolean(hasContext);
                        if (hasContext) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java
new file mode 100644
index 0000000..47eeec6
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder;
+
+import org.junit.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Unit tests for {@link TransactionHolder}.
+ */
+public class TransactionHolderTest {
+
+       @Test
+       public void testElapsedTime() {
+               final long elapsedTime = new TransactionHolder<>(new Object(), 
0)
+                       .elapsedTime(Clock.fixed(Instant.ofEpochMilli(1000), 
ZoneOffset.UTC));
+               assertThat(elapsedTime, equalTo(1000L));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 20abf58..c004423 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -25,9 +25,15 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -36,14 +42,23 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -51,45 +66,116 @@ import static org.junit.Assert.fail;
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-       TestContext context;
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       private FileBasedSinkFunction sinkFunction;
+
+       private OneInputStreamOperatorTestHarness<String, Object> harness;
+
+       private AtomicBoolean throwException = new AtomicBoolean();
+
+       private File targetDirectory;
+
+       private File tmpDirectory;
+
+       private SettableClock clock;
+
+       private Logger logger;
+
+       private AppenderSkeleton testAppender;
+
+       private List<LoggingEvent> loggingEvents;
 
        @Before
        public void setUp() throws Exception {
-               context = new TestContext();
+               loggingEvents = new ArrayList<>();
+               setupLogger();
+
+               targetDirectory = folder.newFolder("_target");
+               tmpDirectory = folder.newFolder("_tmp");
+               clock = new SettableClock();
+
+               setUpTestHarness();
+       }
+
+       /**
+        * Sets up {@link org.apache.log4j.Logger}, the default logger 
implementation for tests,
+        * to append {@link LoggingEvent}s to {@link #loggingEvents} so that we 
can assert whether
+        * the right messages were logged.
+        *
+        * @see #testLogTimeoutAlmostReachedWarningDuringCommit
+        * @see #testLogTimeoutAlmostReachedWarningDuringRecovery
+        */
+       private void setupLogger() {
+               logger = Logger.getLogger(TwoPhaseCommitSinkFunction.class);
+               testAppender = new AppenderSkeleton() {
+                       @Override
+                       protected void append(LoggingEvent event) {
+                               loggingEvents.add(event);
+                       }
+
+                       @Override
+                       public void close() {
+
+                       }
+
+                       @Override
+                       public boolean requiresLayout() {
+                               return false;
+                       }
+               };
+               logger.addAppender(testAppender);
+               logger.setLevel(Level.WARN);
        }
 
        @After
        public void tearDown() throws Exception {
-               context.close();
+               closeTestHarness();
+               if (logger != null) {
+                       logger.removeAppender(testAppender);
+               }
+               loggingEvents = null;
+       }
+
+       private void setUpTestHarness() throws Exception {
+               sinkFunction = new FileBasedSinkFunction();
+               harness = new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+               harness.setup();
+       }
+
+       private void closeTestHarness() throws Exception {
+               harness.close();
        }
 
        @Test
        public void testNotifyOfCompletedCheckpoint() throws Exception {
-               context.harness.open();
-               context.harness.processElement("42", 0);
-               context.harness.snapshot(0, 1);
-               context.harness.processElement("43", 2);
-               context.harness.snapshot(1, 3);
-               context.harness.processElement("44", 4);
-               context.harness.snapshot(2, 5);
-               context.harness.notifyOfCompletedCheckpoint(1);
-
-               assertExactlyOnceForDirectory(context.targetDirectory, 
Arrays.asList("42", "43"));
-               assertEquals(2, context.tmpDirectory.listFiles().length); // 
one for checkpointId 2 and second for the currentTransaction
+               harness.open();
+               harness.processElement("42", 0);
+               harness.snapshot(0, 1);
+               harness.processElement("43", 2);
+               harness.snapshot(1, 3);
+               harness.processElement("44", 4);
+               harness.snapshot(2, 5);
+               harness.notifyOfCompletedCheckpoint(1);
+
+               assertExactlyOnce(Arrays.asList("42", "43"));
+               assertEquals(2, tmpDirectory.listFiles().length); // one for 
checkpointId 2 and second for the currentTransaction
        }
 
        @Test
        public void testFailBeforeNotify() throws Exception {
-               context.harness.open();
-               context.harness.processElement("42", 0);
-               context.harness.snapshot(0, 1);
-               context.harness.processElement("43", 2);
-               OperatorStateHandles snapshot = context.harness.snapshot(1, 3);
+               harness.open();
+               harness.processElement("42", 0);
+               harness.snapshot(0, 1);
+               harness.processElement("43", 2);
+               OperatorStateHandles snapshot = harness.snapshot(1, 3);
 
-               assertTrue(context.tmpDirectory.setWritable(false));
+               assertTrue(tmpDirectory.setWritable(false));
                try {
-                       context.harness.processElement("44", 4);
-                       context.harness.snapshot(2, 5);
+                       harness.processElement("44", 4);
+                       harness.snapshot(2, 5);
                        fail("something should fail");
                }
                catch (Exception ex) {
@@ -98,20 +184,97 @@ public class TwoPhaseCommitSinkFunctionTest {
                        }
                        // ignore
                }
-               context.close();
+               closeTestHarness();
+
+               assertTrue(tmpDirectory.setWritable(true));
+
+               setUpTestHarness();
+               harness.initializeState(snapshot);
+
+               assertExactlyOnce(Arrays.asList("42", "43"));
+               closeTestHarness();
+
+               assertEquals(0, tmpDirectory.listFiles().length);
+       }
+
+       @Test
+       public void testIgnoreCommitExceptionDuringRecovery() throws Exception {
+               clock.setEpochMilli(0);
+
+               harness.open();
+               harness.processElement("42", 0);
+
+               final OperatorStateHandles snapshot = harness.snapshot(0, 1);
+               harness.notifyOfCompletedCheckpoint(1);
+
+               final long transactionTimeout = 1000;
+               sinkFunction.setTransactionTimeout(transactionTimeout);
+               sinkFunction.ignoreFailuresAfterTransactionTimeout();
+               throwException.set(true);
+
+               try {
+                       harness.initializeState(snapshot);
+                       fail("Expected exception not thrown");
+               } catch (RuntimeException e) {
+                       assertEquals("Expected exception", e.getMessage());
+               }
+
+               clock.setEpochMilli(transactionTimeout + 1);
+               harness.initializeState(snapshot);
+
+               assertExactlyOnce(Collections.singletonList("42"));
+       }
+
+       @Test
+       public void testLogTimeoutAlmostReachedWarningDuringCommit() throws 
Exception {
+               clock.setEpochMilli(0);
+
+               final long transactionTimeout = 1000;
+               final double warningRatio = 0.5;
+               sinkFunction.setTransactionTimeout(transactionTimeout);
+               sinkFunction.enableTransactionTimeoutWarnings(warningRatio);
+
+               harness.open();
+               harness.snapshot(0, 1);
+               final long elapsedTime = (long) ((double) transactionTimeout * 
warningRatio + 2);
+               clock.setEpochMilli(elapsedTime);
+               harness.notifyOfCompletedCheckpoint(1);
+
+               final List<String> logMessages =
+                       
loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());
+
+               assertThat(
+                       logMessages,
+                       hasItem(containsString("has been open for 502 ms. " +
+                               "This is close to or even exceeding the 
transaction timeout of 1000 ms.")));
+       }
+
+       @Test
+       public void testLogTimeoutAlmostReachedWarningDuringRecovery() throws 
Exception {
+               clock.setEpochMilli(0);
+
+               final long transactionTimeout = 1000;
+               final double warningRatio = 0.5;
+               sinkFunction.setTransactionTimeout(transactionTimeout);
+               sinkFunction.enableTransactionTimeoutWarnings(warningRatio);
 
-               assertTrue(context.tmpDirectory.setWritable(true));
+               harness.open();
 
-               context.open();
-               context.harness.initializeState(snapshot);
+               final OperatorStateHandles snapshot = harness.snapshot(0, 1);
+               final long elapsedTime = (long) ((double) transactionTimeout * 
warningRatio + 2);
+               clock.setEpochMilli(elapsedTime);
+               harness.initializeState(snapshot);
 
-               assertExactlyOnceForDirectory(context.targetDirectory, 
Arrays.asList("42", "43"));
-               context.close();
+               final List<String> logMessages =
+                       
loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());
 
-               assertEquals(0, context.tmpDirectory.listFiles().length);
+               assertThat(
+                       logMessages,
+                       hasItem(containsString("has been open for 502 ms. " +
+                               "This is close to or even exceeding the 
transaction timeout of 1000 ms.")));
        }
 
-       private void assertExactlyOnceForDirectory(File targetDirectory, 
List<String> expectedValues) throws IOException {
+       private void assertExactlyOnce(List<String> expectedValues) throws 
IOException {
                ArrayList<String> actualValues = new ArrayList<>();
                for (File file : targetDirectory.listFiles()) {
                        actualValues.addAll(Files.readAllLines(file.toPath(), 
Charset.defaultCharset()));
@@ -121,21 +284,16 @@ public class TwoPhaseCommitSinkFunctionTest {
                assertEquals(expectedValues, actualValues);
        }
 
-       private static class FileBasedSinkFunction extends 
TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
-               private final File tmpDirectory;
-               private final File targetDirectory;
+       private class FileBasedSinkFunction extends 
TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
 
-               public FileBasedSinkFunction(File tmpDirectory, File 
targetDirectory) {
+               public FileBasedSinkFunction() {
                        super(
                                new KryoSerializer<>(FileTransaction.class, new 
ExecutionConfig()),
-                               VoidSerializer.INSTANCE);
+                               VoidSerializer.INSTANCE, clock);
 
                        if (!tmpDirectory.isDirectory() || 
!targetDirectory.isDirectory()) {
                                throw new IllegalArgumentException();
                        }
-
-                       this.tmpDirectory = tmpDirectory;
-                       this.targetDirectory = targetDirectory;
                }
 
                @Override
@@ -157,6 +315,10 @@ public class TwoPhaseCommitSinkFunctionTest {
 
                @Override
                protected void commit(FileTransaction transaction) {
+                       if (throwException.get()) {
+                               throw new RuntimeException("Expected 
exception");
+                       }
+
                        try {
                                Files.move(
                                        transaction.tmpFile.toPath(),
@@ -165,6 +327,7 @@ public class TwoPhaseCommitSinkFunctionTest {
                        } catch (IOException e) {
                                throw new IllegalStateException(e);
                        }
+
                }
 
                @Override
@@ -198,28 +361,42 @@ public class TwoPhaseCommitSinkFunctionTest {
                }
        }
 
-       private static class TestContext implements AutoCloseable {
-               public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
-               public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+       private static class SettableClock extends Clock {
 
-               public FileBasedSinkFunction sinkFunction;
-               public OneInputStreamOperatorTestHarness<String, Object> 
harness;
+               private final ZoneId zoneId;
 
-               private TestContext() throws Exception {
-                       tmpDirectory.deleteOnExit();
-                       targetDirectory.deleteOnExit();
-                       open();
+               private long epochMilli;
+
+               private SettableClock() {
+                       this.zoneId = ZoneOffset.UTC;
+               }
+
+               public SettableClock(ZoneId zoneId, long epochMilli) {
+                       this.zoneId = zoneId;
+                       this.epochMilli = epochMilli;
+               }
+
+               public void setEpochMilli(long epochMilli) {
+                       this.epochMilli = epochMilli;
                }
 
                @Override
-               public void close() throws Exception {
-                       harness.close();
+               public ZoneId getZone() {
+                       return zoneId;
                }
 
-               public void open() throws Exception {
-                       sinkFunction = new FileBasedSinkFunction(tmpDirectory, 
targetDirectory);
-                       harness = new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
-                       harness.setup();
+               @Override
+               public Clock withZone(ZoneId zone) {
+                       if (zone.equals(this.zoneId)) {
+                               return this;
+                       }
+                       return new SettableClock(zone, epochMilli);
+               }
+
+               @Override
+               public Instant instant() {
+                       return Instant.ofEpochMilli(epochMilli);
                }
        }
+
 }

Reply via email to