[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253836#comment-15253836 ]
ASF GitHub Bot commented on FLINK-3229: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60731025 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** + * Create a new KinesisProxy based on the supplied configuration properties + * + * @param configProps configuration properties containing AWS credential and AWS region info + */ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); + + this.kinesisClient = client; + } + + /** + * Get the next batch of data records using a specific shard iterator + * + * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading + * @param maxRecordsToGet the maximum amount of records to retrieve for this batch + * @return the batch of retrieved records + */ + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); + getRecordsRequest.setShardIterator(shardIterator); + getRecordsRequest.setLimit(maxRecordsToGet); + + GetRecordsResult getRecordsResult = null; + + int remainingRetryTimes = Integer.valueOf( + configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES))); + long describeStreamBackoffTimeInMillis = Long.valueOf( + 
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF))); + + int i=0; + while (i <= remainingRetryTimes && getRecordsResult == null) { + try { + getRecordsResult = kinesisClient.getRecords(getRecordsRequest); + } catch (ProvisionedThroughputExceededException ex) { + LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + + describeStreamBackoffTimeInMillis + " millis."); + try { + Thread.sleep(describeStreamBackoffTimeInMillis); + } catch (InterruptedException interruptEx) { + // + } + } + i++; + } + + if (getRecordsResult == null) { + throw new RuntimeException("Rate Exceeded"); + } + + return getRecordsResult; + } + + /** + * Get the list of shards associated with multiple Kinesis streams + * + * @param streamNames the list of Kinesis streams + * @return a list of {@link KinesisStreamShard}s + */ + public List<KinesisStreamShard> getShardList(List<String> streamNames) { + List<KinesisStreamShard> shardList = new ArrayList<>(); + + for (String stream : streamNames) { + DescribeStreamResult describeStreamResult; + String lastSeenShardId = null; + + do { + describeStreamResult = describeStream(stream, lastSeenShardId); + + List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); + for (Shard shard : shards) { + shardList.add(new KinesisStreamShard( + regionId, + stream, + shard.getShardId(), + shard.getSequenceNumberRange().getStartingSequenceNumber(), + shard.getSequenceNumberRange().getEndingSequenceNumber(), + shard.getParentShardId(), + shard.getAdjacentParentShardId())); + } + lastSeenShardId = shards.get(shards.size() - 1).getShardId(); + } while (describeStreamResult.getStreamDescription().isHasMoreShards()); + } + return shardList; + } + + /** + * Get a shard iterator for a Kinesis shard + * + * @param shard the shard to get the iterator for + * @param shardIteratorType the iterator type to get + * @param startingSeqNum the 
sequence number that the iterator will start from + * @return the shard iterator + */ + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { + return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator(); + } + + /** + * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. + * + * @param streamName the stream to describe + * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) + * @return the result of the describe stream operation + */ + private DescribeStreamResult describeStream(String streamName, String startShardId) { + final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); + describeStreamRequest.setStreamName(streamName); + describeStreamRequest.setExclusiveStartShardId(startShardId); + + DescribeStreamResult describeStreamResult = null; + String streamStatus = null; + int remainingRetryTimes = Integer.valueOf( + configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES))); + long describeStreamBackoffTimeInMillis = Long.valueOf( + configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF))); + + // Call DescribeStream, with backoff and retries (if we get LimitExceededException). + while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) { + try { + describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); + } catch (LimitExceededException le) { + LOG.warn("Got LimitExceededException when describing stream " + streamName + ". 
Backing off for " + + describeStreamBackoffTimeInMillis + " millis."); + try { + Thread.sleep(describeStreamBackoffTimeInMillis); + } catch (InterruptedException ie) { + LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie); + } + } catch (ResourceNotFoundException re) { + throw new RuntimeException("No stream name"); --- End diff -- I would change the message to something more generic + forward the exception, because it contains helpful details: ``` throw new RuntimeException("Error while getting stream details", re); ``` > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > ------------------------------------------------------------------------------ > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Affects Versions: 1.0.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement a data source consumer for the Kinesis streaming > connector (https://issues.apache.org/jira/browse/FLINK-3211). > An example of the planned user API for the Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<T> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)