[
https://issues.apache.org/jira/browse/FLUME-1484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ted Malaska updated FLUME-1484:
-------------------------------
Attachment: FLUME-1411-IDEA.patch
This is completely finished patch but an example of a possible solution. If you
like the solution I'll finish the patch.
This solution supports two types of averages:
1. Averages since start
2. Rolling Averages (The rolling interval can be set through config properties)
I added two gets for these averages in sinkCounter.
I also added a junit that tests both of these averages and shows how quickly
they deviate.
Please let me know what you think.
> Flume support throughput in Agent, Source, Sink level at JMX
> ------------------------------------------------------------
>
> Key: FLUME-1484
> URL: https://issues.apache.org/jira/browse/FLUME-1484
> Project: Flume
> Issue Type: Improvement
> Components: Node, Sinks+Sources
> Affects Versions: v1.2.0
> Reporter: Denny Ye
> Assignee: Ted Malaska
> Attachments: FLUME-1411-IDEA.patch
>
>
> From user's view of point, we would like to know the current throughput from
> one of monitoring tools. WebUI is best, of course. JMX is simple way to
> implement throughput monitoring.
> Agent should have input and output throughput based on several Sources and
> Sinks.
> Here is just simple code in my environment to monitoring throughput of Source.
> {code:title=ThroughputCounter.java|borderStyle=solid}
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.flume.instrumentation.SourceCounter;
> public class ThroughputCounter {
> private volatile boolean isRunning;
> private AtomicInteger cache = new AtomicInteger();
>
> SourceCounter sourceCounter;
> public ThroughputCounter(SourceCounter sourceCounter) {
> this.sourceCounter = sourceCounter;
> }
>
> public void start() {
> isRunning = true;
>
> Counter counter = new Counter();
> counter.start();
> }
>
> public void stop() {
> isRunning = false;
> }
>
>
> public void addWriteBytes(int bytes) {
> cache.getAndAdd(bytes);
> }
>
> private class Counter extends Thread {
>
> Counter() {
> super("ThroughputCounterThread");
> }
>
> public void run() {
> while (isRunning) {
> try {
> Thread.sleep(1000);
> sourceCounter.incrementSourceThroughput(
> cache.getAndSet(0));
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> }
> }
>
> }
> {code}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira