Repository: flink
Updated Branches:
  refs/heads/master a7274d566 -> 662b45868


[FLINK-4191] Expose shard information in kinesis deserialization schema

This closes #2225


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662b4586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662b4586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662b4586

Branch: refs/heads/master
Commit: 662b4586856379f9baf19862721ad99a20e74d13
Parents: a7274d5
Author: Robert Metzger <rmetz...@apache.org>
Authored: Mon Jul 11 14:54:41 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Tue Jul 12 10:09:41 2016 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/internals/ShardConsumer.java          | 8 +++++++-
 .../kinesis/serialization/KinesisDeserializationSchema.java  | 8 +++++---
 .../serialization/KinesisDeserializationSchemaWrapper.java   | 7 +++++--
 3 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/662b4586/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 3b68343..d98de78 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -206,7 +206,13 @@ public class ShardConsumer<T> implements Runnable {
                final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
                final T value = deserializer.deserialize(
-                       keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+                       keyBytes,
+                       dataBytes,
+                       record.getPartitionKey(),
+                       record.getSequenceNumber(),
+                       approxArrivalTimestamp,
+                       subscribedShard.getStreamName(),
+                       subscribedShard.getShard().getShardId());
 
                if (record.isAggregated()) {
                        fetcherRef.emitRecordAndUpdateState(

http://git-wip-us.apache.org/repos/asf/flink/blob/662b4586/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
index dfacc9a..c94df20 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -37,13 +37,15 @@ public interface KinesisDeserializationSchema<T> extends 
Serializable, ResultTyp
         *
         * @param recordKey the records's key as a byte array (null if no key 
has been set for the record)
         * @param recordValue the record's value as a byte array
-        * @param stream the name of the Kinesis stream that this record was 
sent to
+        * @param partitionKey the record's partition key at the time of writing
         * @param seqNum the sequence number of this record in the Kinesis shard
         * @param approxArrivalTimestamp the server-side timestamp of when 
Kinesis received and stored the record
+        * @param stream the name of the Kinesis stream that this record was 
sent to
+        * @param shardId The identifier of the shard the record was sent to
         * @return the deserialized message as an Java object
         * @throws IOException
         */
-       T deserialize(byte[] recordKey, byte[] recordValue, String stream, 
String seqNum, long approxArrivalTimestamp) throws IOException;
+       T deserialize(byte[] recordKey, byte[] recordValue, String 
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String 
shardId) throws IOException;
 
        /**
         * Method to decide whether the element signals the end of the stream. 
If
@@ -52,5 +54,5 @@ public interface KinesisDeserializationSchema<T> extends 
Serializable, ResultTyp
         * @param nextElement the element to test for the end-of-stream signal
         * @return true if the element signals end of stream, false otherwise
         */
-       boolean isEndOfStream(T nextElement);
+       // TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/662b4586/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
index c534be7..86fb72b 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -37,15 +37,18 @@ public class KinesisDeserializationSchemaWrapper<T> 
implements KinesisDeserializ
        }
 
        @Override
-       public T deserialize(byte[] recordKey, byte[] recordValue, String 
stream, String seqNum, long approxArrivalTimestamp)
+       public T deserialize(byte[] recordKey, byte[] recordValue, String 
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String 
shardId)
                throws IOException {
                return deserializationSchema.deserialize(recordValue);
        }
 
+       /*
+       FLINK-4194
+
        @Override
        public boolean isEndOfStream(T nextElement) {
                return deserializationSchema.isEndOfStream(nextElement);
-       }
+       } */
 
        @Override
        public TypeInformation<T> getProducedType() {

Reply via email to