STORM-697: Overload of generateTuples to accept the Partition and offset

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e4fde20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e4fde20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e4fde20

Branch: refs/heads/master
Commit: 6e4fde20af8d285cdf4829e4c2c4aef4cd45d89d
Parents: 5b4c28a
Author: matt.tieman <matt.tie...@inin.com>
Authored: Tue Mar 3 11:47:38 2015 -0500
Committer: matt.tieman <matt.tie...@inin.com>
Committed: Tue Mar 3 11:47:38 2015 -0500

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e4fde20/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..9af49fe 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -210,6 +210,21 @@ public class KafkaUtils {
         }
         return tups;
     }
+    
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, int offset) {
+        Iterable<List<Object>> tups;
+        ByteBuffer payload = msg.payload();
+        if (payload == null) {
+            return null;
+        }
+        
+        if (kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+            tups = ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
+        } else {
+            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
+        }
+        return tups;
+    }
 
 
     public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {

Reply via email to