Repository: flink
Updated Branches:
  refs/heads/master 7a539c05a -> 7b574cf5b
[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding

This no longer allows the Kinesis consumer to transparently handle resharding.
This is a short-term workaround until we have a min-watermark notification
service available in the JobManager.

This closes #2414


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b574cf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b574cf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b574cf5

Branch: refs/heads/master
Commit: 7b574cf5b6e7549ae53ea0846022c4430a979a01
Parents: 7a539c0
Author: Gordon Tai <tzuli...@gmail.com>
Authored: Wed Aug 24 16:38:06 2016 +0800
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Aug 29 11:46:52 2016 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                 |   6 +
 .../kinesis/FlinkKinesisConsumer.java          |   9 +-
 .../kinesis/internals/KinesisDataFetcher.java  | 129 ++++++++++++++++---
 .../kinesis/internals/ShardConsumer.java       |   4 +
 .../connectors/kinesis/proxy/KinesisProxy.java |   8 +-
 5 files changed, 130 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index ce011b3..c54239d 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -107,6 +107,12 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from
 
 Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
 
+**NOTE:** Currently, resharding cannot be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer
+subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be
+configured to enable checkpointing, so that the new shards created by resharding can be correctly picked up and consumed by the
+Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
+Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
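
To make the NOTE above concrete, the following is a minimal sketch of a consumer
job with checkpointing enabled, as the NOTE requires. The stream name, region,
credentials, and checkpoint interval are placeholder values, not part of this
commit:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisConsumerCheckpointingExample {
	public static void main(String[] args) throws Exception {
		// placeholder region and credentials
		Properties consumerConfig = new Properties();
		consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		consumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
		consumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
		consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Checkpointing must be enabled so that shards created by resharding are
		// picked up by the consumer when the job is restored after the deliberate failure.
		env.enableCheckpointing(5000);

		DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
			"kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

		kinesis.print();
		env.execute("Kinesis consumer with checkpointing");
	}
}
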
+
 #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
 With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and


http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 7b1f836..a62dc10 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -182,7 +182,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 
 			// initialize sequence numbers with restored state
 			lastStateSnapshot = sequenceNumsToRestore;
-			sequenceNumsToRestore = null;
 		} else {
 			// start fresh with empty sequence numbers if there are no snapshots to restore from.
 			lastStateSnapshot = new HashMap<>();
@@ -198,7 +197,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 		fetcher = new KinesisDataFetcher<>(
 			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
 
-		boolean isRestoringFromFailure = !lastStateSnapshot.isEmpty();
+		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
 		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
 
 		// if we are restoring from a checkpoint, we iterate over the restored
@@ -210,7 +209,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
-						"starting state set to the restored sequence number {}" +
+						" starting state set to the restored sequence number {}",
 						getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
 				}
 				fetcher.registerNewSubscribedShardState(
@@ -285,13 +284,13 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state. ...");
+			LOG.debug("Snapshotting state ...");
 		}
 
 		lastStateSnapshot = fetcher.snapshotState();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state. Last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
 				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
 		}


http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index d83ab06..a06fdca 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
@@ -45,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -150,6 +152,14 @@ public class KinesisDataFetcher<T> {
 	/** Thread that executed runFetcher() */
 	private Thread mainThread;
 
+	/**
+	 * The current number of shards that are actively read by this fetcher.
+	 *
+	 * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+	 * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
+	 */
+	private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
+
 	private volatile boolean running = true;
 
 	/**
@@ -229,9 +239,9 @@ public class KinesisDataFetcher<T> {
 		// 1. query for any new shards that may have been created while the Kinesis consumer was not running,
 		//    and register them to the subscribedShardState list.
 		if (LOG.isDebugEnabled()) {
-			String logFormat = (isRestoredFromFailure)
+			String logFormat = (!isRestoredFromFailure)
 				? "Subtask {} is trying to discover initial shards ..."
-				: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't" +
+				: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
 					"running due to failure ...";
 
 			LOG.debug(logFormat, indexOfThisConsumerSubtask);
@@ -250,10 +260,10 @@ public class KinesisDataFetcher<T> {
 					: initialPosition.toSentinelSequenceNumber();
 
 				if (LOG.isInfoEnabled()) {
-					String logFormat = (isRestoredFromFailure)
+					String logFormat = (!isRestoredFromFailure)
 						? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
-						: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't" +
-							"running due to failure, starting state set as sequence number {}";
+						: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
+							"running due to failure, starting state set as sequence number {}";
 
 					LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
 				}
@@ -287,18 +297,22 @@ public class KinesisDataFetcher<T> {
 		for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
 			KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
 
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
-					indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
-					seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
-			}
+			// only start a consuming thread if the seeded subscribed shard has not been completely read already
+			if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
 
-			shardConsumersExecutor.submit(
-				new ShardConsumer<>(
-					this,
-					seededStateIndex,
-					subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
-					subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
+						indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
+				}
+
+				shardConsumersExecutor.submit(
+					new ShardConsumer<>(
+						this,
+						seededStateIndex,
+						subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
+			}
 		}
 
 		// ------------------------------------------------------------------------
 
@@ -311,6 +325,37 @@ public class KinesisDataFetcher<T> {
 				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
 
+		// FLINK-4341:
+		// For downstream operators that work on time (e.g. window operators), we are required to emit a max value watermark
+		// for subtasks that won't continue to have shards to read from unless resharding happens in the future; otherwise,
+		// the downstream watermarks would not advance, leading to unbounded accumulating state.
+		//
+		// The side effect of this limitation is that, on resharding, we must fail hard if the newly discovered shard
+		// is to be subscribed by a subtask that has previously emitted a max value watermark; otherwise, the watermarks
+		// will be messed up.
+		//
+		// There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
+		//  (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
+		//      value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
+		//      due to resharding, 2) this subtask was initially only subscribed to closed shards while the consumer
+		//      was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
+		//  (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
+		//      a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
+		//      will be subscribed by this subtask after restore as initial shards on startup.
+		//
+		// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager.
+		// Please see FLINK-4341 for more detail.
+
+		boolean emittedMaxValueWatermark = false;
+
+		if (this.numberOfActiveShards.get() == 0) {
+			// FLINK-4341 workaround case (a) - please see the above for details on this case
+			LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
+				indexOfThisConsumerSubtask);
+			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+			emittedMaxValueWatermark = true;
+		}
+
 		while (running) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
@@ -318,6 +363,41 @@ public class KinesisDataFetcher<T> {
 			}
 
 			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
 
+			// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
+			// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
+			// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
+			// may not correctly reflect the discovery result in the below case determination. This may lead to an
+			// incorrect case determination on the current discovery attempt, which can still be handled correctly
+			// on future attempts.
+			//
+			// Although this could be resolved by wrapping the current shard discovery attempt and the below case
+			// determination in a synchronized block on the checkpoint lock for atomicity, that would cause a
+			// considerable throughput regression, as shard discovery is a remote call to AWS. Therefore, since the
+			// case determination is a temporary workaround for FLINK-4341, the race condition is tolerable: we can
+			// still eventually emit the max value watermark / deliberately fail on successive discovery attempts.
+
+			if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
+				// FLINK-4341 workaround case (a) - please see the above for details on this case
+				LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
+					indexOfThisConsumerSubtask);
+				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+				emittedMaxValueWatermark = true;
+			} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
+				// FLINK-4341 workaround case (b) - please see the above for details on this case
+				//
+				// Note that in the case where, on resharding, this subtask ceased to read all of its previous shards
+				// but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
+				// will be false; this allows the fetcher to continue reading the new shards without failing in such cases.
+				// However, due to the race condition mentioned above, we might still fall into case (a) first, and
+				// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, the max value
+				// watermark emission still remains correct.
+
+				LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
+					" up watermarks; the new shards will be subscribed by this subtask after restore ...",
+					indexOfThisConsumerSubtask, newShardsDueToResharding.size());
+				throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
+			}
+
 			for (KinesisStreamShard shard : newShardsDueToResharding) {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
@@ -521,6 +601,14 @@ public class KinesisDataFetcher<T> {
 	protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
 		synchronized (checkpointLock) {
 			subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
+
+			// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
+			// we've finished reading the shard and should consider it to be non-active
+			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+				this.numberOfActiveShards.decrementAndGet();
+				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
+					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+			}
 		}
 	}
 
@@ -530,9 +618,16 @@ public class KinesisDataFetcher<T> {
 	 * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
 	 */
 	public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
-
 		synchronized (checkpointLock) {
 			subscribedShardsState.add(newSubscribedShardState);
+
+			// If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (which will be
+			// the initial state if the consumer had already finished reading the shard before we failed and restored),
+			// we determine that this subtask has a new active shard
+			if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+				this.numberOfActiveShards.incrementAndGet();
+			}
+
 			return subscribedShardsState.size()-1;
 		}
 	}


http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 494f5de..6e24e65 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -87,6 +88,9 @@ public class ShardConsumer<T> implements Runnable {
 		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
 		this.subscribedShard = checkNotNull(subscribedShard);
 		this.lastSequenceNum = checkNotNull(lastSequenceNum);
+		checkArgument(
+			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
+			"Should not start a ShardConsumer if the shard has already been completely read.");
 
 		this.deserializer = fetcherRef.getClonedDeserializationSchema();


http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 906689f..1113fde 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -201,8 +201,8 @@ public class KinesisProxy implements KinesisProxyInterface {
 		}
 
 		if (getRecordsResult == null) {
-			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + "retry" +
-				"attempts returned ProvisionedThroughputExceededException.");
+			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
+				" retry attempts returned ProvisionedThroughputExceededException.");
 		}
 
 		return getRecordsResult;
@@ -245,8 +245,8 @@ public class KinesisProxy implements KinesisProxyInterface {
 		}
 
 		if (getShardIteratorResult == null) {
-			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + "retry" +
-				"attempts returned ProvisionedThroughputExceededException.");
+			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
+				" retry attempts returned ProvisionedThroughputExceededException.");
 		}
 
 		return getShardIteratorResult.getShardIterator();
 	}
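
As a standalone illustration of workaround case (a) in the KinesisDataFetcher
changes above - an idle source subtask emitting a max value watermark so that
downstream event-time operators can still make progress - consider the following
sketch. The class name and the work-assignment logic are hypothetical and only
mirror the idea; this is not the connector code:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareSource extends RichParallelSourceFunction<String> {

	private volatile boolean running = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		int subtask = getRuntimeContext().getIndexOfThisSubtask();
		int totalWorkUnits = 2; // e.g. the number of shards; lower than the source parallelism

		if (subtask >= totalWorkUnits) {
			// This subtask will never have anything to read: emit a max value watermark
			// so downstream event-time operators (e.g. windows) are not held back forever.
			ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
			while (running) {
				Thread.sleep(1000); // stay alive without emitting records
			}
		} else {
			long sequence = 0;
			while (running) {
				synchronized (ctx.getCheckpointLock()) {
					ctx.collect("record " + sequence++ + " from work unit " + subtask);
				}
				Thread.sleep(100);
			}
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}
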
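The KinesisProxy error messages above reflect a bounded retry loop: the
getRecords/getShardIterator calls are re-attempted up to a configured maximum
number of times when Kinesis throttles the caller, and the proxy gives up only
after all attempts have failed. A generic sketch of that pattern follows; the
names (BoundedRetry, maxAttempts, baseBackoffMillis) and the backoff policy are
illustrative, not the connector's actual implementation:

import java.util.concurrent.Callable;

public class BoundedRetry {

	// Retries an operation up to maxAttempts times when it throws the given
	// retryable exception type, backing off exponentially between attempts.
	public static <T> T retry(Callable<T> operation, int maxAttempts, long baseBackoffMillis,
			Class<? extends Exception> retryableType) throws Exception {
		T result = null;
		int attempt = 0;
		while (attempt < maxAttempts && result == null) {
			try {
				result = operation.call();
			} catch (Exception e) {
				if (!retryableType.isInstance(e)) {
					throw e; // only the throttling exception type is retried
				}
				Thread.sleep(baseBackoffMillis << attempt); // simple exponential backoff
				attempt++;
			}
		}
		if (result == null) {
			throw new RuntimeException("Rate Exceeded - all " + maxAttempts +
				" retry attempts returned " + retryableType.getSimpleName() + ".");
		}
		return result;
	}
}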