Repository: flume Updated Branches: refs/heads/flume-1.6 99736f6ee -> be2dbf1ab
FLUME-2479. Kafka property auto.commit.enable is incorrect for KafkaSource. (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/be2dbf1a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/be2dbf1a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/be2dbf1a Branch: refs/heads/flume-1.6 Commit: be2dbf1ab68c8b22f565585a627886e78e190503 Parents: 99736f6 Author: Hari Shreedharan <[email protected]> Authored: Fri Oct 3 11:52:13 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Oct 3 11:52:13 2014 -0700 ---------------------------------------------------------------------- .../source/kafka/KafkaSourceConstants.java | 4 +- .../flume/source/kafka/KafkaSourceTest.java | 223 ------------------- .../flume/source/kafka/KafkaSourceUtilTest.java | 92 -------- 3 files changed, 2 insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/be2dbf1a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 169cc10..7390618 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -22,7 +22,7 @@ public class KafkaSourceConstants { public static final String BATCH_SIZE = "batchSize"; public static final String BATCH_DURATION_MS = "batchDurationMillis"; public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms"; - public static final String AUTO_COMMIT_ENABLED = "auto.commit.enabled"; + public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable"; public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect"; public static final String GROUP_ID = "group.id"; @@ -33,7 +33,7 @@ public class KafkaSourceConstants { public static final int DEFAULT_BATCH_SIZE = 1000; public static final int DEFAULT_BATCH_DURATION = 1000; public static final String DEFAULT_CONSUMER_TIMEOUT = "10"; - public static final boolean DEFAULT_AUTO_COMMIT = false; + public static final String DEFAULT_AUTO_COMMIT = "false"; public static final String DEFAULT_GROUP_ID = "flume"; } http://git-wip-us.apache.org/repos/asf/flume/blob/be2dbf1a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java deleted file mode 100644 index 7684616..0000000 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flume.source.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.*; - -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Properties; - -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import junit.framework.Assert; -import kafka.common.TopicExistsException; -import kafka.consumer.ConsumerIterator; -import kafka.message.Message; - -import kafka.message.MessageAndMetadata; - -import org.apache.flume.*; -import org.apache.flume.PollableSource.Status; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.conf.Configurables; -import org.apache.flume.source.AbstractSource; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaSourceTest { - private static final Logger log = - LoggerFactory.getLogger(KafkaSourceTest.class); - - private KafkaSource kafkaSource; - private KafkaSourceEmbeddedKafka kafkaServer; - private ConsumerIterator<byte[], byte[]> mockIt; - private Message message; - private Context context; - private List<Event> events; - private String topicName = "test1"; - - - @SuppressWarnings("unchecked") - @Before - public void setup() throws Exception { - - kafkaSource = new KafkaSource(); - kafkaServer = new KafkaSourceEmbeddedKafka(); - try { - kafkaServer.createTopic(topicName); - } catch (TopicExistsException e) { - //do nothing - } - - context = new Context(); - context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME, - kafkaServer.getZkConnectString()); - context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume"); - context.put(KafkaSourceConstants.TOPIC,topicName); - context.put("kafka.consumer.timeout.ms","100"); - - ChannelProcessor channelProcessor = mock(ChannelProcessor.class); - - events = Lists.newArrayList(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - events.addAll((List<Event>)invocation.getArguments()[0]); - return null; - } - }).when(channelProcessor).processEventBatch(any(List.class)); - kafkaSource.setChannelProcessor(channelProcessor); - } - - @After - public void tearDown() throws Exception { - kafkaSource.stop(); - kafkaServer.stop(); - } - - @SuppressWarnings("unchecked") - @Test - public void testProcessItNotEmpty() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); - kafkaSource.configure(context); - kafkaSource.start(); - - Thread.sleep(500L); - - kafkaServer.produce(topicName, "", "hello, world"); - - Thread.sleep(500L); - - Assert.assertEquals(Status.READY, kafkaSource.process()); - Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); - Assert.assertEquals(1, events.size()); - - Assert.assertEquals("hello, world", new String(events.get(0).getBody(), - Charsets.UTF_8)); - - - } - - @SuppressWarnings("unchecked") - @Test - public void testProcessItNotEmptyBatch() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.BATCH_SIZE,"2"); - kafkaSource.configure(context); - kafkaSource.start(); - - Thread.sleep(500L); - - kafkaServer.produce(topicName, "", "hello, world"); - kafkaServer.produce(topicName, "", "foo, bar"); - - Thread.sleep(500L); - - Status status = kafkaSource.process(); - assertEquals(Status.READY, status); - Assert.assertEquals("hello, world", new String(events.get(0).getBody(), - Charsets.UTF_8)); - Assert.assertEquals("foo, bar", new String(events.get(1).getBody(), - Charsets.UTF_8)); - - } - - - @SuppressWarnings("unchecked") - @Test - public void testProcessItEmpty() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { - kafkaSource.configure(context); - kafkaSource.start(); - Thread.sleep(500L); - - Status status = kafkaSource.process(); - assertEquals(Status.BACKOFF, status); - } - - @SuppressWarnings("unchecked") - @Test - public void testNonExistingTopic() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.TOPIC,"faketopic"); - kafkaSource.configure(context); - kafkaSource.start(); - Thread.sleep(500L); - - Status status = kafkaSource.process(); - assertEquals(Status.BACKOFF, status); - } - - @SuppressWarnings("unchecked") - @Test(expected= FlumeException.class) - public void testNonExistingZk() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666"); - kafkaSource.configure(context); - kafkaSource.start(); - Thread.sleep(500L); - - Status status = kafkaSource.process(); - assertEquals(Status.BACKOFF, status); - } - - @Test - public void testBatchTime() throws InterruptedException, - EventDeliveryException { - context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250"); - kafkaSource.configure(context); - kafkaSource.start(); - - Thread.sleep(500L); - - for (int i=1; i<5000; i++) { - kafkaServer.produce(topicName, "", "hello, world " + i); - } - Thread.sleep(500L); - - long startTime = System.currentTimeMillis(); - Status status = kafkaSource.process(); - long endTime = System.currentTimeMillis(); - assertEquals(Status.READY, status); - assertTrue(endTime - startTime < - ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) + - context.getLong("kafka.consumer.timeout.ms")) ); - } - - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/be2dbf1a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java deleted file mode 100644 index f87e5ae..0000000 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flume.source.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.util.Properties; - -import kafka.javaapi.consumer.ConsumerConnector; -import org.apache.flume.Context; -import org.apache.zookeeper.server.*; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class KafkaSourceUtilTest { - private Properties props = new Properties(); - private Context context = new Context(); - private int zkPort = 21818; // none-standard - private KafkaSourceEmbeddedZookeeper zookeeper; - - @Before - public void setUp() throws Exception { - context.put("kafka.consumer.timeout", "10"); - context.put("type", "KafkaSource"); - context.put("topic", "test"); - context.put("zookeeperConnect", "127.0.0.1:"+zkPort); - context.put("groupId","test"); - props = KafkaSourceUtil.getKafkaProperties(context); - zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort); - - - } - - @After - public void tearDown() throws Exception { - zookeeper.stopZookeeper(); - } - - - @Test - public void testGetConsumer() { - ConsumerConnector cc = KafkaSourceUtil.getConsumer(props); - assertNotNull(cc); - - } - - @Test - public void testKafkaConsumerProperties() { - Context context = new Context(); - context.put("kafka.auto.commit.enabled", "override.default.autocommit"); - context.put("kafka.fake.property", "kafka.property.value"); - context.put("kafka.zookeeper.connect","bad-zookeeper-list"); - context.put("zookeeperConnect","real-zookeeper-list"); - Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context); - - //check that we have defaults set - assertEquals( - kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID), - KafkaSourceConstants.DEFAULT_GROUP_ID); - //check that kafka properties override the default and get correct name - assertEquals( - kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED), - "override.default.autocommit"); - //check that any kafka property gets in - assertEquals(kafkaProps.getProperty("fake.property"), - "kafka.property.value"); - //check that documented property overrides defaults - assertEquals(kafkaProps.getProperty("zookeeper.connect") - ,"real-zookeeper-list"); - } - - -}
