Repository: flume Updated Branches: refs/heads/trunk d6943a662 -> b5e102bee
FLUME-2729. Allow pollableSource backoff times to be configurable (Ted Malaska via Johny Rufus) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b5e102be Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b5e102be Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b5e102be Branch: refs/heads/trunk Commit: b5e102bee4d4b2783f3f65aab403a53c1ae8e401 Parents: d6943a6 Author: Johny Rufus <[email protected]> Authored: Mon Jul 6 16:19:41 2015 -0700 Committer: Johny Rufus <[email protected]> Committed: Mon Jul 6 16:19:41 2015 -0700 ---------------------------------------------------------------------- .../flume/channel/file/TestIntegration.java | 3 + .../java/org/apache/flume/PollableSource.java | 4 ++ .../flume/source/AbstractPollableSource.java | 24 ++++++- .../flume/source/PollableSourceConstants.java | 28 ++++++++ .../flume/source/PollableSourceRunner.java | 8 +-- .../flume/source/SequenceGeneratorSource.java | 30 ++++----- .../org/apache/flume/source/StressSource.java | 32 ++++----- .../source/TestAbstractPollableSource.java | 68 ++++++++++++++++++++ .../flume/source/TestPollableSourceRunner.java | 10 +++ .../source/TestSequenceGeneratorSource.java | 1 + .../apache/flume/source/TestStressSource.java | 4 ++ .../apache/flume/source/kafka/KafkaSource.java | 25 +++---- 12 files changed, 182 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java index 4e2f940..2fbe116 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java @@ -94,6 +94,9 @@ public class TestIntegration { SequenceGeneratorSource source = new SequenceGeneratorSource(); CountingSourceRunner sourceRunner = new CountingSourceRunner(source, channel); + source.configure(context); + source.start(); + NullSink sink = new NullSink(); sink.setChannel(channel); CountingSinkRunner sinkRunner = new CountingSinkRunner(sink); http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java index e872b0c..764810b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java @@ -47,6 +47,10 @@ public interface PollableSource extends Source { */ public Status process() throws EventDeliveryException; + public long getBackOffSleepIncrement(); + + public long getMaxBackOffSleepInterval(); + public static enum Status { READY, BACKOFF } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java index 356f4d4..33e1acc 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java @@ -18,6 +18,7 @@ */ package org.apache.flume.source; +import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.PollableSource; @@ -39,6 +40,9 @@ import org.apache.flume.annotations.InterfaceStability; public abstract class AbstractPollableSource extends BasicSourceSemantics implements PollableSource { + long backoffSleepIncrement = PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT; + long maxBackoffSleep = PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP; + public AbstractPollableSource() { super(); } @@ -49,10 +53,28 @@ public abstract class AbstractPollableSource extends BasicSourceSemantics exception); } if(!isStarted()) { - throw new EventDeliveryException("Source is not started"); + throw new EventDeliveryException("Source is not started. It is in '" + getLifecycleState() + "' state"); } return doProcess(); } + @Override + public synchronized void configure(Context context) { + super.configure(context); + backoffSleepIncrement = + context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT, + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); + maxBackoffSleep = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP, + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); + } + + public long getBackOffSleepIncrement() { + return backoffSleepIncrement; + } + + public long getMaxBackOffSleepInterval() { + return maxBackoffSleep; + } + protected abstract Status doProcess() throws EventDeliveryException; } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java new file mode 100644 index 0000000..f13207d --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java @@ -0,0 +1,28 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.flume.source; + +public class PollableSourceConstants { + + public static final String BACKOFF_SLEEP_INCREMENT = "backoffSleepIncrement"; + public static final String MAX_BACKOFF_SLEEP = "maxBackoffSleep"; + public static final long DEFAULT_BACKOFF_SLEEP_INCREMENT = 1000; + public static final long DEFAULT_MAX_BACKOFF_SLEEP = 5000; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java index f6c64b3..ea37703 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java @@ -50,8 +50,6 @@ public class PollableSourceRunner extends SourceRunner { private static final Logger logger = LoggerFactory .getLogger(PollableSourceRunner.class); - private static final long backoffSleepIncrement = 1000; - private static final long maxBackoffSleep = 5000; private AtomicBoolean shouldStop; @@ -141,7 +139,7 @@ public class PollableSourceRunner extends SourceRunner { Thread.sleep(Math.min( counterGroup.incrementAndGet("runner.backoffs.consecutive") - * backoffSleepIncrement, maxBackoffSleep)); + * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval())); } else { counterGroup.set("runner.backoffs.consecutive", 0L); } @@ -154,9 +152,9 @@ public class PollableSourceRunner extends SourceRunner { } catch (Exception e) { counterGroup.incrementAndGet("runner.errors"); logger.error("Unhandled exception, logging and sleeping for " + - maxBackoffSleep + "ms", e); + source.getMaxBackOffSleepInterval() + "ms", e); try { - Thread.sleep(maxBackoffSleep); + Thread.sleep(source.getMaxBackOffSleepInterval()); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index 51e021a..1214635 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -25,15 +25,15 @@ import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; -import org.apache.flume.PollableSource; +import org.apache.flume.FlumeException; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SequenceGeneratorSource extends AbstractSource implements - PollableSource, Configurable { +public class SequenceGeneratorSource extends AbstractPollableSource implements + Configurable { private static final Logger logger = LoggerFactory .getLogger(SequenceGeneratorSource.class); @@ -54,7 +54,7 @@ public class SequenceGeneratorSource extends AbstractSource implements * <li>batchSize = type int that defines the size of event batches */ @Override - public void configure(Context context) { + protected void doConfigure(Context context) throws FlumeException { batchSize = context.getInteger("batchSize", 1); if (batchSize > 1) { batchArrayList = new ArrayList<Event>(batchSize); @@ -66,15 +66,14 @@ public class SequenceGeneratorSource extends AbstractSource implements } @Override - public Status process() throws EventDeliveryException { - + protected Status doProcess() throws EventDeliveryException { Status status = Status.READY; int i = 0; try { if (batchSize <= 1) { if(eventsSent < totalEvents) { getChannelProcessor().processEvent( - EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + EventBuilder.withBody(String.valueOf(sequence++).getBytes())); sourceCounter.incrementEventAcceptedCount(); eventsSent++; } else { @@ -85,7 +84,7 @@ public class SequenceGeneratorSource extends AbstractSource implements for (i = 0; i < batchSize; i++) { if(eventsSent < totalEvents){ batchArrayList.add(i, EventBuilder.withBody(String - .valueOf(sequence++).getBytes())); + .valueOf(sequence++).getBytes())); eventsSent++; } else { status = Status.BACKOFF; @@ -107,22 +106,19 @@ public class SequenceGeneratorSource extends AbstractSource implements } @Override - public void start() { - logger.info("Sequence generator source starting"); - - super.start(); + protected void doStart() throws FlumeException { + logger.info("Sequence generator source do starting"); sourceCounter.start(); - logger.debug("Sequence generator source started"); + logger.debug("Sequence generator source do started"); } @Override - public void stop() { - logger.info("Sequence generator source stopping"); + protected void doStop() throws FlumeException { + logger.info("Sequence generator source do stopping"); - super.stop(); sourceCounter.stop(); - logger.info("Sequence generator source stopped. Metrics:{}",getName(), sourceCounter); + logger.info("Sequence generator source do stopped. Metrics:{}",getName(), sourceCounter); } } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java index 0e7020b..9aa1477 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java @@ -28,7 +28,7 @@ import org.apache.flume.Context; import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; -import org.apache.flume.PollableSource; +import org.apache.flume.FlumeException; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; @@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory; * * See {@link StressSource#configure(Context)} for configuration options. */ -public class StressSource extends AbstractSource implements - Configurable, PollableSource { +public class StressSource extends AbstractPollableSource implements + Configurable { private static final Logger logger = LoggerFactory .getLogger(StressSource.class); @@ -81,7 +81,7 @@ public class StressSource extends AbstractSource implements * <li>-batchSize = type int that defines the number of Events being sent in one batch */ @Override - public void configure(Context context) { + protected void doConfigure(Context context) throws FlumeException { /* Limit on the total number of events. */ maxTotalEvents = context.getLong("maxTotalEvents", -1L); /* Limit on the total number of successful events. */ @@ -113,13 +113,13 @@ public class StressSource extends AbstractSource implements } @Override - public Status process() throws EventDeliveryException { + protected Status doProcess() throws EventDeliveryException { long totalEventSent = counterGroup.addAndGet("events.total", lastSent); if ((maxTotalEvents >= 0 && - totalEventSent >= maxTotalEvents) || - (maxSuccessfulEvents >= 0 && - counterGroup.get("events.successful") >= maxSuccessfulEvents)) { + totalEventSent >= maxTotalEvents) || + (maxSuccessfulEvents >= 0 && + counterGroup.get("events.successful") >= maxSuccessfulEvents)) { return Status.BACKOFF; } try { @@ -148,20 +148,12 @@ public class StressSource extends AbstractSource implements } @Override - public void start() { - logger.info("Stress source starting"); - - super.start(); - - logger.debug("Stress source started"); + protected void doStart() throws FlumeException { + logger.info("Stress source doStart finished"); } @Override - public void stop() { - logger.info("Stress source stopping"); - - super.stop(); - - logger.info("Stress source stopped. Metrics:{}", counterGroup); + protected void doStop() throws FlumeException { + logger.info("Stress source do stop. Metrics:{}", counterGroup); } } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java index 02a2f0c..d385abe 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java @@ -23,8 +23,10 @@ import static org.mockito.Mockito.*; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.HashMap; public class TestAbstractPollableSource { @@ -61,4 +63,70 @@ public class TestAbstractPollableSource { source.process(); } + @Test + public void voidBackOffConfig() { + source = spy(new AbstractPollableSource() { + @Override + protected Status doProcess() throws EventDeliveryException { + return Status.BACKOFF; + } + @Override + protected void doConfigure(Context context) throws FlumeException { + } + @Override + protected void doStart() throws FlumeException { + + } + @Override + protected void doStop() throws FlumeException { + + } + }); + + HashMap<String, String> inputConfigs = new HashMap<String,String>(); + inputConfigs.put(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT, "42"); + inputConfigs.put(PollableSourceConstants.MAX_BACKOFF_SLEEP, "4242"); + + Context context = new Context(inputConfigs); + + source.configure(context); + Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + source.getBackOffSleepIncrement(), + 42l, source.getBackOffSleepIncrement()); + Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + source.getMaxBackOffSleepInterval(), + 4242l, source.getMaxBackOffSleepInterval()); + } + + @Test + public void voidBackOffConfigDefaults() { + source = spy(new AbstractPollableSource() { + @Override + protected Status doProcess() throws EventDeliveryException { + return Status.BACKOFF; + } + @Override + protected void doConfigure(Context context) throws FlumeException { + } + @Override + protected void doStart() throws FlumeException { + + } + @Override + protected void doStop() throws FlumeException { + + } + }); + + HashMap<String, String> inputConfigs = new HashMap<String,String>(); + + Assert.assertEquals("BackOffSleepIncrement should equal " + + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT + + " but it equals " + source.getBackOffSleepIncrement(), + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT, source.getBackOffSleepIncrement()); + + Assert.assertEquals("BackOffSleepIncrement should equal " + + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP + + " but it equals " + source.getMaxBackOffSleepInterval(), + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP, source.getMaxBackOffSleepInterval()); + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java index 4d4222d..d706e9b 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java @@ -95,6 +95,16 @@ public class TestPollableSourceRunner { } @Override + public long getBackOffSleepIncrement() { + return PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT; + } + + @Override + public long getMaxBackOffSleepInterval() { + return PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP; + } + + @Override public void start() { // Unused. } http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java index c9d3e20..2bbcdaf 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java @@ -64,6 +64,7 @@ public class TestSequenceGeneratorSource { rcs.setChannels(channels); source.setChannelProcessor(new ChannelProcessor(rcs)); + source.start(); for (long i = 0; i < 100; i++) { source.process(); http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java index 28270f4..a651281 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java @@ -75,6 +75,7 @@ public class TestStressSource { Context context = new Context(); context.put("maxTotalEvents", "35"); source.configure(context); + source.start(); for (int i = 0; i < 50; i++) { source.process(); @@ -91,6 +92,7 @@ public class TestStressSource { context.put("maxTotalEvents", "35"); context.put("batchSize", "10"); source.configure(context); + source.start(); for (int i = 0; i < 50; i++) { if (source.process() == Status.BACKOFF) { @@ -121,6 +123,7 @@ public class TestStressSource { Context context = new Context(); context.put("batchSize", "10"); source.configure(context); + source.start(); for (int i = 0; i < 10; i++) { Assert.assertFalse("StressSource with no maxTotalEvents should not return " + @@ -146,6 +149,7 @@ public class TestStressSource { Context context = new Context(); context.put("maxSuccessfulEvents", "35"); source.configure(context); + source.start(); for (int i = 0; i < 10; i++) { source.process(); http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 3777639..fd1dd3c 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -26,15 +26,17 @@ import kafka.consumer.ConsumerIterator; import kafka.consumer.ConsumerTimeoutException; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; - import kafka.message.MessageAndMetadata; + import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.instrumentation.kafka.KafkaSourceCounter; +import org.apache.flume.source.AbstractPollableSource; import org.apache.flume.source.AbstractSource; +import org.apache.flume.source.BasicSourceSemantics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +68,8 @@ import org.slf4j.LoggerFactory; * Any property starting with "kafka" will be passed to the kafka consumer So * you can use any configuration supported by Kafka 0.8.1.1 */ -public class KafkaSource extends AbstractSource - implements Configurable, PollableSource { +public class KafkaSource extends AbstractPollableSource + implements Configurable { private static final Logger log = LoggerFactory.getLogger(KafkaSource.class); private ConsumerConnector consumer; private ConsumerIterator<byte[],byte[]> it; @@ -81,8 +83,8 @@ public class KafkaSource extends AbstractSource private final List<Event> eventList = new ArrayList<Event>(); private KafkaSourceCounter counter; - public Status process() throws EventDeliveryException { - + @Override + protected Status doProcess() throws EventDeliveryException { byte[] kafkaMessage; byte[] kafkaKey; Event event; @@ -168,7 +170,8 @@ public class KafkaSource extends AbstractSource * * @param context */ - public void configure(Context context) { + @Override + protected void doConfigure(Context context) throws FlumeException { this.context = context; batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, KafkaSourceConstants.DEFAULT_BATCH_SIZE); @@ -192,7 +195,7 @@ public class KafkaSource extends AbstractSource } @Override - public synchronized void start() { + protected void doStart() throws FlumeException { log.info("Starting {}...", this); try { @@ -221,21 +224,19 @@ public class KafkaSource extends AbstractSource } catch (Exception e) { throw new FlumeException("Unable to get message iterator from Kafka", e); } - log.info("Kafka source {} started.", getName()); + log.info("Kafka source {} do started.", getName()); counter.start(); - super.start(); } @Override - public synchronized void stop() { + protected void doStop() throws FlumeException { if (consumer != null) { // exit cleanly. This syncs offsets of messages read to ZooKeeper // to avoid reading the same messages again consumer.shutdown(); } counter.stop(); - log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter); - super.stop(); + log.info("Kafka Source {} do stopped. Metrics: {}", getName(), counter); } /**
