Repository: flink Updated Branches: refs/heads/master 7fb7e0b97 -> 250adfd2b
[FLINK-7784] [kafka011-producer] Make TwoPhaseCommitSinkFunction aware of transaction timeouts. TwoPhaseCommitSinkFunction allows configuring a transaction timeout. The timeout can be used to log warnings if the transaction's age is approaching the timeout, and it can be used to swallow exceptions that are likely irrecoverable. This commit also integrates these changes to the FlinkKafkaProducer011. This closes #4910. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cdf2ff7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cdf2ff7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cdf2ff7 Branch: refs/heads/master Commit: 8cdf2ff7e5817acc0c239ce31c098daf33d326b7 Parents: 7fb7e0b Author: gyao <[email protected]> Authored: Thu Oct 26 19:17:55 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 2 12:32:14 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer011.java | 49 ++- ...linkKafkaProducer011StateSerializerTest.java | 20 +- .../sink/TwoPhaseCommitSinkFunction.java | 314 +++++++++++++++---- .../functions/sink/TransactionHolderTest.java | 43 +++ .../sink/TwoPhaseCommitSinkFunctionTest.java | 283 +++++++++++++---- 5 files changed, 589 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 5f557d2..e83966c 100644 
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; @@ -49,6 +50,7 @@ import org.apache.flink.util.SerializableObject; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -91,6 +93,7 @@ import static org.apache.flink.util.Preconditions.checkState; * will use {@link Semantic#AT_LEAST_ONCE} semantic. Before using {@link Semantic#EXACTLY_ONCE} please refer to Flink's * Kafka connector documentation. 
*/ +@PublicEvolving public class FlinkKafkaProducer011<IN> extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> { @@ -446,13 +449,31 @@ public class FlinkKafkaProducer011<IN> throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); } - if (!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { + if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds(); checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer"); this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout); LOG.warn("Property [%s] not specified. Setting it to %s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT); } + // Enable transactionTimeoutWarnings to avoid silent data loss + // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1): + // The KafkaProducer may not throw an exception if the transaction failed to commit + if (semantic == Semantic.EXACTLY_ONCE) { + final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); + final long transactionTimeout; + if (object instanceof String && StringUtils.isNumeric((String) object)) { + transactionTimeout = Long.parseLong((String) object); + } else if (object instanceof Number) { + transactionTimeout = ((Number) object).longValue(); + } else { + throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + + " must be numeric, was " + object); + } + super.setTransactionTimeout(transactionTimeout); + super.enableTransactionTimeoutWarnings(0.8); + } + this.topicPartitionsMap = new HashMap<>(); } @@ -480,6 +501,22 @@ public class FlinkKafkaProducer011<IN> this.logFailuresOnly = logFailuresOnly; } + /** + * Disables the propagation of exceptions thrown when committing 
presumably timed out Kafka + * transactions during recovery of the job. If a Kafka transaction is timed out, a commit will + * never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions + * will still be logged to inform the user that data loss might have occurred. + * + * <p>Note that we use {@link System#currentTimeMillis()} to track the age of a transaction. + * Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will + * attempt at least one commit of the transaction before giving up.</p> + */ + @Override + public FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout() { + super.ignoreFailuresAfterTransactionTimeout(); + return this; + } + // ----------------------------------- Utilities -------------------------- /** @@ -556,6 +593,7 @@ public class FlinkKafkaProducer011<IN> @Override public void close() throws Exception { + final KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null) { // to avoid exceptions on aborting transactions with some pending records flush(currentTransaction); @@ -588,6 +626,7 @@ public class FlinkKafkaProducer011<IN> case AT_LEAST_ONCE: case NONE: // Do not create new producer on each beginTransaction() if it is not necessary + final KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null && currentTransaction.producer != null) { return new KafkaTransactionState(currentTransaction.producer); } @@ -603,7 +642,7 @@ public class FlinkKafkaProducer011<IN> String transactionalId = availableTransactionalIds.poll(); if (transactionalId == null) { throw new Exception( - "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checktpoins."); + "Too many ongoing snapshots. 
Increase kafka producers pool size or decrease number of concurrent checkpoints."); } producer = initTransactionalProducer(transactionalId, true); producer.initTransactions(); @@ -645,10 +684,9 @@ public class FlinkKafkaProducer011<IN> protected void recoverAndCommit(KafkaTransactionState transaction) { switch (semantic) { case EXACTLY_ONCE: - KafkaTransactionState kafkaTransaction = transaction; FlinkKafkaProducer<byte[], byte[]> producer = - initTransactionalProducer(kafkaTransaction.transactionalId, false); - producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch); + initTransactionalProducer(transaction.transactionalId, false); + producer.resumeTransaction(transaction.producerId, transaction.epoch); try { producer.commitTransaction(); producer.close(); @@ -857,6 +895,7 @@ public class FlinkKafkaProducer011<IN> } int getTransactionCoordinatorId() { + final KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction == null || currentTransaction.producer == null) { throw new IllegalArgumentException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java index c6a873b..162a35c 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java +++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011StateSerializerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder; import java.util.Collections; import java.util.Optional; @@ -59,42 +60,43 @@ public class FlinkKafkaProducer011StateSerializerTest protected TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>[] getTestData() { + //noinspection unchecked return new TwoPhaseCommitSinkFunction.State[] { new TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>( - new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), + new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0), Collections.emptyList(), Optional.empty()), new TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>( - new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), - Collections.singletonList(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)), + new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 2711), + Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 42)), Optional.empty()), new TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>( - new 
FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), + new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0), Collections.emptyList(), Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.emptySet()))), new TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>( - new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), + new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0), Collections.emptyList(), Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.singleton("hello")))), new TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>( - new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), - Collections.singletonList(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)), + new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0)), Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.emptySet()))), new TwoPhaseCommitSinkFunction.State< FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>( - new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), - Collections.singletonList(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)), + new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0)), Optional.of(new 
FlinkKafkaProducer011.KafkaTransactionContext(Collections.singleton("hello")))) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 952f298..2ffb6d5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.Optional; import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -74,16 +76,35 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); - protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor; + protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>(); - protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>(); - - @Nullable - protected transient TXN currentTransaction; protected transient Optional<CONTEXT> userContext; protected transient ListState<State<TXN, CONTEXT>> state; 
+ private final Clock clock; + + private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor; + + private TransactionHolder<TXN> currentTransactionHolder; + + /** + * Specifies the maximum time a transaction should remain open. + */ + private long transactionTimeout = Long.MAX_VALUE; + + /** + * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of + * propagated. + */ + private boolean ignoreFailuresAfterTransactionTimeout; + + /** + * If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning + * message will be logged. Value must be in range [0,1]. Negative value disables warnings. + */ + private double transactionTimeoutWarningRatio = -1; + /** * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this * constructor are {@link TypeInformation#of(Class)}, {@link org.apache.flink.api.common.typeinfo.TypeHint} and @@ -100,11 +121,19 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> public TwoPhaseCommitSinkFunction( TypeSerializer<TXN> transactionSerializer, TypeSerializer<CONTEXT> contextSerializer) { + this(transactionSerializer, contextSerializer, Clock.systemUTC()); + } + @VisibleForTesting + TwoPhaseCommitSinkFunction( + TypeSerializer<TXN> transactionSerializer, + TypeSerializer<CONTEXT> contextSerializer, + Clock clock) { this.stateDescriptor = new ListStateDescriptor<>( - "state", - new StateSerializer<>(transactionSerializer, contextSerializer)); + "state", + new StateSerializer<>(transactionSerializer, contextSerializer)); + this.clock = clock; } protected Optional<CONTEXT> initializeUserContext() { @@ -115,6 +144,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> return userContext; } + @Nullable + protected TXN currentTransaction() { + return currentTransactionHolder == null ? 
null : currentTransactionHolder.handle; + } + // ------ methods that should be implemented in child class to support two phase commit algorithm ------ /** @@ -149,7 +183,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> /** * Invoked on recovered transactions after a failure. User implementation must ensure that this call will eventually * succeed. If it fails, Flink application will be restarted and it will be invoked again. If it does not succeed - * a data loss will occur. Transactions will be recovered in an order in which they were created. + * eventually, a data loss will occur. Transactions will be recovered in an order in which they were created. */ protected void recoverAndCommit(TXN transaction) { commit(transaction); @@ -182,7 +216,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> @Override public final void invoke( IN value, Context context) throws Exception { - invoke(currentTransaction, value, context); + invoke(currentTransactionHolder.handle, value, context); } @Override @@ -220,13 +254,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> // ==> There should never be a case where we have no pending transaction here // - Iterator<Map.Entry<Long, TXN>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator(); + Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator(); checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending"); while (pendingTransactionIterator.hasNext()) { - Map.Entry<Long, TXN> entry = pendingTransactionIterator.next(); + Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next(); Long pendingTransactionCheckpointId = entry.getKey(); - TXN pendingTransaction = entry.getValue(); + TransactionHolder<TXN> pendingTransaction = entry.getValue(); if (pendingTransactionCheckpointId > checkpointId) { continue; } @@ -234,7 
+268,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}", name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId); - commit(pendingTransaction); + logWarningIfTimeoutAlmostReached(pendingTransaction); + commit(pendingTransaction.handle); LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction); @@ -247,21 +282,21 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> // this is like the pre-commit of a 2-phase-commit transaction // we are ready to commit and remember the transaction - checkState(currentTransaction != null, "bug: no transaction object when performing state snapshot"); + checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot"); long checkpointId = context.getCheckpointId(); - LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransaction); + LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder); - preCommit(currentTransaction); - pendingCommitTransactions.put(checkpointId, currentTransaction); + preCommit(currentTransactionHolder.handle); + pendingCommitTransactions.put(checkpointId, currentTransactionHolder); LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions); - currentTransaction = beginTransaction(); - LOG.debug("{} - started new transaction '{}'", name(), currentTransaction); + currentTransactionHolder = beginTransactionInternal(); + LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder); state.clear(); state.add(new State<>( - this.currentTransaction, + this.currentTransactionHolder, new ArrayList<>(pendingCommitTransactions.values()), userContext)); } @@ -289,14 +324,14 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> for 
(State<TXN, CONTEXT> operatorState : state.get()) { userContext = operatorState.getContext(); - List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions(); - for (TXN recoveredTransaction : recoveredTransactions) { - // If this fails, there is actually a data loss - recoverAndCommit(recoveredTransaction); + List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions(); + for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) { + // If this fails to succeed eventually, there is actually data loss + recoverAndCommitInternal(recoveredTransaction); LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction); } - recoverAndAbort(operatorState.getPendingTransaction()); + recoverAndAbort(operatorState.getPendingTransaction().handle); LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction()); if (userContext.isPresent()) { @@ -312,20 +347,104 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> } this.pendingCommitTransactions.clear(); - currentTransaction = beginTransaction(); - LOG.debug("{} - started new transaction '{}'", name(), currentTransaction); + currentTransactionHolder = beginTransactionInternal(); + LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder); + } + + /** + * This method must be the only place to call {@link #beginTransaction()} to ensure that the + * {@link TransactionHolder} is created at the same time. + */ + private TransactionHolder<TXN> beginTransactionInternal() throws Exception { + return new TransactionHolder<>(beginTransaction(), clock.millis()); + } + + /** + * This method must be the only place to call {@link #recoverAndCommit(Object)} to ensure that + * the configuration parameters {@link #transactionTimeout} and + * {@link #ignoreFailuresAfterTransactionTimeout} are respected. 
+ */ + private void recoverAndCommitInternal(TransactionHolder<TXN> transactionHolder) { + try { + logWarningIfTimeoutAlmostReached(transactionHolder); + recoverAndCommit(transactionHolder.handle); + } catch (final Exception e) { + final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime; + if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) { + LOG.error("Error while committing transaction {}. " + + "Transaction has been open for longer than the transaction timeout ({})." + + "Commit will not be attempted again. Data loss might have occurred.", + transactionHolder.handle, transactionTimeout, e); + } else { + throw e; + } + } + } + + private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> transactionHolder) { + final long elapsedTime = transactionHolder.elapsedTime(clock); + if (transactionTimeoutWarningRatio >= 0 && + elapsedTime > transactionTimeout * transactionTimeoutWarningRatio) { + LOG.warn("Transaction {} has been open for {} ms. " + + "This is close to or even exceeding the transaction timeout of {} ms.", + transactionHolder.handle, + elapsedTime, + transactionTimeout); + } } @Override public void close() throws Exception { super.close(); - if (currentTransaction != null) { - abort(currentTransaction); - currentTransaction = null; + if (currentTransactionHolder != null) { + abort(currentTransactionHolder.handle); + currentTransactionHolder = null; } } + /** + * Sets the transaction timeout. Setting only the transaction timeout has no effect in itself. + * + * @param transactionTimeout The transaction timeout in ms. 
+ * @see #ignoreFailuresAfterTransactionTimeout() + * @see #enableTransactionTimeoutWarnings(double) + */ + protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> setTransactionTimeout(long transactionTimeout) { + checkArgument(transactionTimeout >= 0, "transactionTimeout must not be negative"); + this.transactionTimeout = transactionTimeout; + return this; + } + + /** + * If called, the sink will only log but not propagate exceptions thrown in + * {@link #recoverAndCommit(Object)} if the transaction is older than a specified transaction + * timeout. The start time of a transaction is determined by {@link System#currentTimeMillis()}. + * By default, failures are propagated. + * + */ + protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> ignoreFailuresAfterTransactionTimeout() { + this.ignoreFailuresAfterTransactionTimeout = true; + return this; + } + + /** + * Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of + * the <code>transactionTimeout</code>. + * If <code>warningRatio</code> is 0, a warning will always be logged when committing the + * transaction. + * + * @param warningRatio A value in the range [0,1]. 
+ * @return this sink function, to allow chained configuration calls + */ + protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> enableTransactionTimeoutWarnings( + double warningRatio) { + checkArgument(warningRatio >= 0 && warningRatio <= 1, + "warningRatio must be in range [0,1]"); + this.transactionTimeoutWarningRatio = warningRatio; + return this; + } + private String name() { return String.format( "%s %s/%s", @@ -340,32 +459,32 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> @VisibleForTesting @Internal public static final class State<TXN, CONTEXT> { - protected TXN pendingTransaction; - protected List<TXN> pendingCommitTransactions = new ArrayList<>(); + protected TransactionHolder<TXN> pendingTransaction; + protected List<TransactionHolder<TXN>> pendingCommitTransactions = new ArrayList<>(); protected Optional<CONTEXT> context; public State() { } - public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions, Optional<CONTEXT> context) { + public State(TransactionHolder<TXN> pendingTransaction, List<TransactionHolder<TXN>> pendingCommitTransactions, Optional<CONTEXT> context) { this.context = requireNonNull(context, "context is null"); this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null"); this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null"); } - public TXN getPendingTransaction() { + public TransactionHolder<TXN> getPendingTransaction() { return pendingTransaction; } - public void setPendingTransaction(TXN pendingTransaction) { + public void setPendingTransaction(TransactionHolder<TXN> pendingTransaction) { this.pendingTransaction = pendingTransaction; } - public List<TXN> getPendingCommitTransactions() { + public List<TransactionHolder<TXN>> getPendingCommitTransactions() { return pendingCommitTransactions; } - public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) { + public void setPendingCommitTransactions(List<TransactionHolder<TXN>> 
pendingCommitTransactions) { this.pendingCommitTransactions = pendingCommitTransactions; } @@ -407,6 +526,65 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> } /** + * Adds metadata (currently only the start time of the transaction) to the transaction object. + */ + @VisibleForTesting + @Internal + public static final class TransactionHolder<TXN> { + + private final TXN handle; + + /** + * The system time when {@link #handle} was created. + * Used to determine if the current transaction has exceeded its timeout specified by + * {@link #transactionTimeout}. + */ + private final long transactionStartTime; + + @VisibleForTesting + public TransactionHolder(TXN handle, long transactionStartTime) { + this.handle = handle; + this.transactionStartTime = transactionStartTime; + } + + long elapsedTime(Clock clock) { + return clock.millis() - transactionStartTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TransactionHolder<?> that = (TransactionHolder<?>) o; + + if (transactionStartTime != that.transactionStartTime) { + return false; + } + return handle != null ? handle.equals(that.handle) : that.handle == null; + } + + @Override + public int hashCode() { + int result = handle != null ? handle.hashCode() : 0; + result = 31 * result + (int) (transactionStartTime ^ (transactionStartTime >>> 32)); + return result; + } + + @Override + public String toString() { + return "TransactionHolder{" + + "handle=" + handle + + ", transactionStartTime=" + transactionStartTime + + '}'; + } + } + + /** * Custom {@link TypeSerializer} for the sink state. 
*/ @VisibleForTesting @@ -443,12 +621,20 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> @Override public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) { - TXN copiedPendingTransaction = transactionSerializer.copy(from.getPendingTransaction()); - List<TXN> copiedPendingCommitTransactions = new ArrayList<>(); - for (TXN txn : from.getPendingCommitTransactions()) { - copiedPendingCommitTransactions.add(transactionSerializer.copy(txn)); + final TransactionHolder<TXN> pendingTransaction = from.getPendingTransaction(); + final TransactionHolder<TXN> copiedPendingTransaction = new TransactionHolder<>( + transactionSerializer.copy(pendingTransaction.handle), + pendingTransaction.transactionStartTime); + + final List<TransactionHolder<TXN>> copiedPendingCommitTransactions = new ArrayList<>(); + for (TransactionHolder<TXN> txn : from.getPendingCommitTransactions()) { + final TXN txnHandleCopy = transactionSerializer.copy(txn.handle); + copiedPendingCommitTransactions.add(new TransactionHolder<>( + txnHandleCopy, + txn.transactionStartTime)); } - Optional<CONTEXT> copiedContext = from.getContext().map(contextSerializer::copy); + + final Optional<CONTEXT> copiedContext = from.getContext().map(contextSerializer::copy); return new State<>(copiedPendingTransaction, copiedPendingCommitTransactions, copiedContext); } @@ -468,12 +654,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> public void serialize( State<TXN, CONTEXT> record, DataOutputView target) throws IOException { - transactionSerializer.serialize(record.getPendingTransaction(), target); - List<TXN> pendingCommitTransactions = record.getPendingCommitTransactions(); + final TransactionHolder<TXN> pendingTransaction = record.getPendingTransaction(); + transactionSerializer.serialize(pendingTransaction.handle, target); + target.writeLong(pendingTransaction.transactionStartTime); + + final List<TransactionHolder<TXN>> pendingCommitTransactions = 
record.getPendingCommitTransactions(); target.writeInt(pendingCommitTransactions.size()); - for (TXN pendingTxn : pendingCommitTransactions) { - transactionSerializer.serialize(pendingTxn, target); + for (TransactionHolder<TXN> pendingTxn : pendingCommitTransactions) { + transactionSerializer.serialize(pendingTxn.handle, target); + target.writeLong(pendingTxn.transactionStartTime); } + Optional<CONTEXT> context = record.getContext(); if (context.isPresent()) { target.writeBoolean(true); @@ -485,17 +676,28 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> @Override public State<TXN, CONTEXT> deserialize(DataInputView source) throws IOException { - TXN pendingTxn = transactionSerializer.deserialize(source); + TXN pendingTxnHandle = transactionSerializer.deserialize(source); + final long pendingTxnStartTime = source.readLong(); + final TransactionHolder<TXN> pendingTxn = new TransactionHolder<>( + pendingTxnHandle, + pendingTxnStartTime); + int numPendingCommitTxns = source.readInt(); - List<TXN> pendingCommitTxns = new ArrayList<>(numPendingCommitTxns); + List<TransactionHolder<TXN>> pendingCommitTxns = new ArrayList<>(numPendingCommitTxns); for (int i = 0; i < numPendingCommitTxns; i++) { - pendingCommitTxns.add(transactionSerializer.deserialize(source)); + final TXN pendingCommitTxnHandle = transactionSerializer.deserialize(source); + final long pendingCommitTxnStartTime = source.readLong(); + pendingCommitTxns.add(new TransactionHolder<>( + pendingCommitTxnHandle, + pendingCommitTxnStartTime)); } + Optional<CONTEXT> context = Optional.empty(); boolean hasContext = source.readBoolean(); if (hasContext) { context = Optional.of(contextSerializer.deserialize(source)); } + return new State<>(pendingTxn, pendingCommitTxns, context); } @@ -509,14 +711,20 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> @Override public void copy( DataInputView source, DataOutputView target) throws IOException { - TXN pendingTxn = 
transactionSerializer.deserialize(source); - transactionSerializer.serialize(pendingTxn, target); + TXN pendingTxnHandle = transactionSerializer.deserialize(source); + transactionSerializer.serialize(pendingTxnHandle, target); + final long pendingTxnStartTime = source.readLong(); + target.writeLong(pendingTxnStartTime); + int numPendingCommitTxns = source.readInt(); target.writeInt(numPendingCommitTxns); for (int i = 0; i < numPendingCommitTxns; i++) { - TXN pendingCommitTxn = transactionSerializer.deserialize(source); - transactionSerializer.serialize(pendingCommitTxn, target); + TXN pendingCommitTxnHandle = transactionSerializer.deserialize(source); + transactionSerializer.serialize(pendingCommitTxnHandle, target); + final long pendingCommitTxnStartTime = source.readLong(); + target.writeLong(pendingCommitTxnStartTime); } + boolean hasContext = source.readBoolean(); target.writeBoolean(hasContext); if (hasContext) { http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java new file mode 100644 index 0000000..47eeec6 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TransactionHolderTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder; + +import org.junit.Test; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +/** + * Unit tests for {@link TransactionHolder}. + */ +public class TransactionHolderTest { + + @Test + public void testElapsedTime() { + final long elapsedTime = new TransactionHolder<>(new Object(), 0) + .elapsedTime(Clock.fixed(Instant.ofEpochMilli(1000), ZoneOffset.UTC)); + assertThat(elapsedTime, equalTo(1000L)); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8cdf2ff7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 20abf58..c004423 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -25,9 +25,15 @@ import org.apache.flink.streaming.api.operators.StreamSink; import 
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.BufferedWriter; import java.io.File; @@ -36,14 +42,23 @@ import java.io.FileWriter; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -51,45 +66,116 @@ import static org.junit.Assert.fail; * Tests for {@link TwoPhaseCommitSinkFunction}. 
*/ public class TwoPhaseCommitSinkFunctionTest { - TestContext context; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private FileBasedSinkFunction sinkFunction; + + private OneInputStreamOperatorTestHarness<String, Object> harness; + + private AtomicBoolean throwException = new AtomicBoolean(); + + private File targetDirectory; + + private File tmpDirectory; + + private SettableClock clock; + + private Logger logger; + + private AppenderSkeleton testAppender; + + private List<LoggingEvent> loggingEvents; @Before public void setUp() throws Exception { - context = new TestContext(); + loggingEvents = new ArrayList<>(); + setupLogger(); + + targetDirectory = folder.newFolder("_target"); + tmpDirectory = folder.newFolder("_tmp"); + clock = new SettableClock(); + + setUpTestHarness(); + } + + /** + * Setup {@link org.apache.log4j.Logger}, the default logger implementation for tests, + * to append {@link LoggingEvent}s to {@link #loggingEvents} so that we can assert if + * the right messages were logged. 
+ * + * @see #testLogTimeoutAlmostReachedWarningDuringCommit + * @see #testLogTimeoutAlmostReachedWarningDuringRecovery + */ + private void setupLogger() { + logger = Logger.getLogger(TwoPhaseCommitSinkFunction.class); + testAppender = new AppenderSkeleton() { + @Override + protected void append(LoggingEvent event) { + loggingEvents.add(event); + } + + @Override + public void close() { + + } + + @Override + public boolean requiresLayout() { + return false; + } + }; + logger.addAppender(testAppender); + logger.setLevel(Level.WARN); } @After public void tearDown() throws Exception { - context.close(); + closeTestHarness(); + if (logger != null) { + logger.removeAppender(testAppender); + } + loggingEvents = null; + } + + private void setUpTestHarness() throws Exception { + sinkFunction = new FileBasedSinkFunction(); + harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE); + harness.setup(); + } + + private void closeTestHarness() throws Exception { + harness.close(); } @Test public void testNotifyOfCompletedCheckpoint() throws Exception { - context.harness.open(); - context.harness.processElement("42", 0); - context.harness.snapshot(0, 1); - context.harness.processElement("43", 2); - context.harness.snapshot(1, 3); - context.harness.processElement("44", 4); - context.harness.snapshot(2, 5); - context.harness.notifyOfCompletedCheckpoint(1); - - assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43")); - assertEquals(2, context.tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction + harness.open(); + harness.processElement("42", 0); + harness.snapshot(0, 1); + harness.processElement("43", 2); + harness.snapshot(1, 3); + harness.processElement("44", 4); + harness.snapshot(2, 5); + harness.notifyOfCompletedCheckpoint(1); + + assertExactlyOnce(Arrays.asList("42", "43")); + assertEquals(2, tmpDirectory.listFiles().length); // one for checkpointId 2 and 
second for the currentTransaction } @Test public void testFailBeforeNotify() throws Exception { - context.harness.open(); - context.harness.processElement("42", 0); - context.harness.snapshot(0, 1); - context.harness.processElement("43", 2); - OperatorStateHandles snapshot = context.harness.snapshot(1, 3); + harness.open(); + harness.processElement("42", 0); + harness.snapshot(0, 1); + harness.processElement("43", 2); + OperatorStateHandles snapshot = harness.snapshot(1, 3); - assertTrue(context.tmpDirectory.setWritable(false)); + assertTrue(tmpDirectory.setWritable(false)); try { - context.harness.processElement("44", 4); - context.harness.snapshot(2, 5); + harness.processElement("44", 4); + harness.snapshot(2, 5); fail("something should fail"); } catch (Exception ex) { @@ -98,20 +184,97 @@ public class TwoPhaseCommitSinkFunctionTest { } // ignore } - context.close(); + closeTestHarness(); + + assertTrue(tmpDirectory.setWritable(true)); + + setUpTestHarness(); + harness.initializeState(snapshot); + + assertExactlyOnce(Arrays.asList("42", "43")); + closeTestHarness(); + + assertEquals(0, tmpDirectory.listFiles().length); + } + + @Test + public void testIgnoreCommitExceptionDuringRecovery() throws Exception { + clock.setEpochMilli(0); + + harness.open(); + harness.processElement("42", 0); + + final OperatorStateHandles snapshot = harness.snapshot(0, 1); + harness.notifyOfCompletedCheckpoint(1); + + final long transactionTimeout = 1000; + sinkFunction.setTransactionTimeout(transactionTimeout); + sinkFunction.ignoreFailuresAfterTransactionTimeout(); + throwException.set(true); + + try { + harness.initializeState(snapshot); + fail("Expected exception not thrown"); + } catch (RuntimeException e) { + assertEquals("Expected exception", e.getMessage()); + } + + clock.setEpochMilli(transactionTimeout + 1); + harness.initializeState(snapshot); + + assertExactlyOnce(Collections.singletonList("42")); + } + + @Test + public void testLogTimeoutAlmostReachedWarningDuringCommit() 
throws Exception { + clock.setEpochMilli(0); + + final long transactionTimeout = 1000; + final double warningRatio = 0.5; + sinkFunction.setTransactionTimeout(transactionTimeout); + sinkFunction.enableTransactionTimeoutWarnings(warningRatio); + + harness.open(); + harness.snapshot(0, 1); + final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2); + clock.setEpochMilli(elapsedTime); + harness.notifyOfCompletedCheckpoint(1); + + final List<String> logMessages = + loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList()); + + assertThat( + logMessages, + hasItem(containsString("has been open for 502 ms. " + + "This is close to or even exceeding the transaction timeout of 1000 ms."))); + } + + @Test + public void testLogTimeoutAlmostReachedWarningDuringRecovery() throws Exception { + clock.setEpochMilli(0); + + final long transactionTimeout = 1000; + final double warningRatio = 0.5; + sinkFunction.setTransactionTimeout(transactionTimeout); + sinkFunction.enableTransactionTimeoutWarnings(warningRatio); - assertTrue(context.tmpDirectory.setWritable(true)); + harness.open(); - context.open(); - context.harness.initializeState(snapshot); + final OperatorStateHandles snapshot = harness.snapshot(0, 1); + final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2); + clock.setEpochMilli(elapsedTime); + harness.initializeState(snapshot); - assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43")); - context.close(); + final List<String> logMessages = + loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList()); - assertEquals(0, context.tmpDirectory.listFiles().length); + assertThat( + logMessages, + hasItem(containsString("has been open for 502 ms. 
" + + "This is close to or even exceeding the transaction timeout of 1000 ms."))); } - private void assertExactlyOnceForDirectory(File targetDirectory, List<String> expectedValues) throws IOException { + private void assertExactlyOnce(List<String> expectedValues) throws IOException { ArrayList<String> actualValues = new ArrayList<>(); for (File file : targetDirectory.listFiles()) { actualValues.addAll(Files.readAllLines(file.toPath(), Charset.defaultCharset())); @@ -121,21 +284,16 @@ public class TwoPhaseCommitSinkFunctionTest { assertEquals(expectedValues, actualValues); } - private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> { - private final File tmpDirectory; - private final File targetDirectory; + private class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> { - public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) { + public FileBasedSinkFunction() { super( new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()), - VoidSerializer.INSTANCE); + VoidSerializer.INSTANCE, clock); if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) { throw new IllegalArgumentException(); } - - this.tmpDirectory = tmpDirectory; - this.targetDirectory = targetDirectory; } @Override @@ -157,6 +315,10 @@ public class TwoPhaseCommitSinkFunctionTest { @Override protected void commit(FileTransaction transaction) { + if (throwException.get()) { + throw new RuntimeException("Expected exception"); + } + try { Files.move( transaction.tmpFile.toPath(), @@ -165,6 +327,7 @@ public class TwoPhaseCommitSinkFunctionTest { } catch (IOException e) { throw new IllegalStateException(e); } + } @Override @@ -198,28 +361,42 @@ public class TwoPhaseCommitSinkFunctionTest { } } - private static class TestContext implements AutoCloseable { - public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile(); 
- public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile(); + private static class SettableClock extends Clock { - public FileBasedSinkFunction sinkFunction; - public OneInputStreamOperatorTestHarness<String, Object> harness; + private final ZoneId zoneId; - private TestContext() throws Exception { - tmpDirectory.deleteOnExit(); - targetDirectory.deleteOnExit(); - open(); + private long epochMilli; + + private SettableClock() { + this.zoneId = ZoneOffset.UTC; + } + + public SettableClock(ZoneId zoneId, long epochMilli) { + this.zoneId = zoneId; + this.epochMilli = epochMilli; + } + + public void setEpochMilli(long epochMilli) { + this.epochMilli = epochMilli; } @Override - public void close() throws Exception { - harness.close(); + public ZoneId getZone() { + return zoneId; } - public void open() throws Exception { - sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory); - harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE); - harness.setup(); + @Override + public Clock withZone(ZoneId zone) { + if (zone.equals(this.zoneId)) { + return this; + } + return new SettableClock(zone, epochMilli); + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(epochMilli); } } + }
