STORM-697: Test for MessageMetadataSchemeAsMultiScheme and generateTuples with metadata using SchemeAsMultiScheme
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e768665 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e768665 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e768665 Branch: refs/heads/master Commit: 6e768665320d08815c53f27e706ef2ae1ff5af78 Parents: 6e4fde2 Author: matt.tieman <matt.tie...@inin.com> Authored: Tue Mar 3 11:48:57 2015 -0500 Committer: matt.tieman <matt.tie...@inin.com> Committed: Tue Mar 3 11:48:57 2015 -0500 ---------------------------------------------------------------------- .../src/test/storm/kafka/KafkaUtilsTest.java | 66 +++++++++++++++++--- 1 file changed, 56 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6e768665/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java index 1f1bbbc..a7c9b2b 100644 --- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java @@ -17,9 +17,14 @@ */ package storm.kafka; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.utils.Utils; -import com.google.common.collect.ImmutableMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.util.List; +import java.util.Properties; + import kafka.api.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; @@ -27,18 +32,17 @@ import kafka.javaapi.producer.Producer; import kafka.message.MessageAndOffset; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import storm.kafka.trident.GlobalPartitionInformation; +import org.mockito.Mockito; -import java.util.List; -import java.util.Properties; +import storm.kafka.trident.GlobalPartitionInformation; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.utils.Utils; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableMap; public class KafkaUtilsTest { @@ -166,6 +170,47 @@ public class KafkaUtilsTest { assertEquals(value, lists.iterator().next().get(0)); } } + + @Test + public void generateTuplesWithMessageAndMetadataScheme() { + String value = "value"; + Partition mockPartition = Mockito.mock(Partition.class); + mockPartition.partition = 0; + int offset = 0; + + config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); + config.tupleMetaData = true; + + createTopicAndSendMessage(null, value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset); + List<Object> values = lists.iterator().next(); + assertEquals("Message is incorrect", value, values.get(0)); + assertEquals("Offset is incorrect", offset, values.get(1)); + assertEquals("Partition is incorrect", mockPartition.partition, values.get(2)); + } + } + + @Test + public void generateTuplesWithValueSchemeAndMessageAndMetadata() { + String value = "value"; + Partition mockPartition = Mockito.mock(Partition.class); + mockPartition.partition = 0; + int offset = 0; + + config.scheme = new SchemeAsMultiScheme(new StringScheme()); + config.tupleMetaData = true; + + createTopicAndSendMessage(null, value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset); + List<Object> values = lists.iterator().next(); + assertEquals("Incorrect number of tuple values", 1, values.size()); + assertEquals("Message is incorrect", value, values.get(0)); + } + } private ByteBufferMessageSet getLastMessage() { long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; @@ -174,6 +219,7 @@ public class KafkaUtilsTest { private void runGetValueOnlyTuplesTest() { String value = "value"; + createTopicAndSendMessage(null, value); ByteBufferMessageSet messageAndOffsets = getLastMessage(); for (MessageAndOffset msg : messageAndOffsets) {