[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358977#comment-15358977
 ] 

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r69300348
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
    @@ -17,157 +17,553 @@
     
     package org.apache.flink.streaming.connectors.kinesis.internals;
     
    +import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    +import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
     import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
     import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
     import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
    +import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
     import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    +import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
     import org.apache.flink.util.InstantiationUtil;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.Map;
    -import java.util.HashMap;
    +
    +import java.util.LinkedList;
     import java.util.List;
    -import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Map;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
    - * The fetcher spawns a single thread for connection to each shard.
    + * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
    + * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher accomplishes the following:
    + * <ul>
    + *     <li>1. continuously poll Kinesis to discover shards that the 
subtask should subscribe to. The subscribed subset
    + *                   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
    + *                   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
    + *                   to the same subset of shards even after 
restoring)</li>
    + *     <li>2. decide where in each discovered shard should the fetcher 
start subscribing to</li>
    + *     <li>3. subscribe to shards by creating a single thread for each 
shard</li>
    + * </ul>
    + *
    + * <p>The fetcher manages two states: 1) last seen shard ids of each 
subscribed stream (used for continuous shard discovery),
    + * and 2) last processed sequence numbers of each subscribed shard. Since 
operations on the second state will be performed
    + * by multiple threads, these operations should only be done using the 
handler methods provided in this class.
      */
    -public class KinesisDataFetcher {
    +public class KinesisDataFetcher<T> {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
     
    -   /** Config properties for the Flink Kinesis Consumer */
    +   // 
------------------------------------------------------------------------
    +   //  Consumer-wide settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Configuration properties for the Flink Kinesis Consumer */
        private final Properties configProps;
     
    -   /** The name of the consumer task that this fetcher was instantiated */
    -   private final String taskName;
    +   /** The list of Kinesis streams that the consumer is subscribing to */
    +   private final List<String> streams;
    +
    +   /**
    +    * The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
    +    * Note that since this might not be thread-safe, {@link 
ShardConsumer}s using this must
    +    * clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
    +    */
    +   private final KinesisDeserializationSchema<T> deserializationSchema;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Subtask-specific settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Runtime context of the subtask that this fetcher was created in */
    +   private final RuntimeContext runtimeContext;
    +
    +   private final int totalNumberOfConsumerSubtasks;
    +
    +   private final int indexOfThisConsumerSubtask;
    +
    +   /**
    +    * This flag should be set by {@link FlinkKinesisConsumer} using
    +    * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
    +    */
    +   private boolean isRestoredFromFailure;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Executor services to run created threads
    +   // 
------------------------------------------------------------------------
     
    -   /** Information of the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
    -   private HashMap<KinesisStreamShard, SequenceNumber> 
assignedShardsWithStartingSequenceNum;
    +   /** Executor service to run {@link ShardConsumer}s to consume Kinesis 
shards */
    +   private final ExecutorService shardConsumersExecutor;
     
    -   /** Reference to the thread that executed run() */
    -   private volatile Thread mainThread;
    +   // 
------------------------------------------------------------------------
    +   //  Managed state, accessed and updated across multiple threads
    +   // 
------------------------------------------------------------------------
     
    -   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
    +   /** The last discovered shard ids of each subscribed stream, updated as 
the fetcher discovers new shards.
    +    * Note: this state will be updated if new shards are found when {@link 
KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
    +    */
    +   private final Map<String, String> 
subscribedStreamsToLastDiscoveredShardIds;
    +
    +   /**
    +    * The shards, along with their last processed sequence numbers, that 
this fetcher is subscribed to. The fetcher
    +    * will add new subscribed shard states to this list as it discovers 
new shards. {@link ShardConsumer} threads update
    +    * the last processed sequence number of subscribed shards as they 
fetch and process records.
    +    *
    +    * <p>Note that since multiple {@link ShardConsumer} threads will be 
performing operations on this list, all operations
    +    * must be wrapped in synchronized blocks on the {@link 
KinesisDataFetcher#checkpointLock} lock. For this purpose,
    +    * all threads must use the following thread-safe methods this class 
provides to operate on this list:
    +    * <ul>
    +    *     <li>{@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
    +    *     <li>{@link KinesisDataFetcher#updateState(int, 
SequenceNumber)}</li>
    +    *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, int, 
SequenceNumber)}</li>
    +    * </ul>
    +    */
    +   private final List<KinesisStreamShardState> subscribedShardsState;
    +
    +   private final SourceFunction.SourceContext<T> sourceContext;
    +
    +   /** Checkpoint lock, also used to synchronize operations on 
subscribedShardsState */
    +   private final Object checkpointLock;
    +
    +   /** Reference to the first error thrown by any of the {@link 
ShardConsumer} threads */
        private final AtomicReference<Throwable> error;
     
    +   /** The Kinesis proxy that the fetcher will be using to discover new 
shards */
    +   private final KinesisProxyInterface kinesis;
    +
    +   /** Thread that executed runFetcher() */
    +   private Thread mainThread;
    +
        private volatile boolean running = true;
     
        /**
    -    * Creates a new Kinesis Data Fetcher for the specified set of shards
    +    * Creates a Kinesis Data Fetcher.
         *
    -    * @param assignedShards the shards that this fetcher will pull data 
from
    -    * @param configProps the configuration properties of this Flink 
Kinesis Consumer
    -    * @param taskName the task name of this consumer task
    +    * @param streams the streams to subscribe to
    +    * @param sourceContext context of the source function
    +    * @param runtimeContext this subtask's runtime context
    +    * @param configProps the consumer configuration properties
    +    * @param deserializationSchema deserialization schema
         */
    -   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, 
Properties configProps, String taskName) {
    +   public KinesisDataFetcher(List<String> streams,
    +                                                   
SourceFunction.SourceContext<T> sourceContext,
    +                                                   RuntimeContext 
runtimeContext,
    +                                                   Properties configProps,
    +                                                   
KinesisDeserializationSchema<T> deserializationSchema) {
    +           this(streams,
    +                   sourceContext,
    +                   sourceContext.getCheckpointLock(),
    +                   runtimeContext,
    +                   configProps,
    +                   deserializationSchema,
    +                   new AtomicReference<Throwable>(),
    +                   new LinkedList<KinesisStreamShardState>(),
    +                   
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
    +                   KinesisProxy.create(configProps));
    +   }
    +
    +   /** This constructor is exposed for testing purposes */
    +   protected KinesisDataFetcher(List<String> streams,
    +                                                           
SourceFunction.SourceContext<T> sourceContext,
    +                                                           Object 
checkpointLock,
    +                                                           RuntimeContext 
runtimeContext,
    +                                                           Properties 
configProps,
    +                                                           
KinesisDeserializationSchema<T> deserializationSchema,
    +                                                           
AtomicReference<Throwable> error,
    +                                                           
LinkedList<KinesisStreamShardState> subscribedShardsState,
    +                                                           HashMap<String, 
String> subscribedStreamsToLastDiscoveredShardIds,
    +                                                           
KinesisProxyInterface kinesis) {
    +           this.streams = checkNotNull(streams);
                this.configProps = checkNotNull(configProps);
    -           this.assignedShardsWithStartingSequenceNum = new HashMap<>();
    -           for (KinesisStreamShard shard : assignedShards) {
    -                   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.get());
    -           }
    -           this.taskName = taskName;
    -           this.error = new AtomicReference<>();
    +           this.sourceContext = checkNotNull(sourceContext);
    +           this.checkpointLock = checkNotNull(checkpointLock);
    +           this.runtimeContext = checkNotNull(runtimeContext);
    +           this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
    +           this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
    +           this.deserializationSchema = 
checkNotNull(deserializationSchema);
    +           this.kinesis = checkNotNull(kinesis);
    +
    +           this.error = checkNotNull(error);
    +           this.subscribedShardsState = 
checkNotNull(subscribedShardsState);
    +           this.subscribedStreamsToLastDiscoveredShardIds = 
checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
    +
    +           this.shardConsumersExecutor =
    +                   
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
        }
     
        /**
    -    * Advance a shard's starting sequence number to a specified value
    +    * Starts the fetcher. After starting the fetcher, it can only
    +    * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
         *
    -    * @param streamShard the shard to perform the advance on
    -    * @param sequenceNum the sequence number to advance to
    +    * @throws Exception the first error or exception thrown by the fetcher 
or any of the threads created by the fetcher.
         */
    -   public void advanceSequenceNumberTo(KinesisStreamShard streamShard, 
SequenceNumber sequenceNum) {
    -           if 
(!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
    -                   throw new IllegalArgumentException("Can't advance 
sequence number on a shard we are not going to read.");
    -           }
    -           assignedShardsWithStartingSequenceNum.put(streamShard, 
sequenceNum);
    -   }
    -
    -   public <T> void run(SourceFunction.SourceContext<T> sourceContext,
    -                                           KinesisDeserializationSchema<T> 
deserializationSchema,
    -                                           HashMap<KinesisStreamShard, 
SequenceNumber> lastSequenceNums) throws Exception {
    +   public void runFetcher() throws Exception {
     
    -           if (assignedShardsWithStartingSequenceNum == null || 
assignedShardsWithStartingSequenceNum.size() == 0) {
    -                   throw new IllegalArgumentException("No shards set to 
read for this fetcher");
    +           // check that we are running before proceeding
    +           if (!running) {
    +                   return;
                }
     
                this.mainThread = Thread.currentThread();
     
    -           LOG.info("Reading from shards " + 
assignedShardsWithStartingSequenceNum);
    +           // 
------------------------------------------------------------------------
    +           //  Procedures before starting the infinite while loop:
    +           // 
------------------------------------------------------------------------
    +
    +           //  1. query for any new shards that may have been created 
while the Kinesis consumer was not running,
    +           //     and register them to the subscribedShardState list.
    +           List<KinesisStreamShard> newShardsCreatedWhileNotRunning = 
discoverNewShardsToSubscribe();
    +           for (KinesisStreamShard shard : 
newShardsCreatedWhileNotRunning) {
    +                   // the starting state for new shards created while the 
consumer wasn't running depends on whether or not
    +                   // we are starting fresh (not restoring from a 
checkpoint); when we are starting fresh, this simply means
    +                   // all existing shards of streams we are subscribing to 
are new shards; when we are restoring from checkpoint,
    +                   // any new shards due to Kinesis resharding from the 
time of the checkpoint will be considered new shards.
    +                   SentinelSequenceNumber startingStateForNewShard = 
(isRestoredFromFailure)
    +                           ? 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
    +                           : 
KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(configProps);
     
    -           // create a thread for each individual shard
    -           ArrayList<ShardConsumerThread<?>> consumerThreads = new 
ArrayList<>(assignedShardsWithStartingSequenceNum.size());
    -           for (Map.Entry<KinesisStreamShard, SequenceNumber> 
assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) {
    -                   ShardConsumerThread<T> thread = new 
ShardConsumerThread<>(this, configProps, assignedShard.getKey(),
    -                           assignedShard.getValue(), sourceContext, 
InstantiationUtil.clone(deserializationSchema), lastSequenceNums);
    -                   thread.setName(String.format("ShardConsumer - %s - 
%s/%s",
    -                           taskName, 
assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId()));
    -                   thread.setDaemon(true);
    -                   consumerThreads.add(thread);
    +                   if (LOG.isInfoEnabled()) {
    +                           String logFormat = (isRestoredFromFailure)
    +                                   ? "Subtask {} will be seeded with 
initial shard {}, starting state set as sequence number {}"
    +                                   : "Subtask {} will be seeded with new 
shard {} that was created while the consumer wasn't" +
    +                                           "running due to failure, 
starting state set as sequence number {}";
    +
    +                           LOG.info(logFormat, 
runtimeContext.getIndexOfThisSubtask(), shard.toString(), 
startingStateForNewShard.get());
    +                   }
    +                   registerNewSubscribedShardState(new 
KinesisStreamShardState(shard, startingStateForNewShard.get()));
                }
     
    -           // check that we are viable for running for the last time 
before starting threads
    -           if (!running) {
    -                   return;
    +           //  2. check that there is at least one shard in the subscribed 
streams to consume from (can be done by
    +           //     checking if at least one value in 
subscribedStreamsToLastDiscoveredShardIds is not null)
    +           boolean hasShards = false;
    +           StringBuilder streamsWithNoShardsFound = new StringBuilder();
    +           for (Map.Entry<String, String> streamToLastDiscoveredShardEntry 
: subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
    +                   if (streamToLastDiscoveredShardEntry.getValue() != 
null) {
    +                           hasShards = true;
    +                   } else {
    +                           
streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(",
 ");
    +                   }
                }
     
    -           for (ShardConsumerThread<?> shardConsumer : consumerThreads) {
    -                   LOG.info("Starting thread {}", shardConsumer.getName());
    -                   shardConsumer.start();
    +           if (streamsWithNoShardsFound.length() != 0 && 
LOG.isWarnEnabled()) {
    +                   LOG.warn("Subtask {} has failed to find any shards for 
the following subscribed streams: {}",
    +                           indexOfThisConsumerSubtask, 
streamsWithNoShardsFound.toString());
                }
     
    -           // wait until all consumer threads are done, or until the 
fetcher is aborted, or until
    -           // an error occurred in one of the consumer threads
    -           try {
    -                   boolean consumersStillRunning = true;
    -                   while (running && error.get() == null && 
consumersStillRunning) {
    -                           try {
    -                                   // wait for the consumer threads. if an 
error occurs, we are interrupted
    -                                   for (ShardConsumerThread<?> 
consumerThread : consumerThreads) {
    -                                           consumerThread.join();
    -                                   }
    +           if (!hasShards) {
    +                   throw new RuntimeException("No shards can be found for 
all subscribed streams: " + streams);
    +           }
     
    -                                   // check if there are consumer threads 
still running
    -                                   consumersStillRunning = false;
    -                                   for (ShardConsumerThread<?> 
consumerThread : consumerThreads) {
    -                                           consumersStillRunning = 
consumersStillRunning | consumerThread.isAlive();
    -                                   }
    -                           } catch (InterruptedException e) {
    -                                   // ignore
    -                           }
    +           //  3. start consuming any shard state we already have in the 
subscribedShardState up to this point; the
    +           //     subscribedShardState may already be seeded with values 
due to step 1., or explicitly added by the
    +           //     consumer using a restored state checkpoint
    +           for (int seededStateIndex = 0; seededStateIndex < 
subscribedShardsState.size(); seededStateIndex++) {
    +                   KinesisStreamShardState seededShardState = 
subscribedShardsState.get(seededStateIndex);
    +
    +                   if (LOG.isInfoEnabled()) {
    +                           LOG.info("Subtask {} will start consuming 
seeded shard {} from sequence number {} with ShardConsumer {}" +
    +                                   runtimeContext.getIndexOfThisSubtask(), 
seededShardState.getKinesisStreamShard().toString(),
    +                                   
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
                        }
     
    -                   // make sure any asynchronous error is noticed
    -                   Throwable error = this.error.get();
    -                   if (error != null) {
    -                           throw new Exception(error.getMessage(), error);
    +                   shardConsumersExecutor.submit(
    +                           new ShardConsumer<>(
    +                                   this,
    +                                   seededStateIndex,
    +                                   
subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
    +                                   
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
    +           }
    +
    +           // 
------------------------------------------------------------------------
    +
    +           // finally, start the infinite shard discovery and consumer 
launching loop;
    +           // we will escape from this loop only when shutdownFetcher() or 
stopWithError() is called
    +
    +           final long discoveryIntervalMillis = Long.valueOf(
    +                   configProps.getProperty(
    +                           
KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS,
    +                           
Long.toString(KinesisConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
    +
    +           while (running) {
    +                   List<KinesisStreamShard> newShardsDueToResharding = 
discoverNewShardsToSubscribe();
    +
    +                   for (KinesisStreamShard shard : 
newShardsDueToResharding) {
    +                           // since there may be delay in discovering a 
new shard, all new shards due to
    +                           // resharding should be read starting from the 
earliest record possible
    +                           KinesisStreamShardState newShardState =
    +                                   new KinesisStreamShardState(shard, 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
    +                           int newStateIndex = 
registerNewSubscribedShardState(newShardState);
    +
    +                           if (LOG.isInfoEnabled()) {
    +                                   LOG.info("Subtask {} has discovered a 
new shard {} due to resharding, and will start consuming " +
    +                                                   "the shard from 
sequence number {} with ShardConsumer {}",
    +                                           
runtimeContext.getIndexOfThisSubtask(), 
newShardState.getKinesisStreamShard().toString(),
    +                                           
newShardState.getLastProcessedSequenceNum(), newStateIndex);
    --- End diff --
    
    @rmetzger We're already logging here when new shards are discovered. Should 
I still add the debug-level log you mentioned, perhaps in 
`KinesisDataFetcher#discoverNewShardsToSubscribe()`?


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow a 
> task to ask the coordinator for a shard reassignment when it encounters a 
> closed shard at runtime (Kinesis closes shards when they are merged or 
> split).
> We need a centralized coordinator state store that is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, ZooKeeper could be used 
> for this state store, but that would require users to set up ZooKeeper for 
> the consumer to work.
> Since this feature requires extensive work, it has been opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to