Repository: flink
Updated Branches:
  refs/heads/master 7a539c05a -> 7b574cf5b


[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding

With this change, the Kinesis consumer no longer handles resharding transparently.
This is a short-term workaround until a min-watermark notification service is available in the JobManager.

This closes #2414


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b574cf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b574cf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b574cf5

Branch: refs/heads/master
Commit: 7b574cf5b6e7549ae53ea0846022c4430a979a01
Parents: 7a539c0
Author: Gordon Tai <tzuli...@gmail.com>
Authored: Wed Aug 24 16:38:06 2016 +0800
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Aug 29 11:46:52 2016 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |   6 +
 .../kinesis/FlinkKinesisConsumer.java           |   9 +-
 .../kinesis/internals/KinesisDataFetcher.java   | 129 ++++++++++++++++---
 .../kinesis/internals/ShardConsumer.java        |   4 +
 .../connectors/kinesis/proxy/KinesisProxy.java  |   8 +-
 5 files changed, 130 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index ce011b3..c54239d 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -107,6 +107,12 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from
 
 Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
 
+**NOTE:** Currently, resharding cannot be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer
+subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be
+configured to enable checkpointing, so that the new shards created by resharding can be correctly picked up and consumed by the
+Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
+Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
+
 #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
 With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
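
As a concrete illustration of the note added above: checkpointing is enabled on the execution environment before the consumer is created. This is a minimal sketch, not taken from the patch; the stream name, region, and checkpoint interval are placeholders, and AWS credentials configuration is omitted.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KinesisCheckpointingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing must be enabled so that shards created by resharding
            // are picked up by the consumer after the job is restored.
            env.enableCheckpointing(5000);

            Properties consumerConfig = new Properties();
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
            consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
            // AWS credentials configuration omitted for brevity.

            DataStream<String> kinesis = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));

            kinesis.print();
            env.execute("kinesis checkpointing sketch");
        }
    }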

http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 7b1f836..a62dc10 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -182,7 +182,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 
                        // initialize sequence numbers with restored state
                        lastStateSnapshot = sequenceNumsToRestore;
-                       sequenceNumsToRestore = null;
                } else {
                        // start fresh with empty sequence numbers if there are no snapshots to restore from.
                        lastStateSnapshot = new HashMap<>();
@@ -198,7 +197,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
                fetcher = new KinesisDataFetcher<>(
                        streams, sourceContext, getRuntimeContext(), configProps, deserializer);
 
-               boolean isRestoringFromFailure = !lastStateSnapshot.isEmpty();
+               boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
                fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
 
                // if we are restoring from a checkpoint, we iterate over the restored
@@ -210,7 +209,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
 
                                if (LOG.isInfoEnabled()) {
                                        LOG.info("Subtask {} is seeding the 
fetcher with restored shard {}," +
-                                               "starting state set to the 
restored sequence number {}" +
+                                                       " starting state set to 
the restored sequence number {}",
                                                
getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), 
restored.getValue());
                                }
                                fetcher.registerNewSubscribedShardState(
@@ -285,13 +284,13 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Snapshotting state. ...");
+                       LOG.debug("Snapshotting state ...");
                }
 
                lastStateSnapshot = fetcher.snapshotState();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Snapshotting state. Last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+                       LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
                                lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
                }
 

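A side note on the restore check changed above: a subtask can legitimately restore an empty snapshot (it simply owned no shards when the checkpoint was taken), so distinguishing a restore from a fresh start must test the restored state for null rather than for emptiness. A toy illustration, not from the patch, with shard and sequence number types simplified to String:

    import java.util.HashMap;
    import java.util.Map;

    public class RestoreCheckSketch {
        public static void main(String[] args) {
            // A restored snapshot that happens to contain no shards.
            Map<String, String> sequenceNumsToRestore = new HashMap<>();

            boolean oldCheck = !sequenceNumsToRestore.isEmpty();  // false: wrongly treated as a fresh start
            boolean newCheck = (sequenceNumsToRestore != null);   // true: correctly treated as a restore

            System.out.println("old check says restoring: " + oldCheck);
            System.out.println("new check says restoring: " + newCheck);
        }
    }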
http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index d83ab06..a06fdca 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
@@ -45,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -150,6 +152,14 @@ public class KinesisDataFetcher<T> {
        /** Thread that executed runFetcher() */
        private Thread mainThread;
 
+       /**
+        * The current number of shards that are actively read by this fetcher.
+        *
+        * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+        * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
+        */
+       private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
+
        private volatile boolean running = true;
 
        /**
@@ -229,9 +239,9 @@ public class KinesisDataFetcher<T> {
                //  1. query for any new shards that may have been created while the Kinesis consumer was not running,
                //     and register them to the subscribedShardState list.
                if (LOG.isDebugEnabled()) {
-                       String logFormat = (isRestoredFromFailure)
+                       String logFormat = (!isRestoredFromFailure)
                                ? "Subtask {} is trying to discover initial shards ..."
-                               : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't" +
+                               : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
                                "running due to failure ...";
 
                        LOG.debug(logFormat, indexOfThisConsumerSubtask);
@@ -250,10 +260,10 @@ public class KinesisDataFetcher<T> {
                                : initialPosition.toSentinelSequenceNumber();
 
                        if (LOG.isInfoEnabled()) {
-                               String logFormat = (isRestoredFromFailure)
+                               String logFormat = (!isRestoredFromFailure)
                                        ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
-                                       : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't" +
-                                               "running due to failure, starting state set as sequence number {}";
+                                       : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
+                                       "running due to failure, starting state set as sequence number {}";
 
                                LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
                        }
@@ -287,18 +297,22 @@ public class KinesisDataFetcher<T> {
                for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
                        KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
 
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
-                                       indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
-                                       seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
-                       }
+                       // only start a consuming thread if the seeded subscribed shard has not been completely read already
+                       if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
 
-                       shardConsumersExecutor.submit(
-                               new ShardConsumer<>(
-                                       this,
-                                       seededStateIndex,
-                                       subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
-                                       subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
+                               if (LOG.isInfoEnabled()) {
+                                       LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
+                                               indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+                                               seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
+                               }
+
+                               shardConsumersExecutor.submit(
+                                       new ShardConsumer<>(
+                                               this,
+                                               seededStateIndex,
+                                               subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+                                               subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
+                       }
                }
 
                // ------------------------------------------------------------------------
@@ -311,6 +325,37 @@
                                ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
                                Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
 
+               // FLINK-4341:
+               // For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
+               // for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
+               // the downstream watermarks would not advance, leading to unbounded accumulating state.
+               //
+               // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
+               // is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
+               // will be messed up.
+               //
+               // There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
+               //  (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
+               //      value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
+               //      due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
+               //      was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
+               //  (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
+               //      a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
+               //      will be subscribed by this subtask after restore as initial shards on startup.
+               //
+               // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager.
+               // Please see FLINK-4341 for more detail.
+
+               boolean emittedMaxValueWatermark = false;
+
+               if (this.numberOfActiveShards.get() == 0) {
+                       // FLINK-4341 workaround case (a) - please see the above for details on this case
+                       LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
+                               indexOfThisConsumerSubtask);
+                       sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+                       emittedMaxValueWatermark = true;
+               }
+
                while (running) {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Subtask {} is trying to discover new 
shards that were created due to resharding ...",
@@ -318,6 +363,41 @@ public class KinesisDataFetcher<T> {
                        }
                        List<KinesisStreamShard> newShardsDueToResharding = 
discoverNewShardsToSubscribe();
 
+                       // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
+                       // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
+                       // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
+                       // may not correctly reflect the discovery result in the below case determination. This may lead to incorrect
+                       // case determination on the current discovery attempt, but can still be correctly handled on future attempts.
+                       //
+                       // Although this can be resolved by wrapping the current shard discovery attempt and the below
+                       // case determination in a synchronized block on the checkpoint lock for atomicity, there would be
+                       // considerable throughput regression as shard discovery is a remote call to AWS. Therefore,
+                       // since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
+                       // we can still eventually handle max value watermark emitting / deliberately failing on successive
+                       // discovery attempts.
+
+                       if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
+                               // FLINK-4341 workaround case (a) - please see the above for details on this case
+                               LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
+                                       indexOfThisConsumerSubtask);
+                               sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+                               emittedMaxValueWatermark = true;
+                       } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
+                               // FLINK-4341 workaround case (b) - please see the above for details on this case
+                               //
+                               // Note that in the case where on resharding this subtask ceased to read all of its previous shards
+                               // but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
+                               // will be false; this allows the fetcher to continue reading the new shards without failing in such cases.
+                               // However, due to the race condition mentioned above, we might still fall into case (a) first, and
+                               // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
+                               // watermark emitting still remains correct.
+
+                               LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
+                                               " up watermarks; the new shards will be subscribed by this subtask after restore ...",
+                                       indexOfThisConsumerSubtask, newShardsDueToResharding.size());
+                               throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
+                       }
+
                        for (KinesisStreamShard shard : newShardsDueToResharding) {
                                // since there may be delay in discovering a new shard, all new shards due to
                                // resharding should be read starting from the earliest record possible
@@ -521,6 +601,14 @@ public class KinesisDataFetcher<T> {
        protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
                synchronized (checkpointLock) {
                        subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
+
+                       // if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
+                       // we've finished reading the shard and should consider it no longer active
+                       if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+                               this.numberOfActiveShards.decrementAndGet();
+                               LOG.info("Subtask {} has reached the end of subscribed shard: {}",
+                                       indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+                       }
                }
        }
 
@@ -530,9 +618,16 @@ public class KinesisDataFetcher<T> {
         * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
         */
        public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
-
                synchronized (checkpointLock) {
                        subscribedShardsState.add(newSubscribedShardState);
+
+                       // If a registered shard has an initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (the sentinel
+                       // is set if the consumer had already finished reading a shard before we failed and restored), we determine
+                       // that this subtask has a new active shard
+                       if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+                               this.numberOfActiveShards.incrementAndGet();
+                       }
+
                        return subscribedShardsState.size()-1;
                }
        }

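To make the watermark reasoning above concrete, here is a standalone sketch (not part of the patch) of what the workaround does for an idle subtask: a source that knows it will never emit records again sends a Long.MAX_VALUE watermark, so downstream event-time operators such as windows are not held back waiting for it.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class IdleSourceSketch implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Suppose this subtask was assigned no shards. Emitting a max value
            // watermark tells downstream operators that nothing more will come
            // from this subtask, so their watermarks can advance without it.
            ctx.emitWatermark(new Watermark(Long.MAX_VALUE));

            while (running) {
                Thread.sleep(100); // stay alive without emitting records
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }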
http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 494f5de..6e24e65 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -87,6 +88,9 @@ public class ShardConsumer<T> implements Runnable {
                this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
                this.subscribedShard = checkNotNull(subscribedShard);
                this.lastSequenceNum = checkNotNull(lastSequenceNum);
+               checkArgument(
+                       !lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
+                       "Should not start a ShardConsumer if the shard has already been completely read.");
 
                this.deserializer = fetcherRef.getClonedDeserializationSchema();
 

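The constructor guard added above mirrors the seeding logic in KinesisDataFetcher: a shard whose last processed sequence number is the ending sentinel is kept for state bookkeeping but must never get a consuming thread. A trivial illustration of the fail-fast pattern (illustrative only; the boolean stands in for the sentinel comparison):

    import static org.apache.flink.util.Preconditions.checkArgument;

    public class GuardSketch {
        public static void main(String[] args) {
            boolean shardAlreadyFinished = true; // stand-in for the sentinel check
            // Fails immediately with a clear message instead of surfacing later
            // as a confusing error from Kinesis.
            checkArgument(!shardAlreadyFinished,
                "Should not start a ShardConsumer if the shard has already been completely read.");
        }
    }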
http://git-wip-us.apache.org/repos/asf/flink/blob/7b574cf5/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 906689f..1113fde 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -201,8 +201,8 @@ public class KinesisProxy implements KinesisProxyInterface {
                }
 
                if (getRecordsResult == null) {
-                       throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + "retry" +
-                               "attempts returned ProvisionedThroughputExceededException.");
+                       throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
+                               " retry attempts returned ProvisionedThroughputExceededException.");
                }
 
                return getRecordsResult;
@@ -245,8 +245,8 @@ public class KinesisProxy implements KinesisProxyInterface {
                }
 
                if (getShardIteratorResult == null) {
-                       throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + "retry" +
-                               "attempts returned ProvisionedThroughputExceededException.");
+                       throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
+                               " retry attempts returned ProvisionedThroughputExceededException.");
                }
                return getShardIteratorResult.getShardIterator();
        }
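
The two hunks above fix the same string concatenation slip; spelled out with illustrative values:

    public class ConcatSketch {
        public static void main(String[] args) {
            int maxAttempts = 3;
            String before = "all " + maxAttempts + "retry" + "attempts returned ...";
            String after = "all " + maxAttempts + " retry attempts returned ...";
            System.out.println(before); // all 3retryattempts returned ...
            System.out.println(after);  // all 3 retry attempts returned ...
        }
    }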
