STORM-697: Added scheme to include Partition and offset when generating tuple. >>>
The MessageMetadataScheme interface extends Scheme and defines a deserialization method that accepts the message byte[], Partition, and the offset. MessageMetadataSchemeAsMultiScheme follows the same pattern as KeyValueSchemeAsMultiScheme, extending SchemeAsMultiScheme and providing a deserialization method named for the method defined by MessageMetadataScheme. StringMessageAndMetadataScheme provides an implementation of MessageMetadataScheme, following the same pattern as StringKeyValueScheme. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2f119c6e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2f119c6e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2f119c6e Branch: refs/heads/master Commit: 2f119c6e2edace030afeb9ee0885010f1de7fc28 Parents: 6e76866 Author: matt.tieman <matt.tie...@inin.com> Authored: Tue Mar 3 11:50:04 2015 -0500 Committer: matt.tieman <matt.tie...@inin.com> Committed: Tue Mar 3 11:59:44 2015 -0500 ---------------------------------------------------------------------- .../jvm/storm/kafka/MessageMetadataScheme.java | 25 ++++++++++++++++++++ .../MessageMetadataSchemeAsMultiScheme.java | 25 ++++++++++++++++++++ .../kafka/StringMessageAndMetadataScheme.java | 25 ++++++++++++++++++++ 3 files changed, 75 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2f119c6e/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java new file mode 100644 index 0000000..d0dd2be --- /dev/null +++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java @@ -0,0 +1,25 @@ +package storm.kafka; + +import java.util.List; +import backtype.storm.spout.Scheme; + +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface MessageMetadataScheme extends Scheme { + public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset); +} http://git-wip-us.apache.org/repos/asf/storm/blob/2f119c6e/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java new file mode 100644 index 0000000..6226676 --- /dev/null +++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java @@ -0,0 +1,25 @@ +package storm.kafka; + +import java.util.Arrays; +import java.util.List; + +import backtype.storm.spout.Scheme; +import backtype.storm.spout.SchemeAsMultiScheme; + +public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme { + private static final long serialVersionUID = -7172403703813625116L; + + public MessageMetadataSchemeAsMultiScheme(Scheme scheme) { + super(scheme); + } + + @SuppressWarnings("unchecked") + public Iterable<List<Object>> 
deserializeMessageWithMetadata(byte[] message, Partition partition, int offset) { + List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset); + if (o == null) { + return null; + } else { + return Arrays.asList(o); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/2f119c6e/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java new file mode 100644 index 0000000..262a27c --- /dev/null +++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java @@ -0,0 +1,25 @@ +package storm.kafka; + +import java.util.List; + +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme { + private static final long serialVersionUID = -5441841920447947374L; + + public static final String STRING_SCHEME_PARTITION_KEY = "partition"; + public static final String STRING_SCHEME_OFFSET = "offset"; + + @Override + public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset) { + String stringMessage = StringScheme.deserializeString(message); + return new Values(stringMessage, partition.partition, offset); + } + + @Override + public Fields getOutputFields() { + return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET); + } + +}