Repository: flume Updated Branches: refs/heads/trunk 199684b62 -> 1d9bab676
FLUME-2562. Add metrics for Kafka Source, Kafka Sink and Kafka Channel. (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1d9bab67 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1d9bab67 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1d9bab67 Branch: refs/heads/trunk Commit: 1d9bab6760df38e538705a74dd599de03129777b Parents: 199684b Author: Hari Shreedharan <[email protected]> Authored: Fri Jan 16 16:24:24 2015 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Fri Jan 16 16:24:24 2015 -0800 ---------------------------------------------------------------------- .../flume/channel/kafka/KafkaChannel.java | 25 +++++- .../flume/instrumentation/ChannelCounter.java | 7 ++ .../flume/instrumentation/SinkCounter.java | 8 ++ .../flume/instrumentation/SourceCounter.java | 7 ++ .../kafka/KafkaChannelCounter.java | 82 ++++++++++++++++++++ .../kafka/KafkaChannelCounterMBean.java | 50 ++++++++++++ .../instrumentation/kafka/KafkaSinkCounter.java | 53 +++++++++++++ .../kafka/KafkaSinkCounterMBean.java | 48 ++++++++++++ .../kafka/KafkaSourceCounter.java | 64 +++++++++++++++ .../kafka/KafkaSourceCounterMBean.java | 47 +++++++++++ .../org/apache/flume/sink/kafka/KafkaSink.java | 15 ++++ .../apache/flume/source/kafka/KafkaSource.java | 18 +++++ 12 files changed, 423 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index d767aac..80a122d 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -36,6 +36,7 @@ import org.apache.flume.conf.ConfigurationException; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.kafka.KafkaChannelCounter; import org.apache.flume.source.avro.AvroFlumeEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +65,8 @@ public class KafkaChannel extends BasicChannelSemantics { private final List<ConsumerAndIterator> consumers = Collections.synchronizedList(new LinkedList<ConsumerAndIterator>()); + private KafkaChannelCounter counter; + /* Each ConsumerConnector commit will commit all partitions owned by it. To * ensure that each partition is only committed when all events are * actually done, we will need to keep a ConsumerConnector per thread. @@ -95,6 +98,7 @@ public class KafkaChannel extends BasicChannelSemantics { // We always have just one topic being read by one thread LOGGER.info("Topic = " + topic.get()); topicCountMap.put(topic.get(), 1); + counter.start(); super.start(); } catch (Exception e) { LOGGER.error("Could not start producer"); @@ -114,7 +118,10 @@ public class KafkaChannel extends BasicChannelSemantics { } } producer.close(); + counter.stop(); super.stop(); + LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(), + counter); } @Override @@ -192,6 +199,10 @@ public class KafkaChannel extends BasicChannelSemantics { kafkaConf.put("auto.offset.reset", "smallest"); } + if (counter == null) { + counter = new KafkaChannelCounter(getName()); + } + } private void decommissionConsumerAndIterator(ConsumerAndIterator c) { @@ -291,7 +302,10 @@ public class KafkaChannel extends BasicChannelSemantics { } else { try { ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator; + long startTime = System.nanoTime(); it.hasNext(); + long endTime = System.nanoTime(); + counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000)); if (parseAsFlumeEvent) { ByteArrayInputStream in = new ByteArrayInputStream(it.next().message()); @@ -339,7 +353,11 @@ public class KafkaChannel extends BasicChannelSemantics { messages.add(new KeyedMessage<String, byte[]>(topic.get(), null, batchUUID, event)); } + long startTime = System.nanoTime(); producer.send(messages); + long endTime = System.nanoTime(); + counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000)); + counter.addToEventPutSuccessCount(Long.valueOf(messages.size())); serializedEvents.get().clear(); } catch (Exception ex) { LOGGER.warn("Sending events to Kafka failed", ex); @@ -348,8 +366,12 @@ public class KafkaChannel extends BasicChannelSemantics { } } else { if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) { + long startTime = System.nanoTime(); consumerAndIter.get().consumer.commitOffsets(); - } + long endTime = System.nanoTime(); + counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000)); + } + counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size())); events.get().clear(); } } @@ -362,6 +384,7 @@ public class KafkaChannel extends BasicChannelSemantics { if (type.equals(TransactionType.PUT)) { serializedEvents.get().clear(); } else { + counter.addToRollbackCounter(Long.valueOf(events.get().size())); consumerAndIter.get().failedEvents.addAll(events.get()); events.get().clear(); } http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java index 9938c0a..977ad6c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java @@ -18,6 +18,8 @@ */ package org.apache.flume.instrumentation; +import org.apache.commons.lang.ArrayUtils; + public class ChannelCounter extends MonitoredCounterGroup implements ChannelCounterMBean { @@ -48,6 +50,11 @@ public class ChannelCounter extends MonitoredCounterGroup implements super(MonitoredCounterGroup.Type.CHANNEL, name, ATTRIBUTES); } + public ChannelCounter(String name, String[] attributes) { + super(MonitoredCounterGroup.Type.CHANNEL, name, + (String[])ArrayUtils.addAll(attributes,ATTRIBUTES)); + } + @Override public long getChannelSize() { return get(COUNTER_CHANNEL_SIZE); http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java index 41b28cf..54f4a4c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java @@ -17,6 +17,8 @@ */ package org.apache.flume.instrumentation; +import org.apache.commons.lang.ArrayUtils; + public class SinkCounter extends MonitoredCounterGroup implements SinkCounterMBean { @@ -56,6 +58,12 @@ public class SinkCounter extends MonitoredCounterGroup implements super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES); } + public SinkCounter(String name, String[] attributes) { + super(MonitoredCounterGroup.Type.SINK, name, + (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES)); + } + + @Override public long getConnectionCreatedCount() { return get(COUNTER_CONNECTION_CREATED); http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java index 972d2c6..02ef6ed 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java @@ -18,6 +18,8 @@ */ package org.apache.flume.instrumentation; +import org.apache.commons.lang.ArrayUtils; + public class SourceCounter extends MonitoredCounterGroup implements SourceCounterMBean { @@ -53,6 +55,11 @@ public class SourceCounter extends MonitoredCounterGroup implements super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES); } + public SourceCounter(String name, String[] attributes) { + super(Type.SOURCE, name, + (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES)); + } + @Override public long getEventReceivedCount() { return get(COUNTER_EVENTS_RECEIVED); http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java new file mode 100644 index 0000000..6e142cf --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.instrumentation.kafka; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.flume.instrumentation.ChannelCounter; + +public class KafkaChannelCounter extends ChannelCounter + implements KafkaChannelCounterMBean { + + + private static final String TIMER_KAFKA_EVENT_GET = + "channel.kafka.event.get.time"; + + private static final String TIMER_KAFKA_EVENT_SEND = + "channel.kafka.event.send.time"; + + private static final String TIMER_KAFKA_COMMIT = + "channel.kafka.commit.time"; + + private static final String COUNT_ROLLBACK = + "channel.rollback.count"; + + + private static String[] ATTRIBUTES = { + TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET, + COUNT_ROLLBACK + }; + + + public KafkaChannelCounter(String name) { + super(name,ATTRIBUTES); + } + + public long addToKafkaEventGetTimer(long delta) { + return addAndGet(TIMER_KAFKA_EVENT_GET, delta); + } + + public long addToKafkaEventSendTimer(long delta) { + return addAndGet(TIMER_KAFKA_EVENT_SEND, delta); + } + + public long addToKafkaCommitTimer(long delta) { + return addAndGet(TIMER_KAFKA_COMMIT, delta); + } + + public long addToRollbackCounter(long delta) { + return addAndGet(COUNT_ROLLBACK,delta); + } + + public long getKafkaEventGetTimer() { + return get(TIMER_KAFKA_EVENT_GET); + } + + public long getKafkaEventSendTimer() { + return get(TIMER_KAFKA_EVENT_SEND); + } + + public long getKafkaCommitTimer() { + return get(TIMER_KAFKA_COMMIT); + } + + public long getRollbackCount() { + return get(COUNT_ROLLBACK); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java new file mode 100644 index 0000000..da64f0c --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.instrumentation.kafka; + +public interface KafkaChannelCounterMBean { + + long getKafkaEventGetTimer(); + + long getKafkaEventSendTimer(); + + long getKafkaCommitTimer(); + + long getRollbackCount(); + + long getChannelSize(); + + long getEventPutAttemptCount(); + + long getEventTakeAttemptCount(); + + long getEventPutSuccessCount(); + + long getEventTakeSuccessCount(); + + long getStartTime(); + + long getStopTime(); + + long getChannelCapacity(); + + String getType(); + + double getChannelFillPercentage(); + +} http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java new file mode 100644 index 0000000..1308ff3 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.instrumentation.kafka; + +import org.apache.flume.instrumentation.SinkCounter; + +public class KafkaSinkCounter extends SinkCounter implements KafkaSinkCounterMBean { + + private static final String TIMER_KAFKA_EVENT_SEND = + "channel.kafka.event.send.time"; + + private static final String COUNT_ROLLBACK = + "channel.rollback.count"; + + private static final String[] ATTRIBUTES = + {COUNT_ROLLBACK,TIMER_KAFKA_EVENT_SEND}; + + public KafkaSinkCounter(String name) { + super(name,ATTRIBUTES); + } + + public long addToKafkaEventSendTimer(long delta) { + return addAndGet(TIMER_KAFKA_EVENT_SEND,delta); + } + + public long incrementRollbackCount() { + return increment(COUNT_ROLLBACK); + } + + public long getKafkaEventSendTimer() { + return get(TIMER_KAFKA_EVENT_SEND); + } + + public long getRollbackCount() { + return get(COUNT_ROLLBACK); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java new file mode 100644 index 0000000..f49ca26 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.instrumentation.kafka; + +public interface KafkaSinkCounterMBean { + + long getKafkaEventSendTimer(); + + long getRollbackCount(); + + long getConnectionCreatedCount(); + + long getConnectionClosedCount(); + + long getConnectionFailedCount(); + + long getBatchEmptyCount(); + + long getBatchUnderflowCount(); + + long getBatchCompleteCount(); + + long getEventDrainAttemptCount(); + + long getEventDrainSuccessCount(); + + long getStartTime(); + + long getStopTime(); + + String getType(); +} + http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java new file mode 100644 index 0000000..1cb911d --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.instrumentation.kafka; + +import org.apache.flume.instrumentation.SourceCounter; + +public class KafkaSourceCounter extends SourceCounter implements KafkaSourceCounterMBean { + + private static final String TIMER_KAFKA_EVENT_GET = + "source.kafka.event.get.time"; + + private static final String TIMER_KAFKA_COMMIT = + "source.kafka.commit.time"; + + private static final String COUNTER_KAFKA_EMPTY = + "source.kafka.empty.count"; + + private static final String[] ATTRIBUTES = + {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET}; + + public KafkaSourceCounter(String name) { + super(name, ATTRIBUTES); + } + + public long addToKafkaEventGetTimer(long delta) { + return addAndGet(TIMER_KAFKA_EVENT_GET,delta); + } + + public long addToKafkaCommitTimer(long delta) { + return addAndGet(TIMER_KAFKA_COMMIT,delta); + } + + public long incrementKafkaEmptyCount() { + return increment(COUNTER_KAFKA_EMPTY); + } + + public long getKafkaCommitTimer() { + return get(TIMER_KAFKA_COMMIT); + } + + public long getKafkaEventGetTimer() { + return get(TIMER_KAFKA_EVENT_GET); + } + + public long getKafkaEmptyCount() { + return get(COUNTER_KAFKA_EMPTY); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java new file mode 100644 index 0000000..219a5b6 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.instrumentation.kafka; + +public interface KafkaSourceCounterMBean { + + long getKafkaEventGetTimer(); + + long getKafkaCommitTimer(); + + long getKafkaEmptyCount(); + + long getEventReceivedCount(); + + long getEventAcceptedCount(); + + long getAppendReceivedCount(); + + long getAppendAcceptedCount(); + + long getAppendBatchReceivedCount(); + + long getAppendBatchAcceptedCount(); + + long getStartTime(); + + long getStopTime(); + + String getType(); + + long getOpenConnectionCount(); +} http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index a90b950..eada17c 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.flume.*; import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.kafka.KafkaSinkCounter; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,8 @@ public class KafkaSink extends AbstractSink implements Configurable { private String topic; private int batchSize; private List<KeyedMessage<String, byte[]>> messageList; + private KafkaSinkCounter counter; + @Override public Status process() throws EventDeliveryException { @@ -122,7 +125,11 @@ public class KafkaSink extends AbstractSink implements Configurable { // publish batch and commit. if (processedEvents > 0) { + long startTime = System.nanoTime(); producer.send(messageList); + long endTime = System.nanoTime(); + counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000)); + counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size())); } transaction.commit(); @@ -134,6 +141,7 @@ public class KafkaSink extends AbstractSink implements Configurable { if (transaction != null) { try { transaction.rollback(); + counter.incrementRollbackCount(); } catch (Exception e) { logger.error("Transaction rollback failed", e); throw Throwables.propagate(e); @@ -154,12 +162,15 @@ public class KafkaSink extends AbstractSink implements Configurable { // instantiate the producer ProducerConfig config = new ProducerConfig(kafkaProps); producer = new Producer<String, byte[]>(config); + counter.start(); super.start(); } @Override public synchronized void stop() { producer.close(); + counter.stop(); + logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), counter); super.stop(); } @@ -202,5 +213,9 @@ public class KafkaSink extends AbstractSink implements Configurable { if (logger.isDebugEnabled()) { logger.debug("Kafka producer properties: " + kafkaProps); } + + if (counter == null) { + counter = new KafkaSinkCounter(getName()); + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 00a81c6..3777639 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -32,6 +32,8 @@ import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SourceCounter; +import org.apache.flume.instrumentation.kafka.KafkaSourceCounter; import org.apache.flume.source.AbstractSource; import org.slf4j.Logger; @@ -77,6 +79,7 @@ public class KafkaSource extends AbstractSource private Context context; private Properties kafkaProps; private final List<Event> eventList = new ArrayList<Event>(); + private KafkaSourceCounter counter; public Status process() throws EventDeliveryException { @@ -88,6 +91,7 @@ public class KafkaSource extends AbstractSource long batchEndTime = System.currentTimeMillis() + timeUpperLimit; try { boolean iterStatus = false; + long startTime = System.nanoTime(); while (eventList.size() < batchUpperLimit && System.currentTimeMillis() < batchEndTime) { iterStatus = hasNext(); @@ -116,22 +120,30 @@ public class KafkaSource extends AbstractSource log.debug("Event #: {}", eventList.size()); } } + long endTime = System.nanoTime(); + counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000)); + counter.addToEventReceivedCount(Long.valueOf(eventList.size())); // If we have events, send events to channel // clear the event list // and commit if Kafka doesn't auto-commit if (eventList.size() > 0) { getChannelProcessor().processEventBatch(eventList); + counter.addToEventAcceptedCount(eventList.size()); eventList.clear(); if (log.isDebugEnabled()) { log.debug("Wrote {} events to channel", eventList.size()); } if (!kafkaAutoCommitEnabled) { // commit the read transactions to Kafka to avoid duplicates + long commitStartTime = System.nanoTime(); consumer.commitOffsets(); + long commitEndTime = System.nanoTime(); + counter.addToKafkaCommitTimer((commitEndTime-commitStartTime)/(1000*1000)); } } if (!iterStatus) { if (log.isDebugEnabled()) { + counter.incrementKafkaEmptyCount(); log.debug("Returning with backoff. No more data to read"); } return Status.BACKOFF; @@ -174,6 +186,9 @@ public class KafkaSource extends AbstractSource kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty( KafkaSourceConstants.AUTO_COMMIT_ENABLED)); + if (counter == null) { + counter = new KafkaSourceCounter(getName()); + } } @Override @@ -207,6 +222,7 @@ public class KafkaSource extends AbstractSource throw new FlumeException("Unable to get message iterator from Kafka", e); } log.info("Kafka source {} started.", getName()); + counter.start(); super.start(); } @@ -217,6 +233,8 @@ public class KafkaSource extends AbstractSource // to avoid reading the same messages again consumer.shutdown(); } + counter.stop(); + log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter); super.stop(); }
