Repository: flume Updated Branches: refs/heads/flume-1.6 be2dbf1ab -> 29ca0bae3
FLUME-2479. Adding new test files. (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/29ca0bae Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/29ca0bae Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/29ca0bae Branch: refs/heads/flume-1.6 Commit: 29ca0bae3f1e85e74ae8371ed6b598b7fe794a3a Parents: be2dbf1 Author: Hari Shreedharan <[email protected]> Authored: Fri Oct 3 11:56:15 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Oct 3 11:56:47 2014 -0700 ---------------------------------------------------------------------- .../flume/source/kafka/TestKafkaSource.java | 213 +++++++++++++++++++ .../flume/source/kafka/TestKafkaSourceUtil.java | 92 ++++++++ 2 files changed, 305 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/29ca0bae/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java new file mode 100644 index 0000000..3695860 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.source.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +import java.util.List; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import junit.framework.Assert; +import kafka.common.TopicExistsException; +import kafka.consumer.ConsumerIterator; +import kafka.message.Message; + +import org.apache.flume.*; +import org.apache.flume.PollableSource.Status; +import org.apache.flume.channel.ChannelProcessor; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestKafkaSource { + private static final Logger log = + LoggerFactory.getLogger(TestKafkaSource.class); + + private KafkaSource kafkaSource; + private KafkaSourceEmbeddedKafka kafkaServer; + private ConsumerIterator<byte[], byte[]> mockIt; + private Message message; + private Context context; + private List<Event> events; + private String topicName = "test1"; + + + @SuppressWarnings("unchecked") + @Before + public void setup() throws Exception { + + kafkaSource = new KafkaSource(); + kafkaServer = new KafkaSourceEmbeddedKafka(); + try { + kafkaServer.createTopic(topicName); + } catch (TopicExistsException e) { + //do nothing + } + + + context = new Context(); + context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME, + 
kafkaServer.getZkConnectString()); + context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume"); + context.put(KafkaSourceConstants.TOPIC,topicName); + context.put("kafka.consumer.timeout.ms","100"); + + + ChannelProcessor channelProcessor = mock(ChannelProcessor.class); + + events = Lists.newArrayList(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + events.addAll((List<Event>)invocation.getArguments()[0]); + return null; + } + }).when(channelProcessor).processEventBatch(any(List.class)); + kafkaSource.setChannelProcessor(channelProcessor); + } + + @After + public void tearDown() throws Exception { + kafkaSource.stop(); + kafkaServer.stop(); + } + + @SuppressWarnings("unchecked") + @Test + public void testProcessItNotEmpty() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topicName, "", "hello, world"); + + Thread.sleep(500L); + + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), + Charsets.UTF_8)); + + + } + + @SuppressWarnings("unchecked") + @Test + public void testProcessItNotEmptyBatch() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.BATCH_SIZE,"2"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topicName, "", "hello, world"); + kafkaServer.produce(topicName, "", "foo, bar"); + + Thread.sleep(500L); + + Status status = kafkaSource.process(); + 
assertEquals(Status.READY, status); + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), + Charsets.UTF_8)); + Assert.assertEquals("foo, bar", new String(events.get(1).getBody(), + Charsets.UTF_8)); + + } + + + @SuppressWarnings("unchecked") + @Test + public void testProcessItEmpty() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + + Status status = kafkaSource.process(); + assertEquals(Status.BACKOFF, status); + } + + @SuppressWarnings("unchecked") + @Test + public void testNonExistingTopic() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.TOPIC,"faketopic"); + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + + Status status = kafkaSource.process(); + assertEquals(Status.BACKOFF, status); + } + + @SuppressWarnings("unchecked") + @Test(expected= FlumeException.class) + public void testNonExistingZk() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666"); + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + + Status status = kafkaSource.process(); + assertEquals(Status.BACKOFF, status); + } + + @Test + public void testBatchTime() throws InterruptedException, + EventDeliveryException { + context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + for (int i=1; i<5000; i++) { + kafkaServer.produce(topicName, "", "hello, world " + i); + } + Thread.sleep(500L); + + long startTime = System.currentTimeMillis(); + Status status = 
kafkaSource.process(); + long endTime = System.currentTimeMillis(); + assertEquals(Status.READY, status); + assertTrue(endTime - startTime < + ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) + + context.getLong("kafka.consumer.timeout.ms")) ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/29ca0bae/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java new file mode 100644 index 0000000..0cbb4b6 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.source.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.Properties; + +import kafka.javaapi.consumer.ConsumerConnector; +import org.apache.flume.Context; +import org.apache.zookeeper.server.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestKafkaSourceUtil { + private Properties props = new Properties(); + private Context context = new Context(); + private int zkPort = 21818; // non-standard + private KafkaSourceEmbeddedZookeeper zookeeper; + + @Before + public void setUp() throws Exception { + context.put("kafka.consumer.timeout", "10"); + context.put("type", "KafkaSource"); + context.put("topic", "test"); + context.put("zookeeperConnect", "127.0.0.1:"+zkPort); + context.put("groupId","test"); + props = KafkaSourceUtil.getKafkaProperties(context); + zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort); + + + } + + @After + public void tearDown() throws Exception { + zookeeper.stopZookeeper(); + } + + + @Test + public void testGetConsumer() { + ConsumerConnector cc = KafkaSourceUtil.getConsumer(props); + assertNotNull(cc); + + } + + @Test + public void testKafkaConsumerProperties() { + Context context = new Context(); + context.put("kafka.auto.commit.enable", "override.default.autocommit"); + context.put("kafka.fake.property", "kafka.property.value"); + context.put("kafka.zookeeper.connect","bad-zookeeper-list"); + context.put("zookeeperConnect","real-zookeeper-list"); + Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context); + + //check that we have defaults set + assertEquals( + kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID), + KafkaSourceConstants.DEFAULT_GROUP_ID); + //check that kafka properties override the default and get correct name + assertEquals( + 
kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED), + "override.default.autocommit"); + //check that any kafka property gets in + assertEquals(kafkaProps.getProperty("fake.property"), + "kafka.property.value"); + //check that documented property overrides defaults + assertEquals(kafkaProps.getProperty("zookeeper.connect") + ,"real-zookeeper-list"); + } + + +}
