[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358818#comment-15358818 ]
ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69285572

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,157 +17,553 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *     of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *     subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *     to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
-	/** Config properties for the Flink Kinesis Consumer */
+	// ------------------------------------------------------------------------
+	// Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
 	private final Properties configProps;
-	/** The name of the consumer task that this fetcher was instantiated */
-	private final String taskName;
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
+	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+	 */
+	private final KinesisDeserializationSchema<T> deserializationSchema;
+
+	// ------------------------------------------------------------------------
+	// Subtask-specific settings
+	// ------------------------------------------------------------------------
+
+	/** Runtime context of the subtask that this fetcher was created in */
+	private final RuntimeContext runtimeContext;
+
+	private final int totalNumberOfConsumerSubtasks;
+
+	private final int indexOfThisConsumerSubtask;
+
+	/**
+	 * This flag should be set by {@link FlinkKinesisConsumer} using
+	 * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
+	 */
+	private boolean isRestoredFromFailure;
+
+	// ------------------------------------------------------------------------
+	// Executor services to run created threads
+	// ------------------------------------------------------------------------
-	/** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
-	private HashMap<KinesisStreamShard, SequenceNumber> assignedShardsWithStartingSequenceNum;
+	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+	private final ExecutorService shardConsumersExecutor;
-	/** Reference to the thread that executed run() */
-	private volatile Thread mainThread;
+	// ------------------------------------------------------------------------
+	// Managed state, accessed and updated across multiple threads
+	// ------------------------------------------------------------------------
-	/** Reference to the first error thrown by any of the spawned shard connection threads */
+	/** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards in.
+	 * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
+	 */
+	private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
+
+	/**
+	 * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
+	 * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
+	 * the last processed sequence number of subscribed shards as they fetch and process records.
+	 *
+	 * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
+	 * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
+	 * all threads must use the following thread-safe methods this class provides to operate on this list:
+	 * <ul>
+	 *     <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
+	 *     <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
+	 *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, int, SequenceNumber)}</li>
+	 * </ul>
+	 */
+	private final List<KinesisStreamShardState> subscribedShardsState;
+
+	private final SourceFunction.SourceContext<T> sourceContext;
+
+	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+	private final Object checkpointLock;
+
+	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
 	private final AtomicReference<Throwable> error;
+	/** The Kinesis proxy that the fetcher will be using to discover new shards */
+	private final KinesisProxyInterface kinesis;
+
+	/** Thread that executed runFetcher() */
+	private Thread mainThread;
+
 	private volatile boolean running = true;
 	/**
-	 * Creates a new Kinesis Data Fetcher for the specified set of shards
+	 * Creates a Kinesis Data Fetcher.
 	 *
-	 * @param assignedShards the shards that this fetcher will pull data from
-	 * @param configProps the configuration properties of this Flink Kinesis Consumer
-	 * @param taskName the task name of this consumer task
+	 * @param streams the streams to subscribe to
+	 * @param sourceContext context of the source function
+	 * @param runtimeContext this subtask's runtime context
+	 * @param configProps the consumer configuration properties
+	 * @param deserializationSchema deserialization schema
 	 */
-	public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+	public KinesisDataFetcher(List<String> streams,
+							SourceFunction.SourceContext<T> sourceContext,
+							RuntimeContext runtimeContext,
+							Properties configProps,
+							KinesisDeserializationSchema<T> deserializationSchema) {
+		this(streams,
+			sourceContext,
+			sourceContext.getCheckpointLock(),
+			runtimeContext,
+			configProps,
+			deserializationSchema,
+			new AtomicReference<Throwable>(),
+			new LinkedList<KinesisStreamShardState>(),
+			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+			KinesisProxy.create(configProps));
+	}
+
+	/** This constructor is exposed for testing purposes */
+	protected KinesisDataFetcher(List<String> streams,
+								SourceFunction.SourceContext<T> sourceContext,
+								Object checkpointLock,
+								RuntimeContext runtimeContext,
+								Properties configProps,
+								KinesisDeserializationSchema<T> deserializationSchema,
+								AtomicReference<Throwable> error,
+								LinkedList<KinesisStreamShardState> subscribedShardsState,
+								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
+								KinesisProxyInterface kinesis) {
+		this.streams = checkNotNull(streams);
 		this.configProps = checkNotNull(configProps);
-		this.assignedShardsWithStartingSequenceNum = new HashMap<>();
-		for (KinesisStreamShard shard : assignedShards) {
-			assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.get());
-		}
-		this.taskName = taskName;
-		this.error = new AtomicReference<>();
+		this.sourceContext = checkNotNull(sourceContext);
+		this.checkpointLock = checkNotNull(checkpointLock);
+		this.runtimeContext = checkNotNull(runtimeContext);
+		this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
+		this.deserializationSchema = checkNotNull(deserializationSchema);
+		this.kinesis = checkNotNull(kinesis);
+
+		this.error = checkNotNull(error);
+		this.subscribedShardsState = checkNotNull(subscribedShardsState);
+		this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
+
+		this.shardConsumersExecutor =
+			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
 	}
 	/**
-	 * Advance a shard's starting sequence number to a specified value
+	 * Starts the fetcher. After starting the fetcher, it can only
+	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
 	 *
-	 * @param streamShard the shard to perform the advance on
-	 * @param sequenceNum the sequence number to advance to
+	 * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
 	 */
-	public void advanceSequenceNumberTo(KinesisStreamShard streamShard, SequenceNumber sequenceNum) {
-		if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
-			throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read.");
-		}
-		assignedShardsWithStartingSequenceNum.put(streamShard, sequenceNum);
-	}
-
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
-						KinesisDeserializationSchema<T> deserializationSchema,
-						HashMap<KinesisStreamShard, SequenceNumber> lastSequenceNums) throws Exception {
+	public void runFetcher() throws Exception {
-		if (assignedShardsWithStartingSequenceNum == null || assignedShardsWithStartingSequenceNum.size() == 0) {
-			throw new IllegalArgumentException("No shards set to read for this fetcher");
+		// check that we are running before proceeding
+		if (!running) {
+			return;
 		}
 		this.mainThread = Thread.currentThread();
-		LOG.info("Reading from shards " + assignedShardsWithStartingSequenceNum);
+		// ------------------------------------------------------------------------
+		// Procedures before starting the infinite while loop:
+		// ------------------------------------------------------------------------
+
+		// 1. query for any new shards that may have been created while the Kinesis consumer was not running,
+		// and register them to the subscribedShardState list.
+		List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
+		for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
+			// the starting state for new shards created while the consumer wasn't running depends on whether or not
+			// we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means
+			// all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint,
+			// any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards.
+			SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure)
+				? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
+				: KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(configProps);
-		// create a thread for each individual shard
-		ArrayList<ShardConsumerThread<?>> consumerThreads = new ArrayList<>(assignedShardsWithStartingSequenceNum.size());
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) {
-			ShardConsumerThread<T> thread = new ShardConsumerThread<>(this, configProps, assignedShard.getKey(),
-				assignedShard.getValue(), sourceContext, InstantiationUtil.clone(deserializationSchema), lastSequenceNums);
-			thread.setName(String.format("ShardConsumer - %s - %s/%s",
-				taskName, assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId()));
-			thread.setDaemon(true);
-			consumerThreads.add(thread);
+			if (LOG.isInfoEnabled()) {
+				String logFormat = (isRestoredFromFailure)
+					? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
+					: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't" + "running due to failure, starting state set as sequence number {}";
+
+				LOG.info(logFormat, runtimeContext.getIndexOfThisSubtask(), shard.toString(), startingStateForNewShard.get());
+			}
+			registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
 		}
-		// check that we are viable for running for the last time before starting threads
-		if (!running) {
-			return;
+		// 2. check that there is at least one shard in the subscribed streams to consume from (can be done by
+		// checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
+		boolean hasShards = false;
+		StringBuilder streamsWithNoShardsFound = new StringBuilder();
+		for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
+			if (streamToLastDiscoveredShardEntry.getValue() != null) {
+				hasShards = true;
+			} else {
+				streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
+			}
 		}
-		for (ShardConsumerThread<?> shardConsumer : consumerThreads) {
-			LOG.info("Starting thread {}", shardConsumer.getName());
-			shardConsumer.start();
+		if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
+			LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
+				indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
 		}
-		// wait until all consumer threads are done, or until the fetcher is aborted, or until
-		// an error occurred in one of the consumer threads
-		try {
-			boolean consumersStillRunning = true;
-			while (running && error.get() == null && consumersStillRunning) {
-				try {
-					// wait for the consumer threads. if an error occurs, we are interrupted
-					for (ShardConsumerThread<?> consumerThread : consumerThreads) {
-						consumerThread.join();
-					}
+		if (!hasShards) {
+			throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
+		}
-					// check if there are consumer threads still running
-					consumersStillRunning = false;
-					for (ShardConsumerThread<?> consumerThread : consumerThreads) {
-						consumersStillRunning = consumersStillRunning | consumerThread.isAlive();
-					}
-				} catch (InterruptedException e) {
-					// ignore
-				}
+		// 3. start consuming any shard state we already have in the subscribedShardState up to this point; the
+		// subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
+		// consumer using a restored state checkpoint
+		for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
+			KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}" +
--- End diff --

I think the + at the end of the string is wrong.

> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector, Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis
> users can choose to "merge" and "split" shards at any time for adjustable
> stream throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic
> version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
> mapping is done in a simple round-robin-like distribution which can be
> locally determined at each Flink consumer task (Flink Kafka consumer does
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer
> tasks coordinate which shards they are currently handling, and to allow a
> task to ask the coordinator for a shard reassignment when it finds a closed
> shard at runtime (Kinesis closes shards when they are merged or split).
> We need a centralized coordinator state store that is visible to all Flink
> consumer tasks. Tasks can use this state store to locally determine which
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the
> coordination, but as described in
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use
> KCL for the implementation of the consumer if we want to leverage Flink's
> checkpointing mechanics. For our own implementation, ZooKeeper could be used
> for this state store, but that would require users to set up ZK.
> Since this feature involves extensive work, it is opened as a sub-task
> separate from the basic implementation
> https://issues.apache.org/jira/browse/FLINK-3229.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
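The issue notes that the basic consumer's static shard-to-task mapping "is done in a simple round-robin-like distribution which can be locally determined at each Flink consumer task". The Java sketch below illustrates what "locally determined" means. It rests on assumptions: the class `ShardAssignmentSketch`, its methods, and the sample shard ids are hypothetical and are not Flink's code. Each subtask evaluates the same pure function over the same inputs, so subtasks pick non-overlapping subsets without talking to each other. A hash-based variant is shown alongside for comparison; it is a swapped-in technique, not necessarily the one used by FLINK-3229 or PR 2131.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch, not the Flink implementation: two ways a subtask can decide
 * locally, without a coordinator, which shards it owns. Every subtask evaluates the
 * same pure function over the same inputs, so the chosen subsets never overlap.
 */
public class ShardAssignmentSketch {

    /** Round-robin: sort shard ids, then subtask k takes every shard at a position i with i % parallelism == k. */
    public static List<String> roundRobinAssign(List<String> shardIds, int parallelism, int subtaskIndex) {
        List<String> sorted = new ArrayList<>(shardIds);
        Collections.sort(sorted);
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(sorted.get(i));
            }
        }
        return assigned;
    }

    /**
     * Hash-based: the owner of a shard depends only on that shard's own id,
     * not on which other shards are currently in the list.
     */
    public static boolean ownsShard(String shardId, int parallelism, int subtaskIndex) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    public static void main(String[] args) {
        List<String> shardIds = Arrays.asList(
                "shardId-000000000000", "shardId-000000000001",
                "shardId-000000000002", "shardId-000000000003");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> " + roundRobinAssign(shardIds, parallelism, subtask));
        }
        // prints:
        // subtask 0 -> [shardId-000000000000, shardId-000000000002]
        // subtask 1 -> [shardId-000000000001, shardId-000000000003]
    }
}
```

With the round-robin rule, a shard's owner depends on its position in the whole list. If a shard drops out of the list, every later shard shifts position and may move to a different subtask. The hash-based rule avoids that shift because `String.hashCode()` is specified by the Java language and depends only on the id. Neither rule by itself handles the reassignment of closed shards that this issue is about.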