STORM-697: Overload of generateTuples to accept the Partition and offset
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e4fde20 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e4fde20 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e4fde20 Branch: refs/heads/master Commit: 6e4fde20af8d285cdf4829e4c2c4aef4cd45d89d Parents: 5b4c28a Author: matt.tieman <matt.tie...@inin.com> Authored: Tue Mar 3 11:47:38 2015 -0500 Committer: matt.tieman <matt.tie...@inin.com> Committed: Tue Mar 3 11:47:38 2015 -0500 ---------------------------------------------------------------------- .../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6e4fde20/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index 137dc99..9af49fe 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -210,6 +210,21 @@ public class KafkaUtils { } return tups; } + + public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, int offset) { + Iterable<List<Object>> tups; + ByteBuffer payload = msg.payload(); + if (payload == null) { + return null; + } + + if (kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) { + tups = ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset); + } else { + tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload)); + } + return tups; + } public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {