http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
new file mode 100644
index 0000000..612a4a7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Thread that does the actual data pulling from AWS Kinesis shards. Each 
thread is in charge of one Kinesis shard only.
+ */
+public class ShardConsumer<T> implements Runnable {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ShardConsumer.class);
+
+       private final KinesisDeserializationSchema<T> deserializer;
+
+       private final KinesisProxyInterface kinesis;
+
+       private final int subscribedShardStateIndex;
+
+       private final KinesisDataFetcher<T> fetcherRef;
+
+       private final KinesisStreamShard subscribedShard;
+
+       private final int maxNumberOfRecordsPerFetch;
+       private final long fetchIntervalMillis;
+
+       private SequenceNumber lastSequenceNum;
+
+       /**
+        * Creates a shard consumer.
+        *
+        * @param fetcherRef reference to the owning fetcher
+        * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
+        * @param subscribedShard the shard this consumer is subscribed to
+        * @param lastSequenceNum the sequence number in the shard to start 
consuming
+        */
+       public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+                                               Integer 
subscribedShardStateIndex,
+                                               KinesisStreamShard 
subscribedShard,
+                                               SequenceNumber lastSequenceNum) 
{
+               this(fetcherRef,
+                       subscribedShardStateIndex,
+                       subscribedShard,
+                       lastSequenceNum,
+                       
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+       }
+
+       /** This constructor is exposed for testing purposes */
+       protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+                                                       Integer 
subscribedShardStateIndex,
+                                                       KinesisStreamShard 
subscribedShard,
+                                                       SequenceNumber 
lastSequenceNum,
+                                                       KinesisProxyInterface 
kinesis) {
+               this.fetcherRef = checkNotNull(fetcherRef);
+               this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
+               this.subscribedShard = checkNotNull(subscribedShard);
+               this.lastSequenceNum = checkNotNull(lastSequenceNum);
+               checkArgument(
+                       
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
+                       "Should not start a ShardConsumer if the shard has 
already been completely read.");
+
+               this.deserializer = fetcherRef.getClonedDeserializationSchema();
+
+               Properties consumerConfig = 
fetcherRef.getConsumerConfiguration();
+               this.kinesis = kinesis;
+               this.maxNumberOfRecordsPerFetch = 
Integer.valueOf(consumerConfig.getProperty(
+                       ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+                       
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
+               this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(
+                       
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+                       
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void run() {
+               String nextShardItr;
+
+               try {
+                       // before infinitely looping, we set the initial 
nextShardItr appropriately
+
+                       if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
 {
+                               // if the shard is already closed, there will 
be no latest next record to get for this shard
+                               if (subscribedShard.isClosed()) {
+                                       nextShardItr = null;
+                               } else {
+                                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
null);
+                               }
+                       } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
 {
+                               nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.TRIM_HORIZON.toString(), null);
+                       } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
+                               nextShardItr = null;
+                       } else {
+                               // we will be starting from an actual sequence 
number (due to restore from failure).
+                               // if the last sequence number refers to an 
aggregated record, we need to clean up any dangling sub-records
+                               // from the last aggregated record; otherwise, 
we can simply start iterating from the record right after.
+
+                               if (lastSequenceNum.isAggregated()) {
+                                       String itrForLastAggregatedRecord =
+                                               
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
+
+                                       // get only the last aggregated record
+                                       GetRecordsResult getRecordsResult = 
getRecords(itrForLastAggregatedRecord, 1);
+
+                                       List<UserRecord> fetchedRecords = 
deaggregateRecords(
+                                               getRecordsResult.getRecords(),
+                                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+                                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+                                       long lastSubSequenceNum = 
lastSequenceNum.getSubSequenceNumber();
+                                       for (UserRecord record : 
fetchedRecords) {
+                                               // we have found a dangling 
sub-record if it has a larger subsequence number
+                                               // than our last sequence 
number; if so, collect the record and update state
+                                               if 
(record.getSubSequenceNumber() > lastSubSequenceNum) {
+                                                       
deserializeRecordForCollectionAndUpdateState(record);
+                                               }
+                                       }
+
+                                       // set the nextShardItr so we can 
continue iterating in the next while loop
+                                       nextShardItr = 
getRecordsResult.getNextShardIterator();
+                               } else {
+                                       // the last record was non-aggregated, 
so we can simply start from the next record
+                                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
+                               }
+                       }
+
+                       while(isRunning()) {
+                               if (nextShardItr == null) {
+                                       
fetcherRef.updateState(subscribedShardStateIndex, 
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+
+                                       // we can close this consumer thread 
once we've reached the end of the subscribed shard
+                                       break;
+                               } else {
+                                       if (fetchIntervalMillis != 0) {
+                                               
Thread.sleep(fetchIntervalMillis);
+                                       }
+
+                                       GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+
+                                       // each of the Kinesis records may be 
aggregated, so we must deaggregate them before proceeding
+                                       List<UserRecord> fetchedRecords = 
deaggregateRecords(
+                                               getRecordsResult.getRecords(),
+                                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+                                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+                                       for (UserRecord record : 
fetchedRecords) {
+                                               
deserializeRecordForCollectionAndUpdateState(record);
+                                       }
+
+                                       nextShardItr = 
getRecordsResult.getNextShardIterator();
+                               }
+                       }
+               } catch (Throwable t) {
+                       fetcherRef.stopWithError(t);
+               }
+       }
+
+       /**
+        * The loop in run() checks this before fetching next batch of records. 
Since this runnable will be executed
+        * by the ExecutorService {@link 
KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this 
thread
+        * would be by calling shutdownNow() on {@link 
KinesisDataFetcher#shardConsumersExecutor} and let the executor service
+        * interrupt all currently running {@link ShardConsumer}s.
+        */
+       private boolean isRunning() {
+               return !Thread.interrupted();
+       }
+
+       /**
+        * Deserializes a record for collection, and accordingly updates the 
shard state in the fetcher. The last
+        * successfully collected sequence number in this shard consumer is 
also updated so that
+        * {@link ShardConsumer#getRecords(String, int)} may be able to use the 
correct sequence number to refresh shard
+        * iterators if necessary.
+        *
+        * Note that the server-side Kinesis timestamp is attached to the 
record when collected. When the
+        * user programs uses {@link TimeCharacteristic#EventTime}, this 
timestamp will be used by default.
+        *
+        * @param record record to deserialize and collect
+        * @throws IOException
+        */
+       private void deserializeRecordForCollectionAndUpdateState(UserRecord 
record)
+               throws IOException {
+               ByteBuffer recordData = record.getData();
+
+               byte[] dataBytes = new byte[recordData.remaining()];
+               recordData.get(dataBytes);
+
+               final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
+
+               final T value = deserializer.deserialize(
+                       dataBytes,
+                       record.getPartitionKey(),
+                       record.getSequenceNumber(),
+                       approxArrivalTimestamp,
+                       subscribedShard.getStreamName(),
+                       subscribedShard.getShard().getShardId());
+
+               SequenceNumber collectedSequenceNumber = (record.isAggregated())
+                       ? new SequenceNumber(record.getSequenceNumber(), 
record.getSubSequenceNumber())
+                       : new SequenceNumber(record.getSequenceNumber());
+
+               fetcherRef.emitRecordAndUpdateState(
+                       value,
+                       approxArrivalTimestamp,
+                       subscribedShardStateIndex,
+                       collectedSequenceNumber);
+
+               lastSequenceNum = collectedSequenceNumber;
+       }
+
+       /**
+        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while 
also handling unexpected
+        * AWS {@link ExpiredIteratorException}s to assure that we get results 
and don't just fail on
+        * such occasions. The returned shard iterator within the successful 
{@link GetRecordsResult} should
+        * be used for the next call to this method.
+        *
+        * Note: it is important that this method is not called again before 
all the records from the last result have been
+        * fully collected with {@link 
ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, 
otherwise
+        * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in 
the middle of an aggregated record, leading to
+        * incorrect shard iteration if the iterator had to be refreshed.
+        *
+        * @param shardItr shard iterator to use
+        * @param maxNumberOfRecords the maximum number of records to fetch for 
this getRecords attempt
+        * @return get records result
+        * @throws InterruptedException
+        */
+       private GetRecordsResult getRecords(String shardItr, int 
maxNumberOfRecords) throws InterruptedException {
+               GetRecordsResult getRecordsResult = null;
+               while (getRecordsResult == null) {
+                       try {
+                               getRecordsResult = kinesis.getRecords(shardItr, 
maxNumberOfRecords);
+                       } catch (ExpiredIteratorException eiEx) {
+                               LOG.warn("Encountered an unexpected expired 
iterator {} for shard {};" +
+                                       " refreshing the iterator ...", 
shardItr, subscribedShard);
+                               shardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
+
+                               // sleep for the fetch interval before the next 
getRecords attempt with the refreshed iterator
+                               if (fetchIntervalMillis != 0) {
+                                       Thread.sleep(fetchIntervalMillis);
+                               }
+                       }
+               }
+               return getRecordsResult;
+       }
+
+       @SuppressWarnings("unchecked")
+       protected static List<UserRecord> deaggregateRecords(List<Record> 
records, String startingHashKey, String endingHashKey) {
+               return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
new file mode 100644
index 0000000..53ed11b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a AWS Kinesis Stream shard. It is 
basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, 
with some extra utility methods to
+ * determine whether or not a shard is closed and whether or not the shard is 
a result of parent shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
+
+       private static final long serialVersionUID = -6004217801761077536L;
+
+       private final String streamName;
+       private final Shard shard;
+
+       private final int cachedHash;
+
+       /**
+        * Create a new KinesisStreamShard
+        *
+        * @param streamName
+        *           the name of the Kinesis stream that this shard belongs to
+        * @param shard
+        *           the actual AWS Shard instance that will be wrapped within 
this KinesisStreamShard
+        */
+       public KinesisStreamShard(String streamName, Shard shard) {
+               this.streamName = checkNotNull(streamName);
+               this.shard = checkNotNull(shard);
+
+               // since our description of Kinesis Streams shards can be fully 
defined with the stream name and shard id,
+               // our hash doesn't need to use hash code of Amazon's 
description of Shards, which uses other info for calculation
+               int hash = 17;
+               hash = 37 * hash + streamName.hashCode();
+               hash = 37 * hash + shard.getShardId().hashCode();
+               this.cachedHash = hash;
+       }
+
+       public String getStreamName() {
+               return streamName;
+       }
+
+       public boolean isClosed() {
+               return 
(shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
+       }
+
+       public Shard getShard() {
+               return shard;
+       }
+
+       @Override
+       public String toString() {
+               return "KinesisStreamShard{" +
+                       "streamName='" + streamName + "'" +
+                       ", shard='" + shard.toString() + "'}";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof KinesisStreamShard)) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               KinesisStreamShard other = (KinesisStreamShard) obj;
+
+               return streamName.equals(other.getStreamName()) && 
shard.equals(other.getShard());
+       }
+
+       @Override
+       public int hashCode() {
+               return cachedHash;
+       }
+
+       /**
+        * Utility function to compare two shard ids
+        *
+        * @param firstShardId first shard id to compare
+        * @param secondShardId second shard id to compare
+        * @return a value less than 0 if the first shard id is smaller than 
the second shard id,
+        *         or a value larger than 0 the first shard is larger then the 
second shard id,
+        *         or 0 if they are equal
+        */
+       public static int compareShardIds(String firstShardId, String 
secondShardId) {
+               if (!isValidShardId(firstShardId)) {
+                       throw new IllegalArgumentException("The first shard id 
has invalid format.");
+               }
+
+               if (!isValidShardId(secondShardId)) {
+                       throw new IllegalArgumentException("The second shard id 
has invalid format.");
+               }
+
+               // digit segment of the shard id starts at index 8
+               return Long.compare(Long.parseLong(firstShardId.substring(8)), 
Long.parseLong(secondShardId.substring(8)));
+       }
+
+       /**
+        * Checks if a shard id has valid format.
+        * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
+        * prefixed with "shardId-", ex. "shardId-000000000015".
+        *
+        * @param shardId the shard id to check
+        * @return whether the shard id is valid
+        */
+       public static boolean isValidShardId(String shardId) {
+               if (shardId == null) { return false; }
+               return shardId.matches("^shardId-\\d{12}");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
new file mode 100644
index 0000000..00181da
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+/**
+ * A wrapper class that bundles a {@link KinesisStreamShard} with its last 
processed sequence number.
+ */
+public class KinesisStreamShardState {
+
+       private KinesisStreamShard kinesisStreamShard;
+       private SequenceNumber lastProcessedSequenceNum;
+
+       public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, 
SequenceNumber lastProcessedSequenceNum) {
+               this.kinesisStreamShard = kinesisStreamShard;
+               this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+       }
+
+       public KinesisStreamShard getKinesisStreamShard() {
+               return this.kinesisStreamShard;
+       }
+
+       public SequenceNumber getLastProcessedSequenceNum() {
+               return this.lastProcessedSequenceNum;
+       }
+
+       public void setLastProcessedSequenceNum(SequenceNumber update) {
+               this.lastProcessedSequenceNum = update;
+       }
+
+       @Override
+       public String toString() {
+               return "KinesisStreamShardState{" +
+                       "kinesisStreamShard='" + kinesisStreamShard.toString() 
+ "'" +
+                       ", lastProcessedSequenceNumber='" + 
lastProcessedSequenceNum.toString() + "'}";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof KinesisStreamShardState)) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               KinesisStreamShardState other = (KinesisStreamShardState) obj;
+
+               return kinesisStreamShard.equals(other.getKinesisStreamShard()) 
&& lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
+       }
+
+       @Override
+       public int hashCode() {
+               return 37 * (kinesisStreamShard.hashCode() + 
lastProcessedSequenceNum.hashCode());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
new file mode 100644
index 0000000..8182201
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+
+/**
+ * Special flag values for sequence numbers in shards to indicate special 
positions.
+ * The value is initially set by {@link FlinkKinesisConsumer} when {@link 
KinesisDataFetcher}s are created.
+ * The KinesisDataFetchers will use this value to determine how to retrieve 
the starting shard iterator from AWS Kinesis.
+ */
+public enum SentinelSequenceNumber {
+
+       /** Flag value for shard's sequence numbers to indicate that the
+        * shard should start to be read from the latest incoming records */
+       SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") 
),
+
+       /** Flag value for shard's sequence numbers to indicate that the shard 
should
+        * start to be read from the earliest records that haven't expired yet 
*/
+       SENTINEL_EARLIEST_SEQUENCE_NUM( new 
SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
+
+       /** Flag value to indicate that we have already read the last record of 
this shard
+        * (Note: Kinesis shards that have been closed due to a split or merge 
will have an ending data record) */
+       SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new 
SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
+
+       private SequenceNumber sentinel;
+
+       SentinelSequenceNumber(SequenceNumber sentinel) {
+               this.sentinel = sentinel;
+       }
+
+       public SequenceNumber get() {
+               return sentinel;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
new file mode 100644
index 0000000..021f53f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a Kinesis record's sequence number. It has 
two fields: the main sequence number,
+ * and also a subsequence number. If this {@link SequenceNumber} is referring 
to an aggregated Kinesis record, the
+ * subsequence number will be a non-negative value representing the order of 
the sub-record within the aggregation.
+ */
+public class SequenceNumber implements Serializable {
+
+       private static final long serialVersionUID = 876972197938972667L;
+
+       private static final String DELIMITER = "-";
+
+       private final String sequenceNumber;
+       private final long subSequenceNumber;
+
+       private final int cachedHash;
+
+       /**
+        * Create a new instance for a non-aggregated Kinesis record without a 
subsequence number.
+        * @param sequenceNumber the sequence number
+        */
+       public SequenceNumber(String sequenceNumber) {
+               this(sequenceNumber, -1);
+       }
+
+       /**
+        * Create a new instance, with the specified sequence number and 
subsequence number.
+        * To represent the sequence number for a non-aggregated Kinesis 
record, the subsequence number should be -1.
+        * Otherwise, give a non-negative sequence number to represent an 
aggregated Kinesis record.
+        *
+        * @param sequenceNumber the sequence number
+        * @param subSequenceNumber the subsequence number (-1 to represent 
non-aggregated Kinesis records)
+        */
+       public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
+               this.sequenceNumber = checkNotNull(sequenceNumber);
+               this.subSequenceNumber = subSequenceNumber;
+
+               this.cachedHash = 37 * (sequenceNumber.hashCode() + 
Long.valueOf(subSequenceNumber).hashCode());
+       }
+
+       public boolean isAggregated() {
+               return subSequenceNumber >= 0;
+       }
+
+       public String getSequenceNumber() {
+               return sequenceNumber;
+       }
+
+       public long getSubSequenceNumber() {
+               return subSequenceNumber;
+       }
+
+       @Override
+       public String toString() {
+               if (isAggregated()) {
+                       return sequenceNumber + DELIMITER + subSequenceNumber;
+               } else {
+                       return sequenceNumber;
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof SequenceNumber)) {
+                       return false;
+               }
+
+               if (obj == this) {
+                       return true;
+               }
+
+               SequenceNumber other = (SequenceNumber) obj;
+
+               return sequenceNumber.equals(other.getSequenceNumber())
+                       && (subSequenceNumber == other.getSubSequenceNumber());
+       }
+
+       @Override
+       public int hashCode() {
+               return cachedHash;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
new file mode 100644
index 0000000..04b1654
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic model class to bundle the shards retrieved from Kinesis on a {@link 
KinesisProxyInterface#getShardList(Map)} call.
+ */
+public class GetShardListResult {
+
+       private final Map<String, LinkedList<KinesisStreamShard>> 
streamsToRetrievedShardList = new HashMap<>();
+
+       public void addRetrievedShardToStream(String stream, KinesisStreamShard 
retrievedShard) {
+               if (!streamsToRetrievedShardList.containsKey(stream)) {
+                       streamsToRetrievedShardList.put(stream, new 
LinkedList<KinesisStreamShard>());
+               }
+               streamsToRetrievedShardList.get(stream).add(retrievedShard);
+       }
+
+       public void addRetrievedShardsToStream(String stream, 
List<KinesisStreamShard> retrievedShards) {
+               if (retrievedShards.size() != 0) {
+                       if (!streamsToRetrievedShardList.containsKey(stream)) {
+                               streamsToRetrievedShardList.put(stream, new 
LinkedList<KinesisStreamShard>());
+                       }
+                       
streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
+               }
+       }
+
+       public List<KinesisStreamShard> getRetrievedShardListOfStream(String 
stream) {
+               if (!streamsToRetrievedShardList.containsKey(stream)) {
+                       return null;
+               } else {
+                       return streamsToRetrievedShardList.get(stream);
+               }
+       }
+
+       public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+               if (!streamsToRetrievedShardList.containsKey(stream)) {
+                       return null;
+               } else {
+                       return 
streamsToRetrievedShardList.get(stream).getLast();
+               }
+       }
+
+       public boolean hasRetrievedShards() {
+               return !streamsToRetrievedShardList.isEmpty();
+       }
+
+       public Set<String> getStreamsWithRetrievedShards() {
+               return streamsToRetrievedShardList.keySet();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
new file mode 100644
index 0000000..9ffc8e6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Kinesis proxy implementation - a utility class that is used as a proxy to 
make
+ * calls to AWS Kinesis for several functions, such as getting a list of 
shards and
+ * fetching a batch of data records starting from a specified record sequence 
number.
+ *
+ * NOTE:
+ * In the AWS KCL library, there is a similar implementation - {@link 
com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
+ * This implementation differs mainly in that we can make operations to 
arbitrary Kinesis streams, which is a needed
+ * functionality for the Flink Kinesis Connecter since the consumer may 
simultaneously read from multiple Kinesis streams.
+ */
+public class KinesisProxy implements KinesisProxyInterface {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxy.class);
+
+       /** The actual Kinesis client from the AWS SDK that we will be using to 
make calls */
+       private final AmazonKinesisClient kinesisClient;
+
+       /** Random seed used to calculate backoff jitter for Kinesis operations 
*/
+       private final static Random seed = new Random();
+
+       // 
------------------------------------------------------------------------
+       //  describeStream() related performance settings
+       // 
------------------------------------------------------------------------
+
+       /** Base backoff millis for the describe stream operation */
+       private final long describeStreamBaseBackoffMillis;
+
+       /** Maximum backoff millis for the describe stream operation */
+       private final long describeStreamMaxBackoffMillis;
+
+       /** Exponential backoff power constant for the describe stream 
operation */
+       private final double describeStreamExpConstant;
+
+       // 
------------------------------------------------------------------------
+       //  getRecords() related performance settings
+       // 
------------------------------------------------------------------------
+
+       /** Base backoff millis for the get records operation */
+       private final long getRecordsBaseBackoffMillis;
+
+       /** Maximum backoff millis for the get records operation */
+       private final long getRecordsMaxBackoffMillis;
+
+       /** Exponential backoff power constant for the get records operation */
+       private final double getRecordsExpConstant;
+
+       /** Maximum attempts for the get records operation */
+       private final int getRecordsMaxAttempts;
+
+       // 
------------------------------------------------------------------------
+       //  getShardIterator() related performance settings
+       // 
------------------------------------------------------------------------
+
+       /** Base backoff millis for the get shard iterator operation */
+       private final long getShardIteratorBaseBackoffMillis;
+
+       /** Maximum backoff millis for the get shard iterator operation */
+       private final long getShardIteratorMaxBackoffMillis;
+
+       /** Exponential backoff power constant for the get shard iterator 
operation */
+       private final double getShardIteratorExpConstant;
+
+       /** Maximum attempts for the get shard iterator operation */
+       private final int getShardIteratorMaxAttempts;
+
+       /**
+        * Create a new KinesisProxy based on the supplied configuration 
properties
+        *
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       private KinesisProxy(Properties configProps) {
+               checkNotNull(configProps);
+
+               this.kinesisClient = AWSUtil.createKinesisClient(configProps);
+
+               this.describeStreamBaseBackoffMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
+               this.describeStreamMaxBackoffMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
+               this.describeStreamExpConstant = Double.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+                               
Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+               this.getRecordsBaseBackoffMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
+               this.getRecordsMaxBackoffMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
+               this.getRecordsExpConstant = Double.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+                               
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+               this.getRecordsMaxAttempts = Integer.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
+
+               this.getShardIteratorBaseBackoffMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
+               this.getShardIteratorMaxBackoffMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
+               this.getShardIteratorExpConstant = Double.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+                               
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
+               this.getShardIteratorMaxAttempts = Integer.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
+
+       }
+
+       /**
+        * Creates a Kinesis proxy.
+        *
+        * @param configProps configuration properties
+        * @return the created kinesis proxy
+        */
+       public static KinesisProxyInterface create(Properties configProps) {
+               return new KinesisProxy(configProps);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) throws InterruptedException {
+               final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
+               getRecordsRequest.setShardIterator(shardIterator);
+               getRecordsRequest.setLimit(maxRecordsToGet);
+
+               GetRecordsResult getRecordsResult = null;
+
+               int attempt = 0;
+               while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
+                       try {
+                               getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
+                       } catch (ProvisionedThroughputExceededException ex) {
+                               long backoffMillis = fullJitterBackoff(
+                                       getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+                               LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
+                                       + backoffMillis + " millis.");
+                               Thread.sleep(backoffMillis);
+                       }
+               }
+
+               if (getRecordsResult == null) {
+                       throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxAttempts +
+                               " retry attempts returned 
ProvisionedThroughputExceededException.");
+               }
+
+               return getRecordsResult;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public GetShardListResult getShardList(Map<String, String> 
streamNamesWithLastSeenShardIds) throws InterruptedException {
+               GetShardListResult result = new GetShardListResult();
+
+               for (Map.Entry<String,String> streamNameWithLastSeenShardId : 
streamNamesWithLastSeenShardIds.entrySet()) {
+                       String stream = streamNameWithLastSeenShardId.getKey();
+                       String lastSeenShardId = 
streamNameWithLastSeenShardId.getValue();
+                       result.addRetrievedShardsToStream(stream, 
getShardsOfStream(stream, lastSeenShardId));
+               }
+               return result;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException 
{
+               GetShardIteratorResult getShardIteratorResult = null;
+
+               int attempt = 0;
+               while (attempt <= getShardIteratorMaxAttempts && 
getShardIteratorResult == null) {
+                       try {
+                               getShardIteratorResult =
+                                       
kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+                       } catch (ProvisionedThroughputExceededException ex) {
+                               long backoffMillis = fullJitterBackoff(
+                                       getShardIteratorBaseBackoffMillis, 
getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+                               LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
+                                       + backoffMillis + " millis.");
+                               Thread.sleep(backoffMillis);
+                       }
+               }
+
+               if (getShardIteratorResult == null) {
+                       throw new RuntimeException("Rate Exceeded for 
getShardIterator operation - all " + getShardIteratorMaxAttempts +
+                               " retry attempts returned 
ProvisionedThroughputExceededException.");
+               }
+               return getShardIteratorResult.getShardIterator();
+       }
+
+       private List<KinesisStreamShard> getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
+               List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+
+               DescribeStreamResult describeStreamResult;
+               do {
+                       describeStreamResult = describeStream(streamName, 
lastSeenShardId);
+
+                       List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
+                       for (Shard shard : shards) {
+                               shardsOfStream.add(new 
KinesisStreamShard(streamName, shard));
+                       }
+
+                       if (shards.size() != 0) {
+                               lastSeenShardId = shards.get(shards.size() - 
1).getShardId();
+                       }
+               } while 
(describeStreamResult.getStreamDescription().isHasMoreShards());
+
+               return shardsOfStream;
+       }
+
+       /**
+        * Get metainfo for a Kinesis stream, which contains information about 
which shards this Kinesis stream possess.
+        *
+        * This method is using a "full jitter" approach described in AWS's 
article,
+        * <a 
href="https://www.awsarchitectureblog.com/2015/03/backoff.html";>"Exponential 
Backoff and Jitter"</a>.
+        * This is necessary because concurrent calls will be made by all 
parallel subtask's fetcher. This
+        * jitter backoff approach will help distribute calls across the 
fetchers over time.
+        *
+        * @param streamName the stream to describe
+        * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
+        * @return the result of the describe stream operation
+        */
+       private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
+               final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
+               describeStreamRequest.setStreamName(streamName);
+               describeStreamRequest.setExclusiveStartShardId(startShardId);
+
+               DescribeStreamResult describeStreamResult = null;
+
+               // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+               int attemptCount = 0;
+               while (describeStreamResult == null) { // retry until we get a 
result
+                       try {
+                               describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+                       } catch (LimitExceededException le) {
+                               long backoffMillis = fullJitterBackoff(
+                                       describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
+                               LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
+                                       + backoffMillis + " millis.");
+                               Thread.sleep(backoffMillis);
+                       } catch (ResourceNotFoundException re) {
+                               throw new RuntimeException("Error while getting 
stream details", re);
+                       }
+               }
+
+               String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
+               if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString()))) {
+                       if (LOG.isWarnEnabled()) {
+                               LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
+                                       "describeStream operation will not 
contain any shard information.");
+                       }
+               }
+
+               // Kinesalite (mock implementation of Kinesis) does not 
correctly exclude shards before the exclusive
+               // start shard id in the returned shards list; check if we need 
to remove these erroneously returned shards
+               if (startShardId != null) {
+                       List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
+                       Iterator<Shard> shardItr = shards.iterator();
+                       while (shardItr.hasNext()) {
+                               if 
(KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) 
<= 0) {
+                                       shardItr.remove();
+                               }
+                       }
+               }
+
+               return describeStreamResult;
+       }
+
+       private static long fullJitterBackoff(long base, long max, double 
power, int attempt) {
+               long exponentialBackoff = (long) Math.min(max, base * 
Math.pow(power, attempt));
+               return (long)(seed.nextDouble()*exponentialBackoff); // random 
jitter between 0 and the exponential backoff
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
new file mode 100644
index 0000000..39ddc52
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.Map;
+
+/**
+ * Interface for a Kinesis proxy that operates on multiple Kinesis streams 
within the same AWS service region.
+ */
+public interface KinesisProxyInterface {
+
+       /**
+        * Get a shard iterator from the specified position in a shard.
+        * The retrieved shard iterator can be used in {@link 
KinesisProxyInterface#getRecords(String, int)}}
+        * to read data from the Kinesis shard.
+        *
+        * @param shard the shard to get the iterator
+        * @param shardIteratorType the iterator type, defining how the shard 
is to be iterated
+        *                          (one of: TRIM_HORIZON, LATEST, 
AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+        * @param startingSeqNum sequence number, must be null if 
shardIteratorType is TRIM_HORIZON or LATEST
+        * @return shard iterator which can be used to read data from Kinesis
+        * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
+        *                              operation has exceeded the rate limit; 
this exception will be thrown
+        *                              if the backoff is interrupted.
+        */
+       String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) throws InterruptedException;
+
+       /**
+        * Get the next batch of data records using a specific shard iterator
+        *
+        * @param shardIterator a shard iterator that encodes info about which 
shard to read and where to start reading
+        * @param maxRecordsToGet the maximum amount of records to retrieve for 
this batch
+        * @return the batch of retrieved records, also with a shard iterator 
that can be used to get the next batch
+        * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
+        *                              operation has exceeded the rate limit; 
this exception will be thrown
+        *                              if the backoff is interrupted.
+        */
+       GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) 
throws InterruptedException;
+
+       /**
+        * Get shard list of multiple Kinesis streams, ignoring the
+        * shards of each stream before a specified last seen shard id.
+        *
+        * @param streamNamesWithLastSeenShardIds a map with stream as key, and 
last seen shard id as value
+        * @return result of the shard list query
+        * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
+        *                              operation has exceeded the rate limit; 
this exception will be thrown
+        *                              if the backoff is interrupted.
+        */
+       GetShardListResult getShardList(Map<String,String> 
streamNamesWithLastSeenShardIds) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
new file mode 100644
index 0000000..0effdd8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+ * basic {@link DeserializationSchema}, this schema offers additional 
Kinesis-specific information
+ * about the record that may be useful to the user application.
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KinesisDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+
+       /**
+        * Deserializes a Kinesis record's bytes
+        *
+        * @param recordValue the record's value as a byte array
+        * @param partitionKey the record's partition key at the time of writing
+        * @param seqNum the sequence number of this record in the Kinesis shard
+        * @param approxArrivalTimestamp the server-side timestamp of when 
Kinesis received and stored the record
+        * @param stream the name of the Kinesis stream that this record was 
sent to
+        * @param shardId The identifier of the shard the record was sent to
+        * @return the deserialized message as an Java object
+        * @throws IOException
+        */
+       T deserialize(byte[] recordValue, String partitionKey, String seqNum, 
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. 
If
+        * true is returned the element won't be emitted.
+        *
+        * @param nextElement the element to test for the end-of-stream signal
+        * @return true if the element signals end of stream, false otherwise
+        */
+       // TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..6e66038
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the {@link DeserializationSchema} with the 
{@link KinesisDeserializationSchema} interface
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KinesisDeserializationSchemaWrapper<T> implements 
KinesisDeserializationSchema<T> {
+       private static final long serialVersionUID = 9143148962928375886L;
+
+       private final DeserializationSchema<T> deserializationSchema;
+
+       public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> 
deserializationSchema) {
+               this.deserializationSchema = deserializationSchema;
+       }
+
+       @Override
+       public T deserialize(byte[] recordValue, String partitionKey, String 
seqNum, long approxArrivalTimestamp, String stream, String shardId)
+               throws IOException {
+               return deserializationSchema.deserialize(recordValue);
+       }
+
+       /*
+       FLINK-4194
+
+       @Override
+       public boolean isEndOfStream(T nextElement) {
+               return deserializationSchema.isEndOfStream(nextElement);
+       } */
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return deserializationSchema.getProducedType();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
new file mode 100644
index 0000000..03dd72c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Kinesis-specific serialization schema, allowing users to specify a target 
stream based
+ * on a record's contents.
+ * @param <T>
+ */
+public interface KinesisSerializationSchema<T> extends Serializable {
+       /**
+        * Serialize the given element into a ByteBuffer
+        *
+        * @param element The element to serialize
+        * @return Serialized representation of the element
+        */
+       ByteBuffer serialize(T element);
+
+       /**
+        * Optional method to determine the target stream based on the element.
+        * Return <code>null</code> to use the default stream
+        *
+        * @param element The element to determine the target stream from
+        * @return target stream name
+        */
+       String getTargetStream(T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
new file mode 100644
index 0000000..cff69e5
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import java.util.Properties;
+
+/**
+ * Some utilities specific to Amazon Web Service.
+ */
+public class AWSUtil {
+
+       /**
+        * Creates an Amazon Kinesis Client.
+        * @param configProps configuration properties containing the access 
key, secret key, and region
+        * @return a new Amazon Kinesis Client
+        */
+       public static AmazonKinesisClient createKinesisClient(Properties 
configProps) {
+               // set a Flink-specific user agent
+               ClientConfiguration awsClientConfig = new 
ClientConfigurationFactory().getConfig();
+               awsClientConfig.setUserAgent("Apache Flink " + 
EnvironmentInformation.getVersion() +
+                       " (" + 
EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis 
Connector");
+
+               // utilize automatic refreshment of credentials by directly 
passing the AWSCredentialsProvider
+               AmazonKinesisClient client = new AmazonKinesisClient(
+                       AWSUtil.getCredentialsProvider(configProps), 
awsClientConfig);
+
+               
client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
+               if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+                       
client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+               }
+               return client;
+       }
+
+       /**
+        * Return a {@link AWSCredentialsProvider} instance corresponding to 
the configuration properties.
+        *
+        * @param configProps the configuration properties
+        * @return The corresponding AWS Credentials Provider instance
+        */
+       public static AWSCredentialsProvider getCredentialsProvider(final 
Properties configProps) {
+               CredentialProvider credentialProviderType = 
CredentialProvider.valueOf(configProps.getProperty(
+                       AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, 
CredentialProvider.BASIC.toString()));
+
+               AWSCredentialsProvider credentialsProvider;
+
+               switch (credentialProviderType) {
+                       case ENV_VAR:
+                               credentialsProvider = new 
EnvironmentVariableCredentialsProvider();
+                               break;
+                       case SYS_PROP:
+                               credentialsProvider = new 
SystemPropertiesCredentialsProvider();
+                               break;
+                       case PROFILE:
+                               String profileName = configProps.getProperty(
+                                       AWSConfigConstants.AWS_PROFILE_NAME, 
null);
+                               String profileConfigPath = 
configProps.getProperty(
+                                       AWSConfigConstants.AWS_PROFILE_PATH, 
null);
+                               credentialsProvider = (profileConfigPath == 
null)
+                                       ? new 
ProfileCredentialsProvider(profileName)
+                                       : new 
ProfileCredentialsProvider(profileConfigPath, profileName);
+                               break;
+                       case AUTO:
+                               credentialsProvider = new 
DefaultAWSCredentialsProviderChain();
+                               break;
+                       default:
+                       case BASIC:
+                               credentialsProvider = new 
AWSCredentialsProvider() {
+                                       @Override
+                                       public AWSCredentials getCredentials() {
+                                               return new BasicAWSCredentials(
+                                                       
configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
+                                                       
configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
+                                       }
+
+                                       @Override
+                                       public void refresh() {
+                                               // do nothing
+                                       }
+                               };
+               }
+
+               return credentialsProvider;
+       }
+
+       /**
+        * Checks whether or not a region ID is valid
+        *
+        * @param region The AWS region ID to check
+        * @return true if the supplied region ID is valid, false otherwise
+        */
+       public static boolean isValidRegion(String region) {
+               try {
+                       Regions.fromName(region.toLowerCase());
+               } catch (IllegalArgumentException e) {
+                       return false;
+               }
+               return true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
new file mode 100644
index 0000000..9aa14ad
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities for Flink Kinesis connector configuration.
+ */
+public class KinesisConfigUtil {
+
+       /**
+        * Validate configuration properties for {@link FlinkKinesisConsumer}.
+        */
+       public static void validateConsumerConfiguration(Properties config) {
+               checkNotNull(config, "config can not be null");
+
+               validateAwsConfiguration(config);
+
+               if 
(config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
+                       String initPosType = 
config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
+
+                       // specified initial position in stream must be either 
LATEST or TRIM_HORIZON
+                       try {
+                               InitialPosition.valueOf(initPosType);
+                       } catch (IllegalArgumentException e) {
+                               StringBuilder sb = new StringBuilder();
+                               for (InitialPosition pos : 
InitialPosition.values()) {
+                                       sb.append(pos.toString()).append(", ");
+                               }
+                               throw new IllegalArgumentException("Invalid 
initial position in stream set in config. Valid values are: " + sb.toString());
+                       }
+               }
+
+               validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+                       "Invalid value given for maximum records per getRecords 
shard operation. Must be a valid non-negative integer value.");
+
+               validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
+                       "Invalid value given for maximum retry attempts for 
getRecords shard operation. Must be a valid non-negative integer value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
+                       "Invalid value given for get records operation base 
backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
+                       "Invalid value given for get records operation max 
backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+                       "Invalid value given for get records operation backoff 
exponential constant. Must be a valid non-negative double value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+                       "Invalid value given for getRecords sleep interval in 
milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
+                       "Invalid value given for maximum retry attempts for 
getShardIterator shard operation. Must be a valid non-negative integer value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
+                       "Invalid value given for get shard iterator operation 
base backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
+                       "Invalid value given for get shard iterator operation 
max backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+                       "Invalid value given for get shard iterator operation 
backoff exponential constant. Must be a valid non-negative double value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
+                       "Invalid value given for shard discovery sleep interval 
in milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+                       "Invalid value given for describe stream operation base 
backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+                       "Invalid value given for describe stream operation max 
backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+                       "Invalid value given for describe stream operation 
backoff exponential constant. Must be a valid non-negative double value.");
+
+               if 
(config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
+                       checkArgument(
+                               
Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
+                                       < 
ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
+                               "Invalid value given for getRecords sleep 
interval in milliseconds. Must be lower than " +
+                                       
ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
+                       );
+               }
+       }
+
+       /**
+        * Validate configuration properties for {@link FlinkKinesisProducer}.
+        */
+       public static void validateProducerConfiguration(Properties config) {
+               checkNotNull(config, "config can not be null");
+
+               validateAwsConfiguration(config);
+
+               validateOptionalPositiveLongProperty(config, 
ProducerConfigConstants.COLLECTION_MAX_COUNT,
+                       "Invalid value given for maximum number of items to 
pack into a PutRecords request. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ProducerConfigConstants.AGGREGATION_MAX_COUNT,
+                       "Invalid value given for maximum number of items to 
pack into an aggregated record. Must be a valid non-negative long value.");
+       }
+
+       /**
+        * Validate configuration properties related to Amazon AWS service
+        */
+       public static void validateAwsConfiguration(Properties config) {
+               if 
(!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+                       // if the credential provider type is not specified, it 
will default to BASIC later on,
+                       // so the Access Key ID and Secret Key must be given
+                       if 
(!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+                               || 
!config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+                               throw new IllegalArgumentException("Please set 
values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+                                       "and Secret Key ('" + 
AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS 
credential provider type.");
+                       }
+               } else {
+                       String credentialsProviderType = 
config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+
+                       // value specified for 
AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
+                       CredentialProvider providerType;
+                       try {
+                               providerType = 
CredentialProvider.valueOf(credentialsProviderType);
+                       } catch (IllegalArgumentException e) {
+                               StringBuilder sb = new StringBuilder();
+                               for (CredentialProvider type : 
CredentialProvider.values()) {
+                                       sb.append(type.toString()).append(", ");
+                               }
+                               throw new IllegalArgumentException("Invalid AWS 
Credential Provider Type set in config. Valid values are: " + sb.toString());
+                       }
+
+                       // if BASIC type is used, also check that the Access 
Key ID and Secret Key is supplied
+                       if (providerType == CredentialProvider.BASIC) {
+                               if 
(!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+                                       || 
!config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+                                       throw new 
IllegalArgumentException("Please set values for AWS Access Key ID ('"+ 
AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+                                               "and Secret Key ('" + 
AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS 
credential provider type.");
+                               }
+                       }
+               }
+
+               if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
+                       throw new IllegalArgumentException("The AWS region ('" 
+ AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+               } else {
+                       // specified AWS Region name must be recognizable
+                       if 
(!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
+                               StringBuilder sb = new StringBuilder();
+                               for (Regions region : Regions.values()) {
+                                       sb.append(region.getName()).append(", 
");
+                               }
+                               throw new IllegalArgumentException("Invalid AWS 
region set in config. Valid values are: " + sb.toString());
+                       }
+               }
+       }
+
+       private static void validateOptionalPositiveLongProperty(Properties 
config, String key, String message) {
+               if (config.containsKey(key)) {
+                       try {
+                               long value = 
Long.parseLong(config.getProperty(key));
+                               if (value < 0) {
+                                       throw new NumberFormatException();
+                               }
+                       } catch (NumberFormatException e) {
+                               throw new IllegalArgumentException(message);
+                       }
+               }
+       }
+
+       private static void validateOptionalPositiveIntProperty(Properties 
config, String key, String message) {
+               if (config.containsKey(key)) {
+                       try {
+                               int value = 
Integer.parseInt(config.getProperty(key));
+                               if (value < 0) {
+                                       throw new NumberFormatException();
+                               }
+                       } catch (NumberFormatException e) {
+                               throw new IllegalArgumentException(message);
+                       }
+               }
+       }
+
+       private static void validateOptionalPositiveDoubleProperty(Properties 
config, String key, String message) {
+               if (config.containsKey(key)) {
+                       try {
+                               double value = 
Double.parseDouble(config.getProperty(key));
+                               if (value < 0) {
+                                       throw new NumberFormatException();
+                               }
+                       } catch (NumberFormatException e) {
+                               throw new IllegalArgumentException(message);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties 
b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
new file mode 100644
index 0000000..773f932
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

Reply via email to