Repository: flink Updated Branches: refs/heads/master a7274d566 -> 662b45868
[FLINK-4191] Expose shard information in kinesis deserialization schema This closes #2225 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662b4586 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662b4586 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662b4586 Branch: refs/heads/master Commit: 662b4586856379f9baf19862721ad99a20e74d13 Parents: a7274d5 Author: Robert Metzger <rmetz...@apache.org> Authored: Mon Jul 11 14:54:41 2016 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Tue Jul 12 10:09:41 2016 +0200 ---------------------------------------------------------------------- .../connectors/kinesis/internals/ShardConsumer.java | 8 +++++++- .../kinesis/serialization/KinesisDeserializationSchema.java | 8 +++++--- .../serialization/KinesisDeserializationSchemaWrapper.java | 7 +++++-- 3 files changed, 17 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/662b4586/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 3b68343..d98de78 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -206,7 +206,13 @@ public class ShardConsumer<T> implements Runnable { final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime(); final T value = deserializer.deserialize( - keyBytes, dataBytes, subscribedShard.getStreamName(), record.getSequenceNumber(), approxArrivalTimestamp); + keyBytes, + dataBytes, + record.getPartitionKey(), + record.getSequenceNumber(), + approxArrivalTimestamp, + subscribedShard.getStreamName(), + subscribedShard.getShard().getShardId()); if (record.isAggregated()) { fetcherRef.emitRecordAndUpdateState( http://git-wip-us.apache.org/repos/asf/flink/blob/662b4586/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java index dfacc9a..c94df20 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java @@ -37,13 +37,15 @@ public interface KinesisDeserializationSchema<T> extends Serializable, ResultTyp * * @param recordKey the records's key as a byte array (null if no key has been set for the record) * @param recordValue the record's value as a byte array - * @param stream the name of the Kinesis stream that this record was sent to + * @param partitionKey the record's partition key at the time of writing * @param seqNum the sequence number of this record in the Kinesis shard * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record + * @param stream the name of the Kinesis stream that this record was sent to + * @param shardId The identifier of the shard the record was sent to * @return the deserialized message as an Java object * @throws IOException */ - T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum, long approxArrivalTimestamp) throws IOException; + T deserialize(byte[] recordKey, byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException; /** * Method to decide whether the element signals the end of the stream. If @@ -52,5 +54,5 @@ public interface KinesisDeserializationSchema<T> extends Serializable, ResultTyp * @param nextElement the element to test for the end-of-stream signal * @return true if the element signals end of stream, false otherwise */ - boolean isEndOfStream(T nextElement); + // TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement); } http://git-wip-us.apache.org/repos/asf/flink/blob/662b4586/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java index c534be7..86fb72b 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java @@ -37,15 +37,18 @@ public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializ } @Override - public T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum, long approxArrivalTimestamp) + public T deserialize(byte[] recordKey, byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException { return deserializationSchema.deserialize(recordValue); } + /* + FLINK-4194 + @Override public boolean isEndOfStream(T nextElement) { return deserializationSchema.isEndOfStream(nextElement); - } + } */ @Override public TypeInformation<T> getProducedType() {