Repository: flume
Updated Branches:
  refs/heads/flume-1.6 99736f6ee -> be2dbf1ab


FLUME-2479. Kafka property auto.commit.enable is incorrect for KafkaSource.

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/be2dbf1a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/be2dbf1a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/be2dbf1a

Branch: refs/heads/flume-1.6
Commit: be2dbf1ab68c8b22f565585a627886e78e190503
Parents: 99736f6
Author: Hari Shreedharan <[email protected]>
Authored: Fri Oct 3 11:52:13 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Oct 3 11:52:13 2014 -0700

----------------------------------------------------------------------
 .../source/kafka/KafkaSourceConstants.java      |   4 +-
 .../flume/source/kafka/KafkaSourceTest.java     | 223 -------------------
 .../flume/source/kafka/KafkaSourceUtilTest.java |  92 --------
 3 files changed, 2 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/be2dbf1a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 169cc10..7390618 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -22,7 +22,7 @@ public class KafkaSourceConstants {
   public static final String BATCH_SIZE = "batchSize";
   public static final String BATCH_DURATION_MS = "batchDurationMillis";
   public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
-  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enabled";
+  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
   public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
   public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect";
   public static final String GROUP_ID = "group.id";
@@ -33,7 +33,7 @@ public class KafkaSourceConstants {
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int DEFAULT_BATCH_DURATION = 1000;
   public static final String DEFAULT_CONSUMER_TIMEOUT = "10";
-  public static final boolean DEFAULT_AUTO_COMMIT =  false;
+  public static final String DEFAULT_AUTO_COMMIT =  "false";
   public static final String DEFAULT_GROUP_ID = "flume";
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/be2dbf1a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
deleted file mode 100644
index 7684616..0000000
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.source.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.*;
-
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import junit.framework.Assert;
-import kafka.common.TopicExistsException;
-import kafka.consumer.ConsumerIterator;
-import kafka.message.Message;
-
-import kafka.message.MessageAndMetadata;
-
-import org.apache.flume.*;
-import org.apache.flume.PollableSource.Status;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.source.AbstractSource;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaSourceTest {
-  private static final Logger log =
-          LoggerFactory.getLogger(KafkaSourceTest.class);
-
-  private KafkaSource kafkaSource;
-  private KafkaSourceEmbeddedKafka kafkaServer;
-  private ConsumerIterator<byte[], byte[]> mockIt;
-  private Message message;
-  private Context context;
-  private List<Event> events;
-  private String topicName = "test1";
-
-
-  @SuppressWarnings("unchecked")
-  @Before
-  public void setup() throws Exception {
-
-    kafkaSource = new KafkaSource();
-    kafkaServer = new KafkaSourceEmbeddedKafka();
-    try {
-      kafkaServer.createTopic(topicName);
-    } catch (TopicExistsException e) {
-      //do nothing
-    }
-
-    context = new Context();
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,
-            kafkaServer.getZkConnectString());
-    context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume");
-    context.put(KafkaSourceConstants.TOPIC,topicName);
-    context.put("kafka.consumer.timeout.ms","100");
-
-    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
-
-    events = Lists.newArrayList();
-
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        events.addAll((List<Event>)invocation.getArguments()[0]);
-        return null;
-      }
-    }).when(channelProcessor).processEventBatch(any(List.class));
-    kafkaSource.setChannelProcessor(channelProcessor);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    kafkaSource.stop();
-    kafkaServer.stop();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testProcessItNotEmpty() throws EventDeliveryException,
-          SecurityException, NoSuchFieldException, IllegalArgumentException,
-          IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    kafkaSource.configure(context);
-    kafkaSource.start();
-
-    Thread.sleep(500L);
-
-    kafkaServer.produce(topicName, "", "hello, world");
-
-    Thread.sleep(500L);
-
-    Assert.assertEquals(Status.READY, kafkaSource.process());
-    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
-    Assert.assertEquals(1, events.size());
-
-    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
-
-
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testProcessItNotEmptyBatch() throws EventDeliveryException,
-          SecurityException, NoSuchFieldException, IllegalArgumentException,
-          IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"2");
-    kafkaSource.configure(context);
-    kafkaSource.start();
-
-    Thread.sleep(500L);
-
-    kafkaServer.produce(topicName, "", "hello, world");
-    kafkaServer.produce(topicName, "", "foo, bar");
-
-    Thread.sleep(500L);
-
-    Status status = kafkaSource.process();
-    assertEquals(Status.READY, status);
-    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
-    Assert.assertEquals("foo, bar", new String(events.get(1).getBody(),
-            Charsets.UTF_8));
-
-  }
-
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testProcessItEmpty() throws EventDeliveryException,
-          SecurityException, NoSuchFieldException, IllegalArgumentException,
-          IllegalAccessException, InterruptedException {
-    kafkaSource.configure(context);
-    kafkaSource.start();
-    Thread.sleep(500L);
-
-    Status status = kafkaSource.process();
-    assertEquals(Status.BACKOFF, status);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testNonExistingTopic() throws EventDeliveryException,
-          SecurityException, NoSuchFieldException, IllegalArgumentException,
-          IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.TOPIC,"faketopic");
-    kafkaSource.configure(context);
-    kafkaSource.start();
-    Thread.sleep(500L);
-
-    Status status = kafkaSource.process();
-    assertEquals(Status.BACKOFF, status);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test(expected= FlumeException.class)
-  public void testNonExistingZk() throws EventDeliveryException,
-          SecurityException, NoSuchFieldException, IllegalArgumentException,
-          IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666");
-    kafkaSource.configure(context);
-    kafkaSource.start();
-    Thread.sleep(500L);
-
-    Status status = kafkaSource.process();
-    assertEquals(Status.BACKOFF, status);
-  }
-
-  @Test
-  public void testBatchTime() throws InterruptedException,
-          EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250");
-    kafkaSource.configure(context);
-    kafkaSource.start();
-
-    Thread.sleep(500L);
-
-    for (int i=1; i<5000; i++) {
-      kafkaServer.produce(topicName, "", "hello, world " + i);
-    }
-    Thread.sleep(500L);
-
-    long startTime = System.currentTimeMillis();
-    Status status = kafkaSource.process();
-    long endTime = System.currentTimeMillis();
-    assertEquals(Status.READY, status);
-    assertTrue(endTime - startTime <
-            ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) +
-            context.getLong("kafka.consumer.timeout.ms")) );
-  }
-
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/be2dbf1a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
deleted file mode 100644
index f87e5ae..0000000
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.source.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.flume.Context;
-import org.apache.zookeeper.server.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class KafkaSourceUtilTest {
-  private Properties props = new Properties();
-  private Context context = new Context();
-  private int zkPort = 21818; // none-standard
-  private KafkaSourceEmbeddedZookeeper zookeeper;
-
-  @Before
-  public void setUp() throws Exception {
-    context.put("kafka.consumer.timeout", "10");
-    context.put("type", "KafkaSource");
-    context.put("topic", "test");
-    context.put("zookeeperConnect", "127.0.0.1:"+zkPort);
-    context.put("groupId","test");
-    props = KafkaSourceUtil.getKafkaProperties(context);
-    zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
-
-
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    zookeeper.stopZookeeper();
-  }
-
-
-  @Test
-  public void testGetConsumer() {
-    ConsumerConnector cc = KafkaSourceUtil.getConsumer(props);
-    assertNotNull(cc);
-
-  }
-
-  @Test
-  public void testKafkaConsumerProperties() {
-    Context context = new Context();
-    context.put("kafka.auto.commit.enabled", "override.default.autocommit");
-    context.put("kafka.fake.property", "kafka.property.value");
-    context.put("kafka.zookeeper.connect","bad-zookeeper-list");
-    context.put("zookeeperConnect","real-zookeeper-list");
-    Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
-
-    //check that we have defaults set
-    assertEquals(
-            kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID),
-            KafkaSourceConstants.DEFAULT_GROUP_ID);
-    //check that kafka properties override the default and get correct name
-    assertEquals(
-            kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED),
-            "override.default.autocommit");
-    //check that any kafka property gets in
-    assertEquals(kafkaProps.getProperty("fake.property"),
-            "kafka.property.value");
-    //check that documented property overrides defaults
-    assertEquals(kafkaProps.getProperty("zookeeper.connect")
-            ,"real-zookeeper-list");
-  }
-
-
-}

Reply via email to