[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253831#comment-15253831 ]
ASF GitHub Bot commented on FLINK-3229: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60730773 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. + */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference<Throwable> error; + + private volatile boolean running = true; + + /** + * Creates a new Kinesis Data Fetcher for the specified set of shards + * + * @param assignedShards the shards that this fetcher will pull data from + * @param configProps the configuration properties of this Flink Kinesis Consumer + * @param taskName the task name of this consumer task + */ + public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** + * Advance a shard's starting sequence number to a specified value + * + * @param streamShard the shard to perform the advance on + * @param sequenceNum the sequence number to advance to + */ + public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum) { + if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) { + throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read."); + } + assignedShardsWithStartingSequenceNum.put(streamShard, sequenceNum); + } + + public <T> void run(SourceFunction.SourceContext<T> sourceContext, + KinesisDeserializationSchema<T> deserializationSchema, + HashMap<KinesisStreamShard, String> lastSequenceNums) throws Exception { + + if (assignedShardsWithStartingSequenceNum == null || assignedShardsWithStartingSequenceNum.size() == 0) { + throw new IllegalArgumentException("No shards set to read for this fetcher"); + } + + this.mainThread = Thread.currentThread(); + + LOG.info("Reading from shards " + assignedShardsWithStartingSequenceNum); + + // create a thread for each individual shard + ArrayList<ShardConsumerThread<?>> consumerThreads = new ArrayList<>(assignedShardsWithStartingSequenceNum.size()); + for (Map.Entry<KinesisStreamShard, String> assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) { + ShardConsumerThread<T> thread = new ShardConsumerThread<>(this, configProps, assignedShard.getKey(), + assignedShard.getValue(), sourceContext, deserializationSchema, lastSequenceNums); + thread.setName(String.format("ShardConsumer - %s - %s/%s", + taskName, assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId())); + thread.setDaemon(true); + consumerThreads.add(thread); + } + + // check that we are viable for running for the last time before starting threads + if (!running) { + return; + } + + for (ShardConsumerThread<?> shardConsumer : consumerThreads) { + LOG.info("Starting thread {}", shardConsumer.getName()); + shardConsumer.start(); + } + + // wait until all consumer threads are done, or until the fetcher is aborted, or until + // an error occurred in one of the consumer threads + try { + boolean consumersStillRunning = true; + while (running && error.get() == null && consumersStillRunning) { + try { + // wait for the consumer threads. if an error occurs, we are interrupted + for (ShardConsumerThread<?> consumerThread : consumerThreads) { + consumerThread.join(); + } + + // check if there are consumer threads still running + consumersStillRunning = false; + for (ShardConsumerThread<?> consumerThread : consumerThreads) { + consumersStillRunning = consumersStillRunning | consumerThread.isAlive(); + } + } catch (InterruptedException e) { + // ignore + } + } + + // make sure any asynchronous error is noticed + Throwable error = this.error.get(); + if (error != null) { + throw new Exception(error.getMessage(), error); + } + } finally { + for (ShardConsumerThread<?> consumerThread : consumerThreads) { + if (consumerThread.isAlive()) { + consumerThread.cancel(); + } + } + } + } + + public void close() throws IOException { + this.running = false; + } + + public void stopWithError(Throwable throwable) { + if (this.error.compareAndSet(null, throwable)) { + if (mainThread != null) { + mainThread.interrupt(); + } + } + } + + /** + * + * + * @param <T> + */ + private static class ShardConsumerThread<T> extends Thread { + + private final SourceFunction.SourceContext<T> sourceContext; + private final KinesisDeserializationSchema<T> deserializer; + private final HashMap<KinesisStreamShard, String> seqNoState; + + private final KinesisProxy kinesisProxy; + + private final KinesisDataFetcher ownerRef; + + private final KinesisStreamShard assignedShard; + + private String lastSequenceNum; + private String nextShardItr; + + private volatile boolean running = true; + + public ShardConsumerThread(KinesisDataFetcher ownerRef, + Properties props, + KinesisStreamShard assignedShard, + String lastSequenceNum, + SourceFunction.SourceContext<T> sourceContext, + KinesisDeserializationSchema<T> deserializer, + HashMap<KinesisStreamShard, String> seqNumState) { + this.ownerRef = checkNotNull(ownerRef); + this.assignedShard = checkNotNull(assignedShard); + this.lastSequenceNum = checkNotNull(lastSequenceNum); + this.sourceContext = checkNotNull(sourceContext); + this.deserializer = checkNotNull(deserializer); + this.seqNoState = checkNotNull(seqNumState); + this.kinesisProxy = new KinesisProxy(props); + } + + @Override + public void run() { + try { + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.toString())) { + // if the shard is already closed, there will be no latest next record to get for this shard + if (assignedShard.isClosed()) { + nextShardItr = null; + } else { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.LATEST.toString(), null); + } + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString())) { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString())) { + nextShardItr = null; + } else { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum); + } + + long lastNextShardItrUpdateMillis = System.currentTimeMillis(); + boolean noRecordsOnLastFetch = false; + while(running) { + if (nextShardItr == null) { + lastSequenceNum = SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString(); + + synchronized (sourceContext.getCheckpointLock()) { + seqNoState.put(assignedShard, lastSequenceNum); + } + + break; + } else { + if (noRecordsOnLastFetch) { + if (System.currentTimeMillis() - lastNextShardItrUpdateMillis >= 290000) { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum); + lastNextShardItrUpdateMillis = System.currentTimeMillis(); + } + } + + GetRecordsResult getRecordsResult = kinesisProxy.getRecords(nextShardItr, 100); + + final List<Record> fetchedRecords = getRecordsResult.getRecords(); --- End diff -- The records returned here might be aggregated: http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html So we need to add code to deaggregate them here. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > ------------------------------------------------------------------------------ > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Affects Versions: 1.0.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). > An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > config.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > config.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > config.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<T> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)