[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253779#comment-15253779 ]
ASF GitHub Bot commented on FLINK-3229: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60726049 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** + * Create a new KinesisProxy based on the supplied configuration properties + * + * @param configProps configuration properties containing AWS credential and AWS region info + */ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); --- End diff -- I found out what I was doing wrong. The code was using the default region ID because I forgot to set it. I'm currently fixing some issues in the consumer and I'll make the region a required argument. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > ------------------------------------------------------------------------------ > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Affects Versions: 1.0.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browse/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<T> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)