Repository: flume Updated Branches: refs/heads/trunk 67ed62aa1 -> d6bf08b54
FLUME-2632: High CPU on KafkaSink (Ashish Paliwal via Johny Rufus) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d6bf08b5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d6bf08b5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d6bf08b5 Branch: refs/heads/trunk Commit: d6bf08b54e467a6bdc6a5fc0edd41c51200e9da1 Parents: 67ed62a Author: Johny Rufus <[email protected]> Authored: Wed Oct 21 15:18:42 2015 -0700 Committer: Johny Rufus <[email protected]> Committed: Wed Oct 21 15:18:42 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java | 6 ++++++ .../test/java/org/apache/flume/sink/kafka/TestKafkaSink.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d6bf08b5/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index eada17c..38b854b 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -98,6 +98,12 @@ public class KafkaSink extends AbstractSink implements Configurable { if (event == null) { // no events available in channel + if(processedEvents == 0) { + result = Status.BACKOFF; + counter.incrementBatchEmptyCount(); + } else { + counter.incrementBatchUnderflowCount(); + } break; } http://git-wip-us.apache.org/repos/asf/flume/blob/d6bf08b5/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 80f764f..72117b1 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -168,7 +168,7 @@ public class TestKafkaSink { kafkaSink.start(); Sink.Status status = kafkaSink.process(); - if (status == Sink.Status.BACKOFF) { + if (status != Sink.Status.BACKOFF) { fail("Error Occurred"); } assertNull(
