Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 9c8930036 -> 4348548aa
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 95b2199..b178687 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,17 +17,24 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
 
 public class SingleTopicKafkaSpoutConfiguration {
 
@@ -42,53 +49,44 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(port).build()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
-        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+    private static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.key(), r.value());
+        }
+    };
+
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
-        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription));
    }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
-        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+    public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config
+                .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+                        new Fields("topic", "key", "value"), STREAM)
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+                .setRetry(getNoDelayRetryService())
+                .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
                 .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000)
-                .build();
-    }
-
-    protected static KafkaSpoutRetryService getRetryService() {
-        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
-                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    }
-
-    protected static Map<String, Object> getKafkaConsumerProps(int port) {
-        Map<String, Object> props = new HashMap<>();
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("max.poll.records", "5");
-        return props;
-    }
-
-    protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-                new TopicKeyValueTupleBuilder<String, String>(TOPIC))
-                .build();
+                .setPollTimeoutMs(1000);
     }
 
-    public static KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "key", "value");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
-                .build();
+    protected static KafkaSpoutRetryService getNoDelayRetryService() {
+        /**
+         * Retry in a tight loop (keep unit tests fast).
+         */
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+                DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
deleted file mode 100644
index 4f20b58..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.builders;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicKeyValueTupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.key(),
-                consumerRecord.value());
-    }
-}
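For context on the API change above: the old wiring of consumer-property maps, KafkaSpoutStreams, and KafkaSpoutTuplesBuilder is replaced by the fluent KafkaSpoutConfig.Builder plus a record-translator function. A minimal sketch of how a test might consume the new helper follows; the port value and the per-test override are illustrative, not part of this commit:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    import org.apache.storm.topology.TopologyBuilder;

    public class BuilderUsageSketch {
        public static void main(String[] args) {
            // Start from the shared test defaults, override what a particular
            // test needs, then build() the config handed to the spout.
            KafkaSpoutConfig<String, String> config = SingleTopicKafkaSpoutConfiguration
                .createKafkaSpoutConfigBuilder(9092)   // illustrative broker port
                .setOffsetCommitPeriodMs(1_000)        // hypothetical per-test override
                .build();
            TopologyBuilder tp = new TopologyBuilder();
            tp.setSpout("kafka_spout", new KafkaSpout<>(config), 1);
        }
    }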
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
new file mode 100644
index 0000000..9972d4c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.NoSuchElementException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class OffsetManagerTest {
+    private static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"threadName\":\"Thread-20\"}";
+
+    @Rule
+    public ExpectedException expect = ExpectedException.none();
+
+    private final long initialFetchOffset = 0;
+    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
+    private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset);
+
+    @Test
+    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() {
+        /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets
+         * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked
+         * then any prior message will have been emitted at least once.
+         * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped.
+         */
+        manager.addToEmitMsgs(0);
+        manager.addToEmitMsgs(1);
+        manager.addToEmitMsgs(2);
+        //3, 4 compacted away
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 1));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
+
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3));
+
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
+
+        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
+    }
+
+    @Test
+    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() {
+        //0-4 compacted away
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
+
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA), is(nullValue()));
+
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
+
+        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
+    }
+
+    @Test
+    public void testFindNextCommittedOffsetWithNoAcks() {
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithOneAck() {
+        /*
+         * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e.
+         * lastProcessedMessageOffset + 1. "
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithAckedOffsetGap() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
+        /**
+         * If topic compaction is enabled in Kafka some offsets may be deleted.
+         * We distinguish this case from regular gaps in the acked offset sequence caused by out of order acking
+         * by checking that offsets in the gap have been emitted at some point previously.
+         * If they haven't then they can't exist in Kafka, since the spout emits tuples in order.
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
+            nextCommitOffset.offset(), is(initialFetchOffset + 3));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
+        OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
+        emitAndAckMessage(getMessageId(0));
+        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(COMMIT_METADATA);
+        assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testCommit() {
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+
+        long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2));
+
+        assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false));
+        assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true));
+        assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true));
+    }
+
+    private KafkaSpoutMessageId getMessageId(long offset) {
+        return new KafkaSpoutMessageId(testTp, offset);
+    }
+
+    private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
+        manager.addToEmitMsgs(msgId.offset());
+        manager.addToAckMsgs(msgId);
+    }
+
+    @Test
+    public void testGetNthUncommittedOffsetAfterCommittedOffset() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        manager.addToEmitMsgs(initialFetchOffset + 2);
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 30);
+
+        assertThat("The third uncommitted offset should be 5", manager.getNthUncommittedOffsetAfterCommittedOffset(3), is(initialFetchOffset + 5L));
+        assertThat("The fourth uncommitted offset should be 30", manager.getNthUncommittedOffsetAfterCommittedOffset(4), is(initialFetchOffset + 30L));
+
+        expect.expect(NoSuchElementException.class);
+        manager.getNthUncommittedOffsetAfterCommittedOffset(5);
+    }
+
+    @Test
+    public void testCommittedFlagSetOnCommit() throws Exception {
+        assertFalse(manager.hasCommitted());
+        manager.commit(mock(OffsetAndMetadata.class));
+        assertTrue(manager.hasCommitted());
+    }
+}
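The commit rule these tests pin down (commit lastProcessedMessageOffset + 1, never past an emitted-but-unacked offset, and skip offsets that were never emitted because they are assumed compacted away) can be summarized in a short standalone sketch. This is a simplified illustration of the semantics under test, not the actual OffsetManager code:

    import java.util.NavigableSet;
    import java.util.Set;
    import java.util.TreeSet;

    public class NextCommitOffsetSketch {
        /**
         * Returns the offset to commit (one past the last committable message),
         * or -1 if nothing new can be committed. An acked offset is committable
         * if every offset between the commit point and it was either acked or
         * never emitted (i.e. assumed compacted away in Kafka).
         */
        static long findNextCommitOffset(long committedOffset, Set<Long> emitted, NavigableSet<Long> acked) {
            long nextCommitOffset = committedOffset;
            for (long offset = committedOffset; !acked.isEmpty() && offset <= acked.last(); offset++) {
                if (acked.contains(offset)) {
                    nextCommitOffset = offset + 1;      // committable so far
                } else if (emitted.contains(offset)) {
                    break;                              // emitted but unacked: stop here
                }
                // neither emitted nor acked: assume compacted away and skip
            }
            return nextCommitOffset > committedOffset ? nextCommitOffset : -1;
        }

        public static void main(String[] args) {
            // Mirrors the gap-in-middle-of-acked test: 3 and 4 "compacted away",
            // 5 emitted but unacked, so the next commit offset is 3.
            NavigableSet<Long> acked = new TreeSet<>(Set.of(0L, 1L, 2L, 6L));
            Set<Long> emitted = Set.of(0L, 1L, 2L, 5L, 6L);
            System.out.println(findNextCommitOffset(0, emitted, acked)); // prints 3
        }
    }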
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
new file mode 100644
index 0000000..9a2a682
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.ManualPartitioner;
+import org.apache.storm.kafka.spout.TopicFilter;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class ManualPartitionSubscriptionTest {
+
+    @Test
+    public void testCanReassignPartitions() {
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        TopicFilter filterMock = mock(TopicFilter.class);
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
+        TopologyContext contextMock = mock(TopologyContext.class);
+        ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock);
+
+        List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        List<TopicPartition> twoPartitions = new ArrayList<>();
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+        when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
+            .thenReturn(onePartition)
+            .thenReturn(twoPartitions);
+
+        //Set the first assignment
+        subscription.subscribe(consumerMock, listenerMock, contextMock);
+
+        InOrder inOrder = inOrder(consumerMock, listenerMock);
+        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
+
+        reset(consumerMock, listenerMock);
+
+        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
+
+        //Update to set the second assignment
+        subscription.refreshAssignment();
+
+        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+        //to allow the revocation hook to commit offsets for the revoked partitions.
+        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
+        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
+    }
+
+}
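The callback ordering the test verifies (revoke the old partitions, assign the new set, then fire the assigned hook) suggests a refresh routine along the following lines. This is a hypothetical sketch of the contract, not the ManualPartitionSubscription source:

    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RefreshAssignmentSketch {
        static void refreshAssignment(KafkaConsumer<?, ?> consumer,
                ConsumerRebalanceListener listener,
                Set<TopicPartition> newAssignment) {
            Set<TopicPartition> current = consumer.assignment();
            if (!newAssignment.equals(current)) {
                // Revoke first so the listener can commit offsets for partitions
                // this spout task is about to lose...
                listener.onPartitionsRevoked(current);
                // ...then hand the new set to the consumer and fire the assigned hook.
                consumer.assign(newAssignment);
                listener.onPartitionsAssigned(newAssignment);
            }
        }
    }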
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 5a78137..40a2d3c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -18,45 +18,43 @@
 package org.apache.storm.kafka.spout.test;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
 
 public class KafkaSpoutTopologyMainNamedTopics {
-    private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+    private static final String TOPIC_2_STREAM = "test_2_stream";
+    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
     private static final String[] TOPICS = new String[]{"test","test1","test2"};
-
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainNamedTopics().runMain(args);
     }
 
     protected void runMain(String[] args) throws Exception {
         if (args.length == 0) {
-            submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
         } else {
-            submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
+            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
         }
     }
 
@@ -87,16 +85,34 @@ public class KafkaSpoutTopologyMainNamedTopics {
         return config;
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
+            .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)
+            .shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
+        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
         return tp.createTopology();
     }
 
-    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value());
+        }
+    };
+
+    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
+            TOPIC_PART_OFF_KEY_VALUE_FUNC,
+            new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
+        trans.forTopic(TOPICS[2],
+            TOPIC_PART_OFF_KEY_VALUE_FUNC,
+            new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setRetry(getRetryService())
+            .setRecordTranslator(trans)
             .setOffsetCommitPeriodMs(10_000)
             .setFirstPollOffsetStrategy(EARLIEST)
             .setMaxUncommittedOffsets(250)
@@ -107,30 +123,4 @@ public class KafkaSpoutTopologyMainNamedTopics {
         return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                 TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
     }
-
-    protected Map<String,Object> getKafkaConsumerProps() {
-        Map<String, Object> props = new HashMap<>();
-//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        return props;
-    }
-
-    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
-                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-                .build();
-    }
-
-    protected KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-        final Fields outputFields1 = new Fields("topic", "partition", "offset");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream
-                .build();
-    }
 }
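KafkaSpoutTestBolt is referenced throughout this commit but is not part of the diff; a minimal stand-in that consumes the ("topic", "partition", "offset", "key", "value") tuples routed above could look like the following. This is an illustrative sketch, not the actual test bolt:

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingBoltSketch extends BaseRichBolt {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingBoltSketch.class);
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // Log whatever the record translator produced for this stream.
            LOG.info("Received tuple: {}", input.getValues());
            collector.ack(input); // ack so the spout can eventually commit the offset
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt; declares no output streams
        }
    }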
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index c362a2b..f0004ea 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -18,45 +18,51 @@
 package org.apache.storm.kafka.spout.test;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutStream;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.util.regex.Pattern;
+import org.apache.storm.tuple.Values;
 
 public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics {
     private static final String STREAM = "test_wildcard_stream";
-    private static final String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+    private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test[1|2]");
 
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder());
-    }
-
-    protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() {
-        return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
-    }
-
-    protected KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-        final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN));
-        return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream);
+    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value());
+        }
+    };
+
+    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setRetry(getRetryService())
+            .setRecordTranslator(TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                new Fields("topic", "partition", "offset", "key", "value"), STREAM)
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .build();
    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
deleted file mode 100644
index ca65177..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicTest2TupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.partition(),
-                consumerRecord.offset());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
deleted file mode 100644
index 4c55aa1..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicsTest0Test1TupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.partition(),
-                consumerRecord.offset(),
-                consumerRecord.key(),
-                consumerRecord.value());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
new file mode 100644
index 0000000..da87a03
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.apache.storm.shade.org.json.simple.JSONValue;
+import org.junit.Test;
+
+public class KafkaTridentSpoutBatchMetadataTest {
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testMetadataIsRoundTripSerializableWithJsonSimple() throws Exception {
+        /**
+         * Tests that the metadata object can be converted to and from a Map. This is needed because Trident metadata is written to
+         * Zookeeper as JSON with the json-simple library, so the spout converts the metadata to Map before returning it to Trident.
+         * It is important that all map entries are of types json-simple knows about,
+         * since otherwise the library just calls toString on them which will likely produce invalid JSON.
+         */
+        TopicPartition tp = new TopicPartition("topic", 0);
+        long startOffset = 10;
+        long endOffset = 20;
+
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset);
+        Map<String, Object> map = metadata.toMap();
+        Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map));
+        KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap);
+        assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition()));
+        assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset()));
+        assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset()));
+    }
+
+    @Test
+    public void testCreateMetadataFromRecords() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        long firstOffset = 15;
+        long lastOffset = 55;
+        ConsumerRecords<?, ?> records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1))));
+
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records);
+        assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
+        assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+    </Loggers>
+</Configuration>
\ No newline at end of file
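On the round-trip constraint checked by testMetadataIsRoundTripSerializableWithJsonSimple: json-simple serializes Strings, Numbers, Maps, and Lists natively, and falls back to toString() for anything else, which generally yields invalid JSON. A small sketch of both cases follows; it uses the unshaded org.json.simple coordinates for brevity, while Storm ships the same classes shaded under org.apache.storm.shade:

    import java.util.HashMap;
    import java.util.Map;
    import org.json.simple.JSONValue;

    public class JsonSimpleRoundTrip {
        public static void main(String[] args) throws Exception {
            Map<String, Object> good = new HashMap<>();
            good.put("topic", "topic");     // String: serializes cleanly
            good.put("firstOffset", 10L);   // Long: serializes cleanly
            good.put("lastOffset", 20L);
            // Parses back into an equivalent Map, as the test requires.
            System.out.println(JSONValue.parseWithException(JSONValue.toJSONString(good)));

            Map<String, Object> bad = new HashMap<>();
            // A type json-simple doesn't know is emitted via toString(),
            // unquoted, so the result won't parse back as JSON.
            bad.put("tp", new org.apache.kafka.common.TopicPartition("topic", 0));
            System.out.println(JSONValue.toJSONString(bad)); // {"tp":topic-0} is not valid JSON
        }
    }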