http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java new file mode 100644 index 0000000..612a4a7 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only. 
+ */ +public class ShardConsumer<T> implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class); + + private final KinesisDeserializationSchema<T> deserializer; + + private final KinesisProxyInterface kinesis; + + private final int subscribedShardStateIndex; + + private final KinesisDataFetcher<T> fetcherRef; + + private final KinesisStreamShard subscribedShard; + + private final int maxNumberOfRecordsPerFetch; + private final long fetchIntervalMillis; + + private SequenceNumber lastSequenceNum; + + /** + * Creates a shard consumer. + * + * @param fetcherRef reference to the owning fetcher + * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to + * @param subscribedShard the shard this consumer is subscribed to + * @param lastSequenceNum the sequence number in the shard to start consuming + */ + public ShardConsumer(KinesisDataFetcher<T> fetcherRef, + Integer subscribedShardStateIndex, + KinesisStreamShard subscribedShard, + SequenceNumber lastSequenceNum) { + this(fetcherRef, + subscribedShardStateIndex, + subscribedShard, + lastSequenceNum, + KinesisProxy.create(fetcherRef.getConsumerConfiguration())); + } + + /** This constructor is exposed for testing purposes */ + protected ShardConsumer(KinesisDataFetcher<T> fetcherRef, + Integer subscribedShardStateIndex, + KinesisStreamShard subscribedShard, + SequenceNumber lastSequenceNum, + KinesisProxyInterface kinesis) { + this.fetcherRef = checkNotNull(fetcherRef); + this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); + this.subscribedShard = checkNotNull(subscribedShard); + this.lastSequenceNum = checkNotNull(lastSequenceNum); + checkArgument( + !lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()), + "Should not start a ShardConsumer if the shard has already been completely read."); + + this.deserializer = fetcherRef.getClonedDeserializationSchema(); + + Properties 
consumerConfig = fetcherRef.getConsumerConfiguration(); + this.kinesis = kinesis; + this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty( + ConsumerConfigConstants.SHARD_GETRECORDS_MAX, + Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX))); + this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( + ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); + } + + @SuppressWarnings("unchecked") + @Override + public void run() { + String nextShardItr; + + try { + // before infinitely looping, we set the initial nextShardItr appropriately + + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { + // if the shard is already closed, there will be no latest next record to get for this shard + if (subscribedShard.isClosed()) { + nextShardItr = null; + } else { + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); + } + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + nextShardItr = null; + } else { + // we will be starting from an actual sequence number (due to restore from failure). + // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records + // from the last aggregated record; otherwise, we can simply start iterating from the record right after. 
+ + if (lastSequenceNum.isAggregated()) { + String itrForLastAggregatedRecord = + kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + + // get only the last aggregated record + GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1); + + List<UserRecord> fetchedRecords = deaggregateRecords( + getRecordsResult.getRecords(), + subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), + subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); + + long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber(); + for (UserRecord record : fetchedRecords) { + // we have found a dangling sub-record if it has a larger subsequence number + // than our last sequence number; if so, collect the record and update state + if (record.getSubSequenceNumber() > lastSubSequenceNum) { + deserializeRecordForCollectionAndUpdateState(record); + } + } + + // set the nextShardItr so we can continue iterating in the next while loop + nextShardItr = getRecordsResult.getNextShardIterator(); + } else { + // the last record was non-aggregated, so we can simply start from the next record + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + } + } + + while(isRunning()) { + if (nextShardItr == null) { + fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()); + + // we can close this consumer thread once we've reached the end of the subscribed shard + break; + } else { + if (fetchIntervalMillis != 0) { + Thread.sleep(fetchIntervalMillis); + } + + GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch); + + // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding + List<UserRecord> fetchedRecords = deaggregateRecords( + getRecordsResult.getRecords(), + 
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), + subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); + + for (UserRecord record : fetchedRecords) { + deserializeRecordForCollectionAndUpdateState(record); + } + + nextShardItr = getRecordsResult.getNextShardIterator(); + } + } + } catch (Throwable t) { + fetcherRef.stopWithError(t); + } + } + + /** + * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed + * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread + * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service + * interrupt all currently running {@link ShardConsumer}s. + */ + private boolean isRunning() { + return !Thread.interrupted(); + } + + /** + * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last + * successfully collected sequence number in this shard consumer is also updated so that + * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard + * iterators if necessary. + * + * Note that the server-side Kinesis timestamp is attached to the record when collected. When the + * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default. 
+ * + * @param record record to deserialize and collect + * @throws IOException + */ + private void deserializeRecordForCollectionAndUpdateState(UserRecord record) + throws IOException { + ByteBuffer recordData = record.getData(); + + byte[] dataBytes = new byte[recordData.remaining()]; + recordData.get(dataBytes); + + final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime(); + + final T value = deserializer.deserialize( + dataBytes, + record.getPartitionKey(), + record.getSequenceNumber(), + approxArrivalTimestamp, + subscribedShard.getStreamName(), + subscribedShard.getShard().getShardId()); + + SequenceNumber collectedSequenceNumber = (record.isAggregated()) + ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()) + : new SequenceNumber(record.getSequenceNumber()); + + fetcherRef.emitRecordAndUpdateState( + value, + approxArrivalTimestamp, + subscribedShardStateIndex, + collectedSequenceNumber); + + lastSequenceNum = collectedSequenceNumber; + } + + /** + * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected + * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on + * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should + * be used for the next call to this method. + * + * Note: it is important that this method is not called again before all the records from the last result have been + * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise + * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to + * incorrect shard iteration if the iterator had to be refreshed. 
+ * + * @param shardItr shard iterator to use + * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt + * @return get records result + * @throws InterruptedException + */ + private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { + GetRecordsResult getRecordsResult = null; + while (getRecordsResult == null) { + try { + getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); + } catch (ExpiredIteratorException eiEx) { + LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + + " refreshing the iterator ...", shardItr, subscribedShard); + shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + + // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator + if (fetchIntervalMillis != 0) { + Thread.sleep(fetchIntervalMillis); + } + } + } + return getRecordsResult; + } + + @SuppressWarnings("unchecked") + protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) { + return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java new file mode 100644 index 0000000..53ed11b --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import com.amazonaws.services.kinesis.model.Shard; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A serializable representation of a AWS Kinesis Stream shard. 
It is basically a wrapper class around the information + * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to + * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges. + */ +public class KinesisStreamShard implements Serializable { + + private static final long serialVersionUID = -6004217801761077536L; + + private final String streamName; + private final Shard shard; + + private final int cachedHash; + + /** + * Create a new KinesisStreamShard + * + * @param streamName + * the name of the Kinesis stream that this shard belongs to + * @param shard + * the actual AWS Shard instance that will be wrapped within this KinesisStreamShard + */ + public KinesisStreamShard(String streamName, Shard shard) { + this.streamName = checkNotNull(streamName); + this.shard = checkNotNull(shard); + + // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id, + // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation + int hash = 17; + hash = 37 * hash + streamName.hashCode(); + hash = 37 * hash + shard.getShardId().hashCode(); + this.cachedHash = hash; + } + + public String getStreamName() { + return streamName; + } + + public boolean isClosed() { + return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null); + } + + public Shard getShard() { + return shard; + } + + @Override + public String toString() { + return "KinesisStreamShard{" + + "streamName='" + streamName + "'" + + ", shard='" + shard.toString() + "'}"; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof KinesisStreamShard)) { + return false; + } + + if (obj == this) { + return true; + } + + KinesisStreamShard other = (KinesisStreamShard) obj; + + return streamName.equals(other.getStreamName()) && shard.equals(other.getShard()); + } + + @Override + public int 
hashCode() { + return cachedHash; + } + + /** + * Utility function to compare two shard ids + * + * @param firstShardId first shard id to compare + * @param secondShardId second shard id to compare + * @return a value less than 0 if the first shard id is smaller than the second shard id, + * or a value larger than 0 the first shard is larger then the second shard id, + * or 0 if they are equal + */ + public static int compareShardIds(String firstShardId, String secondShardId) { + if (!isValidShardId(firstShardId)) { + throw new IllegalArgumentException("The first shard id has invalid format."); + } + + if (!isValidShardId(secondShardId)) { + throw new IllegalArgumentException("The second shard id has invalid format."); + } + + // digit segment of the shard id starts at index 8 + return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8))); + } + + /** + * Checks if a shard id has valid format. + * Kinesis stream shard ids have 12-digit numbers left-padded with 0's, + * prefixed with "shardId-", ex. "shardId-000000000015". 
+ * + * @param shardId the shard id to check + * @return whether the shard id is valid + */ + public static boolean isValidShardId(String shardId) { + if (shardId == null) { return false; } + return shardId.matches("^shardId-\\d{12}"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java new file mode 100644 index 0000000..00181da --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +/** + * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number. 
+ */ +public class KinesisStreamShardState { + + private KinesisStreamShard kinesisStreamShard; + private SequenceNumber lastProcessedSequenceNum; + + public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) { + this.kinesisStreamShard = kinesisStreamShard; + this.lastProcessedSequenceNum = lastProcessedSequenceNum; + } + + public KinesisStreamShard getKinesisStreamShard() { + return this.kinesisStreamShard; + } + + public SequenceNumber getLastProcessedSequenceNum() { + return this.lastProcessedSequenceNum; + } + + public void setLastProcessedSequenceNum(SequenceNumber update) { + this.lastProcessedSequenceNum = update; + } + + @Override + public String toString() { + return "KinesisStreamShardState{" + + "kinesisStreamShard='" + kinesisStreamShard.toString() + "'" + + ", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}"; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof KinesisStreamShardState)) { + return false; + } + + if (obj == this) { + return true; + } + + KinesisStreamShardState other = (KinesisStreamShardState) obj; + + return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum()); + } + + @Override + public int hashCode() { + return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java new file mode 100644 
index 0000000..8182201 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; + +/** + * Special flag values for sequence numbers in shards to indicate special positions. + * The value is initially set by {@link FlinkKinesisConsumer} when {@link KinesisDataFetcher}s are created. + * The KinesisDataFetchers will use this value to determine how to retrieve the starting shard iterator from AWS Kinesis. 
+ */ +public enum SentinelSequenceNumber { + + /** Flag value for shard's sequence numbers to indicate that the + * shard should start to be read from the latest incoming records */ + SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ), + + /** Flag value for shard's sequence numbers to indicate that the shard should + * start to be read from the earliest records that haven't expired yet */ + SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ), + + /** Flag value to indicate that we have already read the last record of this shard + * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */ + SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") ); + + private SequenceNumber sentinel; + + SentinelSequenceNumber(SequenceNumber sentinel) { + this.sentinel = sentinel; + } + + public SequenceNumber get() { + return sentinel; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java new file mode 100644 index 0000000..021f53f --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * A serializable representation of a Kinesis record's sequence number. It has two fields: the main sequence number,
 * and also a subsequence number. If this {@link SequenceNumber} is referring to an aggregated Kinesis record, the
 * subsequence number will be a non-negative value representing the order of the sub-record within the aggregation.
 */
public class SequenceNumber implements Serializable {

	private static final long serialVersionUID = 876972197938972667L;

	/** Separator between main and subsequence number in {@link #toString()}. */
	private static final String DELIMITER = "-";

	private final String sequenceNumber;
	private final long subSequenceNumber;

	/** Precomputed hash; instances are immutable and frequently used as map keys. */
	private final int cachedHash;

	/**
	 * Create a new instance for a non-aggregated Kinesis record without a subsequence number.
	 *
	 * @param sequenceNumber the sequence number
	 * @throws NullPointerException if {@code sequenceNumber} is null
	 */
	public SequenceNumber(String sequenceNumber) {
		this(sequenceNumber, -1);
	}

	/**
	 * Create a new instance, with the specified sequence number and subsequence number.
	 * To represent the sequence number for a non-aggregated Kinesis record, the subsequence number should be -1.
	 * Otherwise, give a non-negative subsequence number to represent an aggregated Kinesis record.
	 *
	 * @param sequenceNumber the sequence number
	 * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records)
	 * @throws NullPointerException if {@code sequenceNumber} is null
	 */
	public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
		// explicit check preserves the fail-fast NPE contract of the former checkNotNull
		if (sequenceNumber == null) {
			throw new NullPointerException("sequenceNumber must not be null");
		}
		this.sequenceNumber = sequenceNumber;
		this.subSequenceNumber = subSequenceNumber;

		// Long.hashCode(long) yields the exact same value as Long.valueOf(x).hashCode(),
		// without allocating a boxed Long
		this.cachedHash = 37 * (sequenceNumber.hashCode() + Long.hashCode(subSequenceNumber));
	}

	/** Whether this refers to a sub-record of an aggregated Kinesis record (subsequence number >= 0). */
	public boolean isAggregated() {
		return subSequenceNumber >= 0;
	}

	public String getSequenceNumber() {
		return sequenceNumber;
	}

	public long getSubSequenceNumber() {
		return subSequenceNumber;
	}

	@Override
	public String toString() {
		if (isAggregated()) {
			return sequenceNumber + DELIMITER + subSequenceNumber;
		} else {
			return sequenceNumber;
		}
	}

	@Override
	public boolean equals(Object obj) {
		if (!(obj instanceof SequenceNumber)) {
			return false;
		}

		if (obj == this) {
			return true;
		}

		SequenceNumber other = (SequenceNumber) obj;

		return sequenceNumber.equals(other.getSequenceNumber())
			&& (subSequenceNumber == other.getSubSequenceNumber());
	}

	@Override
	public int hashCode() {
		return cachedHash;
	}
}
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; + +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call. 
+ */ +public class GetShardListResult { + + private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>(); + + public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>()); + } + streamsToRetrievedShardList.get(stream).add(retrievedShard); + } + + public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) { + if (retrievedShards.size() != 0) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>()); + } + streamsToRetrievedShardList.get(stream).addAll(retrievedShards); + } + } + + public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + return null; + } else { + return streamsToRetrievedShardList.get(stream); + } + } + + public KinesisStreamShard getLastSeenShardOfStream(String stream) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + return null; + } else { + return streamsToRetrievedShardList.get(stream).getLast(); + } + } + + public boolean hasRetrievedShards() { + return !streamsToRetrievedShardList.isEmpty(); + } + + public Set<String> getStreamsWithRetrievedShards() { + return streamsToRetrievedShardList.keySet(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java new file 
mode 100644 index 0000000..9ffc8e6 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kinesis.proxy;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.util.Random;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Kinesis proxy implementation - a utility class that is used as a proxy to make
 * calls to AWS Kinesis for several functions, such as getting a list of shards and
 * fetching a batch of data records starting from a specified record sequence number.
 *
 * NOTE:
 * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
 * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed
 * functionality for the Flink Kinesis Connector since the consumer may simultaneously read from multiple Kinesis streams.
 */
public class KinesisProxy implements KinesisProxyInterface {

	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);

	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
	private final AmazonKinesisClient kinesisClient;

	/** Random source used to calculate backoff jitter for Kinesis operations.
	 * java.util.Random is thread-safe, so a single shared instance suffices. */
	private static final Random RANDOM = new Random();

	// ------------------------------------------------------------------------
	//  describeStream() related performance settings
	// ------------------------------------------------------------------------

	/** Base backoff millis for the describe stream operation */
	private final long describeStreamBaseBackoffMillis;

	/** Maximum backoff millis for the describe stream operation */
	private final long describeStreamMaxBackoffMillis;

	/** Exponential backoff power constant for the describe stream operation */
	private final double describeStreamExpConstant;

	// ------------------------------------------------------------------------
	//  getRecords() related performance settings
	// ------------------------------------------------------------------------

	/** Base backoff millis for the get records operation */
	private final long getRecordsBaseBackoffMillis;

	/** Maximum backoff millis for the get records operation */
	private final long getRecordsMaxBackoffMillis;

	/** Exponential backoff power constant for the get records operation */
	private final double getRecordsExpConstant;

	/** Maximum attempts for the get records operation */
	private final int getRecordsMaxAttempts;

	// ------------------------------------------------------------------------
	//  getShardIterator() related performance settings
	// ------------------------------------------------------------------------

	/** Base backoff millis for the get shard iterator operation */
	private final long getShardIteratorBaseBackoffMillis;

	/** Maximum backoff millis for the get shard iterator operation */
	private final long getShardIteratorMaxBackoffMillis;

	/** Exponential backoff power constant for the get shard iterator operation */
	private final double getShardIteratorExpConstant;

	/** Maximum attempts for the get shard iterator operation */
	private final int getShardIteratorMaxAttempts;

	/**
	 * Create a new KinesisProxy based on the supplied configuration properties.
	 * All backoff / retry settings fall back to their documented defaults in
	 * {@link ConsumerConfigConstants} when not present in the properties.
	 *
	 * @param configProps configuration properties containing AWS credential and AWS region info
	 */
	private KinesisProxy(Properties configProps) {
		checkNotNull(configProps);

		this.kinesisClient = AWSUtil.createKinesisClient(configProps);

		this.describeStreamBaseBackoffMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
		this.describeStreamMaxBackoffMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
		this.describeStreamExpConstant = Double.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
				Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));

		this.getRecordsBaseBackoffMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
		this.getRecordsMaxBackoffMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
		this.getRecordsExpConstant = Double.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
		this.getRecordsMaxAttempts = Integer.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));

		this.getShardIteratorBaseBackoffMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
		this.getShardIteratorMaxBackoffMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
		this.getShardIteratorExpConstant = Double.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
		this.getShardIteratorMaxAttempts = Integer.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));

	}

	/**
	 * Creates a Kinesis proxy.
	 *
	 * @param configProps configuration properties
	 * @return the created kinesis proxy
	 */
	public static KinesisProxyInterface create(Properties configProps) {
		return new KinesisProxy(configProps);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
		getRecordsRequest.setShardIterator(shardIterator);
		getRecordsRequest.setLimit(maxRecordsToGet);

		GetRecordsResult getRecordsResult = null;

		// retry with full-jitter exponential backoff while Kinesis reports throttling,
		// up to getRecordsMaxAttempts retries
		int attempt = 0;
		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
			try {
				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
			} catch (ProvisionedThroughputExceededException ex) {
				long backoffMillis = fullJitterBackoff(
					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
					+ backoffMillis + " millis.");
				Thread.sleep(backoffMillis);
			}
		}

		if (getRecordsResult == null) {
			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts
				+ " retry attempts returned ProvisionedThroughputExceededException.");
		}

		return getRecordsResult;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
		GetShardListResult result = new GetShardListResult();

		for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
			String stream = streamNameWithLastSeenShardId.getKey();
			String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
		}
		return result;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
		GetShardIteratorResult getShardIteratorResult = null;

		// retry with full-jitter exponential backoff while Kinesis reports throttling,
		// up to getShardIteratorMaxAttempts retries
		int attempt = 0;
		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
			try {
				getShardIteratorResult =
					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
			} catch (ProvisionedThroughputExceededException ex) {
				long backoffMillis = fullJitterBackoff(
					getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
					+ backoffMillis + " millis.");
				Thread.sleep(backoffMillis);
			}
		}

		if (getShardIteratorResult == null) {
			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts
				+ " retry attempts returned ProvisionedThroughputExceededException.");
		}
		return getShardIteratorResult.getShardIterator();
	}

	/**
	 * Retrieves the shards of the given stream, starting after the given last seen shard id
	 * (exclusive). Paginates through describeStream results until the stream description
	 * reports no more shards.
	 *
	 * @param streamName the stream whose shards should be listed
	 * @param lastSeenShardId shard id to start after, or null to list from the beginning
	 * @return the shards of the stream, in the order returned by Kinesis
	 */
	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();

		DescribeStreamResult describeStreamResult;
		do {
			describeStreamResult = describeStream(streamName, lastSeenShardId);

			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
			for (Shard shard : shards) {
				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
			}

			// advance the pagination cursor to the last shard of this page
			if (shards.size() != 0) {
				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
			}
		} while (describeStreamResult.getStreamDescription().isHasMoreShards());

		return shardsOfStream;
	}

	/**
	 * Get metainfo for a Kinesis stream, which contains information about which shards this
	 * Kinesis stream possesses.
	 *
	 * This method is using a "full jitter" approach described in AWS's article,
	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
	 * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This
	 * jitter backoff approach will help distribute calls across the fetchers over time.
	 *
	 * @param streamName the stream to describe
	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
	 * @return the result of the describe stream operation
	 */
	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
		describeStreamRequest.setStreamName(streamName);
		describeStreamRequest.setExclusiveStartShardId(startShardId);

		DescribeStreamResult describeStreamResult = null;

		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
		int attemptCount = 0;
		while (describeStreamResult == null) { // retry until we get a result
			try {
				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
			} catch (LimitExceededException le) {
				long backoffMillis = fullJitterBackoff(
					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
					+ backoffMillis + " millis.");
				Thread.sleep(backoffMillis);
			} catch (ResourceNotFoundException re) {
				throw new RuntimeException("Error while getting stream details", re);
			}
		}

		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
			if (LOG.isWarnEnabled()) {
				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current "
					+ "describeStream operation will not contain any shard information.");
			}
		}

		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
		if (startShardId != null) {
			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
			Iterator<Shard> shardItr = shards.iterator();
			while (shardItr.hasNext()) {
				if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
					shardItr.remove();
				}
			}
		}

		return describeStreamResult;
	}

	/**
	 * Computes a "full jitter" backoff: a uniformly random value between 0 and the capped
	 * exponential backoff {@code min(max, base * power^attempt)}.
	 *
	 * @param base base backoff in millis
	 * @param max maximum backoff in millis
	 * @param power exponential backoff power constant
	 * @param attempt zero-based attempt count
	 * @return the jittered backoff time in millis
	 */
	private static long fullJitterBackoff(long base, long max, double power, int attempt) {
		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
		return (long) (RANDOM.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kinesis.proxy;

import com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;

import java.util.Map;

/**
 * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region.
 */
public interface KinesisProxyInterface {

	/**
	 * Get a shard iterator from the specified position in a shard.
	 * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}
	 * to read data from the Kinesis shard.
	 *
	 * @param shard the shard to get the iterator for
	 * @param shardIteratorType the iterator type, defining how the shard is to be iterated
	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
	 * @param startingSeqNum the sequence number to start from; must be null if shardIteratorType
	 *                       is TRIM_HORIZON or LATEST
	 * @return shard iterator which can be used to read data from Kinesis
	 * @throws InterruptedException implementations are expected to retry with backoff if AWS Kinesis
	 *                              complains that the operation has exceeded the rate limit; this
	 *                              exception will be thrown if the backoff is interrupted
	 */
	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;

	/**
	 * Get the next batch of data records using a specific shard iterator.
	 *
	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
	 * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch
	 * @throws InterruptedException implementations are expected to retry with backoff if AWS Kinesis
	 *                              complains that the operation has exceeded the rate limit; this
	 *                              exception will be thrown if the backoff is interrupted
	 */
	GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;

	/**
	 * Get shard list of multiple Kinesis streams, ignoring the
	 * shards of each stream before a specified last seen shard id.
	 *
	 * @param streamNamesWithLastSeenShardIds a map with stream name as key, and last seen shard id as value
	 *                                        (a null value means the whole shard list of that stream is wanted)
	 * @return result of the shard list query
	 * @throws InterruptedException implementations are expected to retry with backoff if AWS Kinesis
	 *                              complains that the operation has exceeded the rate limit; this
	 *                              exception will be thrown if the backoff is interrupted
	 */
	GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException;
}
/*
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kinesis.serialization;

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.io.IOException;
import java.io.Serializable;

/**
 * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
 * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
 * about the record that may be useful to the user application.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Deserializes a Kinesis record's bytes.
	 *
	 * @param recordValue the record's value as a byte array
	 * @param partitionKey the record's partition key at the time of writing
	 * @param seqNum the sequence number of this record in the Kinesis shard
	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
	 * @param stream the name of the Kinesis stream that this record was sent to
	 * @param shardId The identifier of the shard the record was sent to
	 * @return the deserialized message as a Java object
	 * @throws IOException if the record bytes cannot be deserialized
	 */
	T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement the element to test for the end-of-stream signal
	 * @return true if the element signals end of stream, false otherwise
	 */
	// TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
}
/*
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kinesis.serialization;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.io.IOException;

/**
 * A simple wrapper for using the {@link DeserializationSchema} with the {@link KinesisDeserializationSchema} interface.
 *
 * @param <T> The type created by the deserialization schema.
 */
public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
	private static final long serialVersionUID = 9143148962928375886L;

	/** The wrapped basic schema that performs the actual byte-level deserialization. */
	private final DeserializationSchema<T> deserializationSchema;

	/**
	 * Creates a wrapper around the given basic deserialization schema.
	 *
	 * @param deserializationSchema the basic schema to delegate to; must not be null
	 */
	public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
		// fail fast at construction time instead of deferring the NPE
		// to the first deserialize() call at runtime
		if (deserializationSchema == null) {
			throw new NullPointerException("The provided DeserializationSchema must not be null.");
		}
		this.deserializationSchema = deserializationSchema;
	}

	@Override
	public T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)
		throws IOException {
		// the Kinesis-specific metadata (partition key, sequence number, timestamp, stream,
		// shard id) is intentionally ignored; only the record value is forwarded
		return deserializationSchema.deserialize(recordValue);
	}

	/*
	FLINK-4194

	@Override
	public boolean isEndOfStream(T nextElement) {
		return deserializationSchema.isEndOfStream(nextElement);
	} */

	@Override
	public TypeInformation<T> getProducedType() {
		return deserializationSchema.getProducedType();
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.kinesis.serialization;


import java.io.Serializable;
import java.nio.ByteBuffer;

/**
 * Kinesis-specific serialization schema, allowing users to specify a target stream based
 * on a record's contents.
 *
 * @param <T> the type of the elements to be serialized
 */
public interface KinesisSerializationSchema<T> extends Serializable {
	/**
	 * Serialize the given element into a ByteBuffer.
	 *
	 * @param element The element to serialize
	 * @return Serialized representation of the element
	 */
	ByteBuffer serialize(T element);

	/**
	 * Optional method to determine the target stream based on the element.
	 * Return <code>null</code> to use the default stream.
	 *
	 * @param element The element to determine the target stream from
	 * @return target stream name
	 */
	String getTargetStream(T element);
}
/*
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kinesis.util;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;

import java.util.Properties;

/**
 * Some utilities specific to Amazon Web Service.
 */
public class AWSUtil {

	/** Static utility class; not meant to be instantiated. */
	private AWSUtil() {
	}

	/**
	 * Creates an Amazon Kinesis Client.
	 *
	 * @param configProps configuration properties containing the access key, secret key, and region
	 * @return a new Amazon Kinesis Client
	 */
	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
		// set a Flink-specific user agent
		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion()
			+ " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");

		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
		AmazonKinesisClient client = new AmazonKinesisClient(
			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);

		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
		// an explicitly configured endpoint (e.g. a local Kinesalite) overrides the region default
		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
			client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
		}
		return client;
	}

	/**
	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
	 * Falls back to BASIC (explicit access key id / secret key) when no provider type is configured.
	 *
	 * @param configProps the configuration properties
	 * @return The corresponding AWS Credentials Provider instance
	 */
	public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
		CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
			AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString()));

		AWSCredentialsProvider credentialsProvider;

		switch (credentialProviderType) {
			case ENV_VAR:
				credentialsProvider = new EnvironmentVariableCredentialsProvider();
				break;
			case SYS_PROP:
				credentialsProvider = new SystemPropertiesCredentialsProvider();
				break;
			case PROFILE:
				String profileName = configProps.getProperty(
					AWSConfigConstants.AWS_PROFILE_NAME, null);
				String profileConfigPath = configProps.getProperty(
					AWSConfigConstants.AWS_PROFILE_PATH, null);
				credentialsProvider = (profileConfigPath == null)
					? new ProfileCredentialsProvider(profileName)
					: new ProfileCredentialsProvider(profileConfigPath, profileName);
				break;
			case AUTO:
				credentialsProvider = new DefaultAWSCredentialsProviderChain();
				break;
			default:
			case BASIC:
				// anonymous provider returning the statically configured key pair;
				// refresh() is a no-op since the credentials never change
				credentialsProvider = new AWSCredentialsProvider() {
					@Override
					public AWSCredentials getCredentials() {
						return new BasicAWSCredentials(
							configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
							configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
					}

					@Override
					public void refresh() {
						// do nothing
					}
				};
		}

		return credentialsProvider;
	}

	/**
	 * Checks whether or not a region ID is valid.
	 *
	 * @param region The AWS region ID to check (may be null, which is considered invalid)
	 * @return true if the supplied region ID is valid, false otherwise
	 */
	public static boolean isValidRegion(String region) {
		// guard against null so callers get "invalid" instead of a NullPointerException
		if (region == null) {
			return false;
		}
		try {
			Regions.fromName(region.toLowerCase());
		} catch (IllegalArgumentException e) {
			return false;
		}
		return true;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java new file mode 100644 index 0000000..9aa14ad --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import com.amazonaws.regions.Regions; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utilities for Flink Kinesis connector configuration. + */ +public class KinesisConfigUtil { + + /** + * Validate configuration properties for {@link FlinkKinesisConsumer}. + */ + public static void validateConsumerConfiguration(Properties config) { + checkNotNull(config, "config can not be null"); + + validateAwsConfiguration(config); + + if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) { + String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION); + + // specified initial position in stream must be either LATEST or TRIM_HORIZON + try { + InitialPosition.valueOf(initPosType); + } catch (IllegalArgumentException e) { + StringBuilder sb = new StringBuilder(); + for (InitialPosition pos : InitialPosition.values()) { + sb.append(pos.toString()).append(", "); + } + throw new IllegalArgumentException("Invalid initial position in stream set in config. 
Valid values are: " + sb.toString()); + } + } + + validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX, + "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value."); + + validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, + "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, + "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, + "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, + "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, + "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, + "Invalid value given for get shard iterator operation base backoff milliseconds. 
Must be a valid non-negative long value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, + "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, + "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, + "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, + "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value."); + + validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value."); + + if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) { + checkArgument( + Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) + < ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS, + "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " + + ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds." + ); + } + } + + /** + * Validate configuration properties for {@link FlinkKinesisProducer}. 
+ */ + public static void validateProducerConfiguration(Properties config) { + checkNotNull(config, "config can not be null"); + + validateAwsConfiguration(config); + + validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT, + "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value."); + + validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT, + "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value."); + } + + /** + * Validate configuration properties related to Amazon AWS service + */ + public static void validateAwsConfiguration(Properties config) { + if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) { + // if the credential provider type is not specified, it will default to BASIC later on, + // so the Access Key ID and Secret Key must be given + if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) + || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { + throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); + } + } else { + String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER); + + // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable + CredentialProvider providerType; + try { + providerType = CredentialProvider.valueOf(credentialsProviderType); + } catch (IllegalArgumentException e) { + StringBuilder sb = new StringBuilder(); + for (CredentialProvider type : CredentialProvider.values()) { + sb.append(type.toString()).append(", "); + } + throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. 
Valid values are: " + sb.toString()); + } + + // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied + if (providerType == CredentialProvider.BASIC) { + if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) + || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { + throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); + } + } + } + + if (!config.containsKey(AWSConfigConstants.AWS_REGION)) { + throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); + } else { + // specified AWS Region name must be recognizable + if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) { + StringBuilder sb = new StringBuilder(); + for (Regions region : Regions.values()) { + sb.append(region.getName()).append(", "); + } + throw new IllegalArgumentException("Invalid AWS region set in config. 
Valid values are: " + sb.toString()); + } + } + } + + private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) { + if (config.containsKey(key)) { + try { + long value = Long.parseLong(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException(message); + } + } + } + + private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) { + if (config.containsKey(key)) { + try { + int value = Integer.parseInt(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException(message); + } + } + } + + private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) { + if (config.containsKey(key)) { + try { + double value = Double.parseDouble(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException(message); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties new file mode 100644 index 0000000..773f932 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger