Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r69299145
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
    @@ -17,157 +17,553 @@
     
     package org.apache.flink.streaming.connectors.kinesis.internals;
     
    +import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    +import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
     import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
     import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
     import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
    +import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
     import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    +import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
     import org.apache.flink.util.InstantiationUtil;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.Map;
    -import java.util.HashMap;
    +
    +import java.util.LinkedList;
     import java.util.List;
    -import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Map;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
    - * The fetcher spawns a single thread for connection to each shard.
    + * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
    + * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher accomplishes the following:
    + * <ul>
    + *     <li>1. continuously poll Kinesis to discover shards that the 
subtask should subscribe to. The subscribed subset
    + *                   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
    + *                   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
    + *                   to the same subset of shards even after 
restoring)</li>
    + *     <li>2. decide where in each discovered shard the fetcher should 
start consuming from</li>
    + *     <li>3. subscribe to shards by creating a single thread for each 
shard</li>
    + * </ul>
    + *
    + * <p>The fetcher manages two states: 1) last seen shard ids of each 
subscribed stream (used for continuous shard discovery),
    + * and 2) last processed sequence numbers of each subscribed shard. Since 
operations on the second state will be performed
    + * by multiple threads, these operations should only be done using the 
handler methods provided in this class.
      */
    -public class KinesisDataFetcher {
    +public class KinesisDataFetcher<T> {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
     
    -   /** Config properties for the Flink Kinesis Consumer */
    +   // 
------------------------------------------------------------------------
    +   //  Consumer-wide settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Configuration properties for the Flink Kinesis Consumer */
        private final Properties configProps;
     
    -   /** The name of the consumer task that this fetcher was instantiated */
    -   private final String taskName;
    +   /** The list of Kinesis streams that the consumer is subscribing to */
    +   private final List<String> streams;
    +
    +   /**
    +    * The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
    +    * Note that since this might not be thread-safe, {@link 
ShardConsumer}s using this must
    +    * clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
    +    */
    +   private final KinesisDeserializationSchema<T> deserializationSchema;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Subtask-specific settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Runtime context of the subtask that this fetcher was created in */
    +   private final RuntimeContext runtimeContext;
    +
    +   private final int totalNumberOfConsumerSubtasks;
    +
    +   private final int indexOfThisConsumerSubtask;
    +
    +   /**
    +    * This flag should be set by {@link FlinkKinesisConsumer} using
    +    * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
    +    */
    +   private boolean isRestoredFromFailure;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Executor services to run created threads
    +   // 
------------------------------------------------------------------------
     
    -   /** Information of the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
    -   private HashMap<KinesisStreamShard, SequenceNumber> 
assignedShardsWithStartingSequenceNum;
    +   /** Executor service to run {@link ShardConsumer}s to consume Kinesis 
shards */
    +   private final ExecutorService shardConsumersExecutor;
     
    -   /** Reference to the thread that executed run() */
    -   private volatile Thread mainThread;
    +   // 
------------------------------------------------------------------------
    +   //  Managed state, accessed and updated across multiple threads
    +   // 
------------------------------------------------------------------------
     
    -   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
    +   /** The last discovered shard ids of each subscribed stream, updated as 
the fetcher discovers new shards.
    +    * Note: this state will be updated if new shards are found when {@link 
KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
    +    */
    +   private final Map<String, String> 
subscribedStreamsToLastDiscoveredShardIds;
    +
    +   /**
    +    * The shards, along with their last processed sequence numbers, that 
this fetcher is subscribed to. The fetcher
    +    * will add new subscribed shard states to this list as it discovers 
new shards. {@link ShardConsumer} threads update
    +    * the last processed sequence number of subscribed shards as they 
fetch and process records.
    +    *
    +    * <p>Note that since multiple {@link ShardConsumer} threads will be 
performing operations on this list, all operations
    +    * must be wrapped in synchronized blocks on the {@link 
KinesisDataFetcher#checkpointLock} lock. For this purpose,
    +    * all threads must use the following thread-safe methods this class 
provides to operate on this list:
    +    * <ul>
    +    *     <li>{@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
    +    *     <li>{@link KinesisDataFetcher#updateState(int, 
SequenceNumber)}</li>
    +    *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, int, 
SequenceNumber)}</li>
    +    * </ul>
    +    */
    +   private final List<KinesisStreamShardState> subscribedShardsState;
    +
    +   private final SourceFunction.SourceContext<T> sourceContext;
    +
    +   /** Checkpoint lock, also used to synchronize operations on 
subscribedShardsState */
    +   private final Object checkpointLock;
    +
    +   /** Reference to the first error thrown by any of the {@link 
ShardConsumer} threads */
        private final AtomicReference<Throwable> error;
     
    +   /** The Kinesis proxy that the fetcher will be using to discover new 
shards */
    +   private final KinesisProxyInterface kinesis;
    +
    +   /** Thread that executed runFetcher() */
    +   private Thread mainThread;
    +
        private volatile boolean running = true;
     
        /**
    -    * Creates a new Kinesis Data Fetcher for the specified set of shards
    +    * Creates a Kinesis Data Fetcher.
         *
    -    * @param assignedShards the shards that this fetcher will pull data 
from
    -    * @param configProps the configuration properties of this Flink 
Kinesis Consumer
    -    * @param taskName the task name of this consumer task
    +    * @param streams the streams to subscribe to
    +    * @param sourceContext context of the source function
    +    * @param runtimeContext this subtask's runtime context
    +    * @param configProps the consumer configuration properties
    +    * @param deserializationSchema deserialization schema
         */
    -   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, 
Properties configProps, String taskName) {
    +   public KinesisDataFetcher(List<String> streams,
    +                                                   
SourceFunction.SourceContext<T> sourceContext,
    +                                                   RuntimeContext 
runtimeContext,
    +                                                   Properties configProps,
    +                                                   
KinesisDeserializationSchema<T> deserializationSchema) {
    +           this(streams,
    +                   sourceContext,
    +                   sourceContext.getCheckpointLock(),
    +                   runtimeContext,
    +                   configProps,
    +                   deserializationSchema,
    +                   new AtomicReference<Throwable>(),
    +                   new LinkedList<KinesisStreamShardState>(),
    +                   
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
    +                   KinesisProxy.create(configProps));
    +   }
    +
    +   /** This constructor is exposed for testing purposes */
    +   protected KinesisDataFetcher(List<String> streams,
    +                                                           
SourceFunction.SourceContext<T> sourceContext,
    +                                                           Object 
checkpointLock,
    +                                                           RuntimeContext 
runtimeContext,
    +                                                           Properties 
configProps,
    +                                                           
KinesisDeserializationSchema<T> deserializationSchema,
    +                                                           
AtomicReference<Throwable> error,
    +                                                           
LinkedList<KinesisStreamShardState> subscribedShardsState,
    +                                                           HashMap<String, 
String> subscribedStreamsToLastDiscoveredShardIds,
    +                                                           
KinesisProxyInterface kinesis) {
    +           this.streams = checkNotNull(streams);
                this.configProps = checkNotNull(configProps);
    -           this.assignedShardsWithStartingSequenceNum = new HashMap<>();
    -           for (KinesisStreamShard shard : assignedShards) {
    -                   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.get());
    -           }
    -           this.taskName = taskName;
    -           this.error = new AtomicReference<>();
    +           this.sourceContext = checkNotNull(sourceContext);
    +           this.checkpointLock = checkNotNull(checkpointLock);
    +           this.runtimeContext = checkNotNull(runtimeContext);
    +           this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
    +           this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
    +           this.deserializationSchema = 
checkNotNull(deserializationSchema);
    +           this.kinesis = checkNotNull(kinesis);
    +
    +           this.error = checkNotNull(error);
    +           this.subscribedShardsState = 
checkNotNull(subscribedShardsState);
    +           this.subscribedStreamsToLastDiscoveredShardIds = 
checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
    +
    +           this.shardConsumersExecutor =
    +                   
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
        }
     
        /**
    -    * Advance a shard's starting sequence number to a specified value
    +    * Starts the fetcher. After starting the fetcher, it can only
    +    * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
         *
    -    * @param streamShard the shard to perform the advance on
    -    * @param sequenceNum the sequence number to advance to
    +    * @throws Exception the first error or exception thrown by the fetcher 
or any of the threads created by the fetcher.
         */
    -   public void advanceSequenceNumberTo(KinesisStreamShard streamShard, 
SequenceNumber sequenceNum) {
    -           if 
(!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
    -                   throw new IllegalArgumentException("Can't advance 
sequence number on a shard we are not going to read.");
    -           }
    -           assignedShardsWithStartingSequenceNum.put(streamShard, 
sequenceNum);
    -   }
    -
    -   public <T> void run(SourceFunction.SourceContext<T> sourceContext,
    -                                           KinesisDeserializationSchema<T> 
deserializationSchema,
    -                                           HashMap<KinesisStreamShard, 
SequenceNumber> lastSequenceNums) throws Exception {
    +   public void runFetcher() throws Exception {
     
    -           if (assignedShardsWithStartingSequenceNum == null || 
assignedShardsWithStartingSequenceNum.size() == 0) {
    -                   throw new IllegalArgumentException("No shards set to 
read for this fetcher");
    +           // check that we are running before proceeding
    +           if (!running) {
    +                   return;
                }
     
                this.mainThread = Thread.currentThread();
     
    -           LOG.info("Reading from shards " + 
assignedShardsWithStartingSequenceNum);
    +           // 
------------------------------------------------------------------------
    +           //  Procedures before starting the infinite while loop:
    +           // 
------------------------------------------------------------------------
    +
    +           //  1. query for any new shards that may have been created 
while the Kinesis consumer was not running,
    +           //     and register them to the subscribedShardState list.
    +           List<KinesisStreamShard> newShardsCreatedWhileNotRunning = 
discoverNewShardsToSubscribe();
    +           for (KinesisStreamShard shard : 
newShardsCreatedWhileNotRunning) {
    +                   // the starting state for new shards created while the 
consumer wasn't running depends on whether or not
    +                   // we are starting fresh (not restoring from a 
checkpoint); when we are starting fresh, this simply means
    +                   // all existing shards of streams we are subscribing to 
are new shards; when we are restoring from checkpoint,
    +                   // any new shards due to Kinesis resharding from the 
time of the checkpoint will be considered new shards.
    +                   SentinelSequenceNumber startingStateForNewShard = 
(isRestoredFromFailure)
    +                           ? 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
    +                           : 
KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(configProps);
     
    -           // create a thread for each individual shard
    -           ArrayList<ShardConsumerThread<?>> consumerThreads = new 
ArrayList<>(assignedShardsWithStartingSequenceNum.size());
    -           for (Map.Entry<KinesisStreamShard, SequenceNumber> 
assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) {
    -                   ShardConsumerThread<T> thread = new 
ShardConsumerThread<>(this, configProps, assignedShard.getKey(),
    -                           assignedShard.getValue(), sourceContext, 
InstantiationUtil.clone(deserializationSchema), lastSequenceNums);
    -                   thread.setName(String.format("ShardConsumer - %s - 
%s/%s",
    -                           taskName, 
assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId()));
    -                   thread.setDaemon(true);
    -                   consumerThreads.add(thread);
    +                   if (LOG.isInfoEnabled()) {
    +                           String logFormat = (isRestoredFromFailure)
    +                                   ? "Subtask {} will be seeded with 
initial shard {}, starting state set as sequence number {}"
    +                                   : "Subtask {} will be seeded with new 
shard {} that was created while the consumer wasn't" +
    +                                           "running due to failure, 
starting state set as sequence number {}";
    +
    +                           LOG.info(logFormat, 
runtimeContext.getIndexOfThisSubtask(), shard.toString(), 
startingStateForNewShard.get());
    +                   }
    +                   registerNewSubscribedShardState(new 
KinesisStreamShardState(shard, startingStateForNewShard.get()));
                }
     
    -           // check that we are viable for running for the last time 
before starting threads
    -           if (!running) {
    -                   return;
    +           //  2. check that there is at least one shard in the subscribed 
streams to consume from (can be done by
    +           //     checking if at least one value in 
subscribedStreamsToLastDiscoveredShardIds is not null)
    +           boolean hasShards = false;
    +           StringBuilder streamsWithNoShardsFound = new StringBuilder();
    +           for (Map.Entry<String, String> streamToLastDiscoveredShardEntry 
: subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
    +                   if (streamToLastDiscoveredShardEntry.getValue() != 
null) {
    +                           hasShards = true;
    +                   } else {
    +                           
streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(",
 ");
    +                   }
                }
     
    -           for (ShardConsumerThread<?> shardConsumer : consumerThreads) {
    -                   LOG.info("Starting thread {}", shardConsumer.getName());
    -                   shardConsumer.start();
    +           if (streamsWithNoShardsFound.length() != 0 && 
LOG.isWarnEnabled()) {
    +                   LOG.warn("Subtask {} has failed to find any shards for 
the following subscribed streams: {}",
    +                           indexOfThisConsumerSubtask, 
streamsWithNoShardsFound.toString());
                }
     
    -           // wait until all consumer threads are done, or until the 
fetcher is aborted, or until
    -           // an error occurred in one of the consumer threads
    -           try {
    -                   boolean consumersStillRunning = true;
    -                   while (running && error.get() == null && 
consumersStillRunning) {
    -                           try {
    -                                   // wait for the consumer threads. if an 
error occurs, we are interrupted
    -                                   for (ShardConsumerThread<?> 
consumerThread : consumerThreads) {
    -                                           consumerThread.join();
    -                                   }
    +           if (!hasShards) {
    +                   throw new RuntimeException("No shards can be found for 
all subscribed streams: " + streams);
    +           }
     
    -                                   // check if there are consumer threads 
still running
    -                                   consumersStillRunning = false;
    -                                   for (ShardConsumerThread<?> 
consumerThread : consumerThreads) {
    -                                           consumersStillRunning = 
consumersStillRunning | consumerThread.isAlive();
    -                                   }
    -                           } catch (InterruptedException e) {
    -                                   // ignore
    -                           }
    +           //  3. start consuming any shard state we already have in the 
subscribedShardState up to this point; the
    +           //     subscribedShardState may already be seeded with values 
due to step 1., or explicitly added by the
    +           //     consumer using a restored state checkpoint
    +           for (int seededStateIndex = 0; seededStateIndex < 
subscribedShardsState.size(); seededStateIndex++) {
    +                   KinesisStreamShardState seededShardState = 
subscribedShardsState.get(seededStateIndex);
    +
    +                   if (LOG.isInfoEnabled()) {
    +                           LOG.info("Subtask {} will start consuming 
seeded shard {} from sequence number {} with ShardConsumer {}" +
    +                                   runtimeContext.getIndexOfThisSubtask(), 
seededShardState.getKinesisStreamShard().toString(),
    +                                   
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
                        }
     
    -                   // make sure any asynchronous error is noticed
    -                   Throwable error = this.error.get();
    -                   if (error != null) {
    -                           throw new Exception(error.getMessage(), error);
    +                   shardConsumersExecutor.submit(
    +                           new ShardConsumer<>(
    +                                   this,
    +                                   seededStateIndex,
    +                                   
subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
    +                                   
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
    +           }
    +
    +           // 
------------------------------------------------------------------------
    +
    +           // finally, start the infinite shard discovery and consumer 
launching loop;
    +           // we will escape from this loop only when shutdownFetcher() or 
stopWithError() is called
    +
    +           final long discoveryIntervalMillis = Long.valueOf(
    +                   configProps.getProperty(
    +                           
KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS,
    +                           
Long.toString(KinesisConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
    +
    +           while (running) {
    +                   List<KinesisStreamShard> newShardsDueToResharding = 
discoverNewShardsToSubscribe();
    +
    +                   for (KinesisStreamShard shard : 
newShardsDueToResharding) {
    +                           // since there may be delay in discovering a 
new shard, all new shards due to
    +                           // resharding should be read starting from the 
earliest record possible
    +                           KinesisStreamShardState newShardState =
    +                                   new KinesisStreamShardState(shard, 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
    +                           int newStateIndex = 
registerNewSubscribedShardState(newShardState);
    +
    +                           if (LOG.isInfoEnabled()) {
    +                                   LOG.info("Subtask {} has discovered a 
new shard {} due to resharding, and will start consuming " +
    +                                                   "the shard from 
sequence number {} with ShardConsumer {}",
    +                                           
runtimeContext.getIndexOfThisSubtask(), 
newShardState.getKinesisStreamShard().toString(),
    +                                           
newShardState.getLastProcessedSequenceNum(), newStateIndex);
    +                           }
    +
    +                           shardConsumersExecutor.submit(
    +                                   new ShardConsumer<>(
    +                                           this,
    +                                           newStateIndex,
    +                                           
newShardState.getKinesisStreamShard(),
    +                                           
newShardState.getLastProcessedSequenceNum()));
                        }
    -           } finally {
    -                   for (ShardConsumerThread<?> consumerThread : 
consumerThreads) {
    -                           if (consumerThread.isAlive()) {
    -                                   consumerThread.cancel();
    +
    +                   // we also check if we are running here so that we 
won't start the discovery sleep
    +                   // interval if the running flag was set to false during 
the middle of the while loop
    +                   if (running && discoveryIntervalMillis != 0) {
    +                           try {
    +                                   Thread.sleep(discoveryIntervalMillis);
    +                           } catch (InterruptedException iex) {
    +                                   // the sleep may be interrupted by 
shutdownFetcher()
    --- End diff --
    
    > Flink will also start interrupting the thread if it's not reacting.
    I see! Okay, thanks, let's leave it as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to