Repository: flume
Updated Branches:
  refs/heads/flume-1.6 be2dbf1ab -> 29ca0bae3


FLUME-2479. Adding new test files.

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/29ca0bae
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/29ca0bae
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/29ca0bae

Branch: refs/heads/flume-1.6
Commit: 29ca0bae3f1e85e74ae8371ed6b598b7fe794a3a
Parents: be2dbf1
Author: Hari Shreedharan <[email protected]>
Authored: Fri Oct 3 11:56:15 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Oct 3 11:56:47 2014 -0700

----------------------------------------------------------------------
 .../flume/source/kafka/TestKafkaSource.java     | 213 +++++++++++++++++++
 .../flume/source/kafka/TestKafkaSourceUtil.java |  92 ++++++++
 2 files changed, 305 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/29ca0bae/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
new file mode 100644
index 0000000..3695860
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import kafka.common.TopicExistsException;
+import kafka.consumer.ConsumerIterator;
+import kafka.message.Message;
+
+import org.apache.flume.*;
+import org.apache.flume.PollableSource.Status;
+import org.apache.flume.channel.ChannelProcessor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link KafkaSource} that run against an embedded Kafka broker
+ * ({@link KafkaSourceEmbeddedKafka}) and a mocked {@link ChannelProcessor}
+ * that records every delivered event batch.
+ */
+public class TestKafkaSource {
+  private static final Logger log =
+          LoggerFactory.getLogger(TestKafkaSource.class);
+  /** Pause used after starting the source and after producing messages. */
+  private static final long SETTLE_TIME_MS = 500L;
+
+  private final String topicName = "test1";
+  private KafkaSource kafkaSource;
+  private KafkaSourceEmbeddedKafka kafkaServer;
+  private Context context;
+  /** Events handed to the mocked channel processor, in delivery order. */
+  private List<Event> events;
+
+  /**
+   * Creates the embedded broker and the test topic, builds the default
+   * source configuration and installs a channel processor mock that copies
+   * every processed batch into {@code events}.
+   */
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() throws Exception {
+    kafkaSource = new KafkaSource();
+    kafkaServer = new KafkaSourceEmbeddedKafka();
+    try {
+      kafkaServer.createTopic(topicName);
+    } catch (TopicExistsException e) {
+      // An already existing topic is fine for these tests; just reuse it.
+      log.debug("Topic {} already exists, reusing it", topicName);
+    }
+
+    context = new Context();
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,
+            kafkaServer.getZkConnectString());
+    context.put(KafkaSourceConstants.GROUP_ID_FLUME, "flume");
+    context.put(KafkaSourceConstants.TOPIC, topicName);
+    context.put("kafka.consumer.timeout.ms", "100");
+
+    events = Lists.newArrayList();
+    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        events.addAll((List<Event>) invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(channelProcessor).processEventBatch(any(List.class));
+    kafkaSource.setChannelProcessor(channelProcessor);
+  }
+
+  /**
+   * Stops the source and then the embedded broker. The broker is stopped in a
+   * finally block so it is not leaked when stopping the source throws.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      kafkaSource.stop();
+    } finally {
+      // JUnit runs @After even if setup() failed before creating the broker.
+      if (kafkaServer != null) {
+        kafkaServer.stop();
+      }
+    }
+  }
+
+  /** One message with batch size 1: a READY batch, then BACKOFF. */
+  @Test
+  public void testProcessItNotEmpty() throws Exception {
+    context.put(KafkaSourceConstants.BATCH_SIZE, "1");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(SETTLE_TIME_MS);
+    kafkaServer.produce(topicName, "", "hello, world");
+    Thread.sleep(SETTLE_TIME_MS);
+
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(Status.BACKOFF, kafkaSource.process());
+    assertEquals(1, events.size());
+    assertEquals("hello, world", bodyOf(0));
+  }
+
+  /** Two messages with batch size 2 are delivered, in order, by one call. */
+  @Test
+  public void testProcessItNotEmptyBatch() throws Exception {
+    context.put(KafkaSourceConstants.BATCH_SIZE, "2");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(SETTLE_TIME_MS);
+    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topicName, "", "foo, bar");
+    Thread.sleep(SETTLE_TIME_MS);
+
+    assertEquals(Status.READY, kafkaSource.process());
+    // Check the size first so a short batch fails with a clear message.
+    assertEquals(2, events.size());
+    assertEquals("hello, world", bodyOf(0));
+    assertEquals("foo, bar", bodyOf(1));
+  }
+
+  /** With nothing produced to the topic, process() reports BACKOFF. */
+  @Test
+  public void testProcessItEmpty() throws Exception {
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(SETTLE_TIME_MS);
+
+    assertEquals(Status.BACKOFF, kafkaSource.process());
+  }
+
+  /** Reading a topic that this test never created reports BACKOFF. */
+  @Test
+  public void testNonExistingTopic() throws Exception {
+    context.put(KafkaSourceConstants.TOPIC, "faketopic");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(SETTLE_TIME_MS);
+
+    assertEquals(Status.BACKOFF, kafkaSource.process());
+  }
+
+  /** A bogus ZooKeeper address ("blabla:666") must raise a FlumeException. */
+  @Test(expected = FlumeException.class)
+  public void testNonExistingZk() throws Exception {
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME, "blabla:666");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(SETTLE_TIME_MS);
+
+    // NOTE(review): presumably start() already throws, making this assertion
+    // unreachable - confirm against KafkaSource before removing it.
+    assertEquals(Status.BACKOFF, kafkaSource.process());
+  }
+
+  /**
+   * With 4999 messages produced, one process() call must return READY in less
+   * than the batch duration plus the consumer timeout.
+   */
+  @Test
+  public void testBatchTime() throws Exception {
+    context.put(KafkaSourceConstants.BATCH_DURATION_MS, "250");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(SETTLE_TIME_MS);
+    for (int i = 1; i < 5000; i++) {
+      kafkaServer.produce(topicName, "", "hello, world " + i);
+    }
+    Thread.sleep(SETTLE_TIME_MS);
+
+    long limitMs = context.getLong(KafkaSourceConstants.BATCH_DURATION_MS)
+            + context.getLong("kafka.consumer.timeout.ms");
+    // nanoTime is monotonic, so wall-clock adjustments cannot skew the result.
+    long startNanos = System.nanoTime();
+    Status status = kafkaSource.process();
+    long elapsedMs = (System.nanoTime() - startNanos) / 1000000L;
+    assertEquals(Status.READY, status);
+    assertTrue("process() took " + elapsedMs + " ms, limit is " + limitMs,
+            elapsedMs < limitMs);
+  }
+
+  /** Returns the body of the recorded event at {@code index} as UTF-8 text. */
+  private String bodyOf(int index) {
+    return new String(events.get(index).getBody(), Charsets.UTF_8);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/29ca0bae/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
new file mode 100644
index 0000000..0cbb4b6
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.apache.flume.Context;
+import org.apache.zookeeper.server.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for {@link KafkaSourceUtil}, using an embedded ZooKeeper instance. */
+public class TestKafkaSourceUtil {
+  private static final int ZK_PORT = 21818; // non-standard port
+  private final Context context = new Context();
+  private Properties props;
+  private KafkaSourceEmbeddedZookeeper zookeeper;
+
+  /** Builds the consumer properties and creates the embedded ZooKeeper. */
+  @Before
+  public void setUp() throws Exception {
+    context.put("kafka.consumer.timeout", "10"); // TODO confirm: no ".ms"?
+    context.put("type", "KafkaSource");
+    context.put("topic", "test");
+    context.put("zookeeperConnect", "127.0.0.1:" + ZK_PORT);
+    context.put("groupId", "test");
+    props = KafkaSourceUtil.getKafkaProperties(context);
+    zookeeper = new KafkaSourceEmbeddedZookeeper(ZK_PORT);
+  }
+
+  /** Stops the embedded ZooKeeper if setUp() got far enough to create it. */
+  @After
+  public void tearDown() throws Exception {
+    if (zookeeper != null) {
+      zookeeper.stopZookeeper();
+    }
+  }
+
+  /** A consumer can be created from the properties built in setUp(). */
+  @Test
+  public void testGetConsumer() {
+    ConsumerConnector cc = KafkaSourceUtil.getConsumer(props);
+    assertNotNull(cc);
+    cc.shutdown(); // do not leak the connector once it has been verified
+  }
+
+  /** Checks defaults, "kafka." pass-through and zookeeperConnect precedence. */
+  @Test
+  public void testKafkaConsumerProperties() {
+    Context overrides = new Context();
+    overrides.put("kafka.auto.commit.enable", "override.default.autocommit");
+    overrides.put("kafka.fake.property", "kafka.property.value");
+    overrides.put("kafka.zookeeper.connect", "bad-zookeeper-list");
+    overrides.put("zookeeperConnect", "real-zookeeper-list");
+    Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(overrides);
+
+    // check that we have defaults set
+    assertEquals(KafkaSourceConstants.DEFAULT_GROUP_ID,
+            kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID));
+    // check that kafka properties override the default and get correct name
+    assertEquals("override.default.autocommit",
+            kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+    // check that any kafka property gets in
+    assertEquals("kafka.property.value",
+            kafkaProps.getProperty("fake.property"));
+    // check that documented property overrides defaults
+    assertEquals("real-zookeeper-list",
+            kafkaProps.getProperty("zookeeper.connect"));
+  }
+}

Reply via email to