Repository: flume Updated Branches: refs/heads/flume-1.7 e08f8b766 -> a56282086
FLUME-2734: Kafka Channel timeout property is overridden by default value (Johny Rufus via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a5628208 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a5628208 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a5628208 Branch: refs/heads/flume-1.7 Commit: a562820869576e512317299ac9e676c5bae41182 Parents: e08f8b7 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Sep 30 09:34:31 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Sep 30 09:35:04 2015 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/kafka/KafkaChannel.java | 5 +++-- .../flume/channel/kafka/TestKafkaChannel.java | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a5628208/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 80a122d..c83d4f6 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -177,13 +177,14 @@ public class KafkaChannel extends BasicChannelSemantics { throw new ConfigurationException( "Zookeeper Connection must be specified"); } - Long timeout = ctx.getLong(TIMEOUT, Long.valueOf(DEFAULT_TIMEOUT)); kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX)); kafkaConf.put(GROUP_ID, groupId); kafkaConf.put(BROKER_LIST_KEY, brokerList); kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect); kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false)); - kafkaConf.put(CONSUMER_TIMEOUT, String.valueOf(timeout)); + if(kafkaConf.get(CONSUMER_TIMEOUT) == null) { + kafkaConf.put(CONSUMER_TIMEOUT, DEFAULT_TIMEOUT); + } kafkaConf.put(REQUIRED_ACKS_KEY, "-1"); LOGGER.info(kafkaConf.toString()); parseAsFlumeEvent = http://git-wip-us.apache.org/repos/asf/flume/blob/a5628208/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index e665431..25b9e40 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -35,6 +35,8 @@ import org.apache.flume.event.EventBuilder; import org.apache.flume.sink.kafka.util.TestUtil; import org.junit.*; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; + import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -149,6 +151,22 @@ public class TestKafkaChannel { channel.stop(); } + @Test + public void testTimeoutConfig() throws Exception { + Context context = prepareDefaultContext(true); + KafkaChannel channel = new KafkaChannel(); + Configurables.configure(channel, context); + Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT) + .equals(DEFAULT_TIMEOUT)); + + String timeout = "1000"; + context.put("kafka."+CONSUMER_TIMEOUT, timeout); + channel = new KafkaChannel(); + Configurables.configure(channel, context); + Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT) + .equals(timeout)); + } + /** * This method starts a channel, puts events into it. The channel is then * stopped and restarted. Then we check to make sure if all events we put
