Repository: flume
Updated Branches:
  refs/heads/flume-1.7 faad35801 -> bf2495047
FLUME-2632: High CPU on KafkaSink (Ashish Paliwal via Johny Rufus)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bf249504
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bf249504
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bf249504

Branch: refs/heads/flume-1.7
Commit: bf2495047f5cebb26f26f94904831c52057d129d
Parents: faad358
Author: Johny Rufus <[email protected]>
Authored: Wed Oct 21 15:18:42 2015 -0700
Committer: Johny Rufus <[email protected]>
Committed: Wed Oct 21 15:23:23 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java | 6 ++++++
 .../test/java/org/apache/flume/sink/kafka/TestKafkaSink.java | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/bf249504/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index eada17c..38b854b 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -98,6 +98,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
         if (event == null) {
           // no events available in channel
+          if(processedEvents == 0) {
+            result = Status.BACKOFF;
+            counter.incrementBatchEmptyCount();
+          } else {
+            counter.incrementBatchUnderflowCount();
+          }
           break;
         }
http://git-wip-us.apache.org/repos/asf/flume/blob/bf249504/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 80f764f..72117b1 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -168,7 +168,7 @@ public class TestKafkaSink {
     kafkaSink.start();
     Sink.Status status = kafkaSink.process();
-    if (status == Sink.Status.BACKOFF) {
+    if (status != Sink.Status.BACKOFF) {
       fail("Error Occurred");
     }
     assertNull(
