Updated Branches:
  refs/heads/flume-1.4 65d3d63ee -> 102c5e07d

FLUME-1765. Add Load Balancing Log4jAppender.

(Cameron Gandevia via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/102c5e07
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/102c5e07
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/102c5e07

Branch: refs/heads/flume-1.4
Commit: 102c5e07dec17740866315d342afc00c19267569
Parents: 65d3d63
Author: Hari Shreedharan <[email protected]>
Authored: Thu Feb 21 23:16:55 2013 -0800
Committer: Hari Shreedharan <[email protected]>
Committed: Thu Feb 21 23:18:16 2013 -0800

----------------------------------------------------------------------
 .../log4jappender/LoadBalancingLog4jAppender.java  |  153 +++++++++
 .../flume/clients/log4jappender/Log4jAppender.java |    3 +-
 .../TestLoadBalancingLog4jAppender.java            |  265 +++++++++++++++
 ...lume-loadbalancing-backoff-log4jtest.properties |   21 ++
 .../flume-loadbalancing-rnd-log4jtest.properties   |   20 ++
 .../flume-loadbalancinglog4jtest.properties        |   19 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   61 ++++
 7 files changed, 541 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
 
b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
new file mode 100644
index 0000000..9fb115e
--- /dev/null
+++ 
b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.clients.log4jappender;
+
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.api.RpcClientFactory.ClientType;
+import org.apache.log4j.helpers.LogLog;
+
+/**
+ *
+ * Appends Log4j Events to an external Flume client which is described by the
+ * Log4j configuration file. The appender takes the following required
+ * parameters:
+ * <p>
+ * <strong>Hosts</strong> : A space separated list of host:port of the first 
hop
+ * at which Flume (through an AvroSource) is listening for events.
+ * </p>
+ * <p>
+ * <strong>Selector</strong> : Selection mechanism. Must be either ROUND_ROBIN,
+ * RANDOM or custom FQDN to class that inherits from LoadBalancingSelector. If
+ * empty defaults to ROUND_ROBIN
+ * </p>
+ * The appender also takes the following optional parameters:
+ * <p>
+ * <strong>MaxBackoff</strong> : A long value representing the maximum amount 
of
+ * time in milliseconds the Load balancing client will backoff from a node that
+ * has failed to consume an event
+ * </p>
+ * A sample log4j properties file which appends to a source would look like:
+ *
+ * <pre>
+ * <p>
+ * log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+ * log4j.appender.out2.Hosts = fooflumesource.com:25430 
barflumesource.com:25430
+ * log4j.appender.out2.Selector = RANDOM
+ * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p>
+ * </pre>
+ * <p>
+ * <pre>
+ * <p>
+ * log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+ * log4j.appender.out2.Hosts = fooflumesource.com:25430 
barflumesource.com:25430
+ * log4j.appender.out2.Selector = ROUND_ROBIN
+ * log4j.appender.out2.MaxBackoff = 60000
+ * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p>
+ * </pre>
+ * <p>
+ * <i>Note: Change the last line to the package of the class(es), that will do
+ * the appending. For example, if classes from the package com.bar.foo are
+ * appending, the last line would be:</i>
+ * </p>
+ *
+ * <pre>
+ * <p>log4j.logger.com.bar.foo = DEBUG,out2</p>
+ * </pre>
+ *
+ *
+ */
+public class LoadBalancingLog4jAppender extends Log4jAppender {
+
+  // Raw option values exactly as handed to the setters below.
+  private String hosts;
+  private String selector;
+  private String maxBackoff;
+
+  /**
+   * Sets the <tt>Hosts</tt> option.
+   *
+   * @param hostNames
+   *          whitespace separated list of host:port entries
+   */
+  public void setHosts(String hostNames) {
+    this.hosts = hostNames;
+  }
+
+  /**
+   * Sets the <tt>Selector</tt> option.
+   *
+   * @param selector
+   *          host selection mechanism; when null or blank no selector
+   *          property is handed to the client factory
+   */
+  public void setSelector(String selector) {
+    this.selector = selector;
+  }
+
+  /**
+   * Sets the <tt>MaxBackoff</tt> option.
+   *
+   * @param maxBackoff
+   *          maximum backoff in milliseconds, as text; when null or blank
+   *          this appender does not set any backoff property
+   */
+  public void setMaxBackoff(String maxBackoff) {
+    this.maxBackoff = maxBackoff;
+  }
+
+  /**
+   * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt>
+   * and <tt>setMaxBackoff</tt>
+   *
+   * @throws FlumeException
+   *           if the LoadBalancingRpcClient cannot be instantiated.
+   * @throws IllegalArgumentException
+   *           if no hosts were configured or the max backoff is not a
+   *           number greater than 0
+   */
+  @Override
+  public void activateOptions() throws FlumeException {
+    try {
+      final Properties properties = getProperties(hosts, selector, maxBackoff);
+      rpcClient = RpcClientFactory.getInstance(properties);
+    } catch (FlumeException e) {
+      String errormsg = "RPC client creation failed! " + e.getMessage();
+      LogLog.error(errormsg);
+      throw e;
+    }
+  }
+
+  /**
+   * Translates the appender options into the properties passed to
+   * <tt>RpcClientFactory.getInstance</tt>.
+   *
+   * @param hosts
+   *          whitespace separated list of host:port entries; required
+   * @param selector
+   *          host selector; skipped when null or blank
+   * @param maxBackoff
+   *          maximum backoff in milliseconds; skipped when null or blank
+   * @return the populated client properties
+   * @throws IllegalArgumentException
+   *           if hosts is null or blank, or maxBackoff is not a number
+   *           greater than 0
+   */
+  private Properties getProperties(String hosts, String selector,
+      String maxBackoff) throws FlumeException {
+
+    // isBlank also rejects a whitespace-only value, which would otherwise
+    // produce an empty host list below.
+    if (StringUtils.isBlank(hosts)) {
+      throw new IllegalArgumentException("hosts must not be null or empty");
+    }
+
+    Properties props = new Properties();
+    // Trim first: a leading separator makes split() return an empty first
+    // element, which would be registered as a host with an empty address.
+    String[] hostsAndPorts = hosts.trim().split("\\s+");
+    StringBuilder names = new StringBuilder();
+    for (int i = 0; i < hostsAndPorts.length; i++) {
+      String name = "h" + i;
+      props.setProperty(
+          RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,
+          hostsAndPorts[i]);
+      if (i > 0) {
+        names.append(' ');
+      }
+      names.append(name);
+    }
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS,
+        names.toString());
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+        ClientType.DEFAULT_LOADBALANCE.toString());
+
+    if (!StringUtils.isBlank(selector)) {
+      props.setProperty(RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR,
+          selector.trim());
+    }
+
+    if (!StringUtils.isBlank(maxBackoff)) {
+      // Non-numeric text surfaces as NumberFormatException, which is an
+      // IllegalArgumentException.
+      long millis = Long.parseLong(maxBackoff.trim());
+      if (millis <= 0) {
+        throw new IllegalArgumentException(
+            "Misconfigured max backoff, value must be greater than 0");
+      }
+      props.setProperty(RpcClientConfigurationConstants.CONFIG_BACKOFF,
+          String.valueOf(true));
+      // Store the value that was validated, not the untrimmed input.
+      props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF,
+          String.valueOf(millis));
+    }
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
 
b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index 315a68c..d61f807 100644
--- 
a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ 
b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -63,7 +63,8 @@ public class Log4jAppender extends AppenderSkeleton {
 
   private String hostname;
   private int port;
-  private RpcClient rpcClient = null;
+
+  RpcClient rpcClient = null;
 
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
 
b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
new file mode 100644
index 0000000..657af67
--- /dev/null
+++ 
b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.clients.log4jappender;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Source;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.AvroSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.Status;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestLoadBalancingLog4jAppender{
+
+  private final List<CountingAvroSource> sources = Lists.newArrayList();
+  private Channel ch;
+  private ChannelSelector rcs;
+  private Logger fixture;
+
+  /**
+   * Creates a MemoryChannel configured from an empty Context and a
+   * ReplicatingChannelSelector that is given that channel as its only
+   * channel.
+   */
+  @Before
+  public void initiate() throws InterruptedException {
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, new Context());
+    ch = memoryChannel;
+
+    List<Channel> selectorChannels = new ArrayList<Channel>();
+    selectorChannels.add(memoryChannel);
+
+    ChannelSelector replicating = new ReplicatingChannelSelector();
+    replicating.setChannels(selectorChannels);
+    rcs = replicating;
+  }
+
+  /** Stops every source that startSources added to the sources list. */
+  @After
+  public void cleanUp() {
+    for (int i = 0; i < sources.size(); i++) {
+      sources.get(i).stop();
+    }
+  }
+
+  /**
+   * Sends 1000 messages through the appender configured by
+   * flume-loadbalancinglog4jtest.properties and expects each of the two
+   * sources to have counted exactly 500 appends.
+   */
+  @Test
+  public void testLog4jAppenderRoundRobin() throws IOException {
+    final int totalMessages = 1000;
+    final int perSourceShare = 500;
+    String configPath = TestLoadBalancingLog4jAppender.class.getClassLoader()
+        .getResource("flume-loadbalancinglog4jtest.properties").getFile();
+    startSources(new File(configPath), 25430, 25431);
+
+    sendAndAssertMessages(totalMessages);
+
+    for (CountingAvroSource counted : sources) {
+      Assert.assertEquals(perSourceShare, counted.appendCount.get());
+    }
+  }
+
+  /**
+   * Sends 1000 messages through the appender configured by
+   * flume-loadbalancing-rnd-log4jtest.properties to ten sources, then
+   * checks that the per-source counts add up to the number sent and that at
+   * least three distinct per-source counts were observed.
+   */
+  @Test
+  public void testLog4jAppenderRandom() throws IOException {
+    int numberOfMsgs = 1000;
+    File testFile = new File(TestLoadBalancingLog4jAppender.class
+        .getClassLoader()
+        .getResource("flume-loadbalancing-rnd-log4jtest.properties")
+        .getFile());
+    startSources(testFile, new int[] { 25430, 25431, 25432, 25433, 25434,
+        25435, 25436, 25437, 25438, 25439 });
+
+    sendAndAssertMessages(numberOfMsgs);
+
+    int total = 0;
+    Set<Integer> counts = new HashSet<Integer>();
+    for (CountingAvroSource source : sources) {
+      int received = source.appendCount.intValue();
+      total += received;
+      counts.add(received);
+    }
+    // The distribution itself is not tested here; that is covered by the
+    // client's own tests. This only guards against a degenerate spread.
+    Assert.assertTrue("Very unusual distribution " + counts.size(),
+        counts.size() > 2);
+    // assertEquals reports the actual total on failure, unlike
+    // assertTrue(total == numberOfMsgs).
+    Assert.assertEquals("Missing events", numberOfMsgs, total);
+  }
+
+  /**
+   * Exercises backoff with three sources, using the appender configured by
+   * flume-loadbalancing-backoff-log4jtest.properties, by toggling which
+   * sources reject events between sends.
+   * NOTE(review): despite the method name, that properties file selects
+   * ROUND_ROBIN, not RANDOM; confirm the name is intentional.
+   */
+  @Test
+  public void testRandomBackoff() throws Exception {
+    File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
+        .getClassLoader()
+        .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+        .getFile());
+    startSources(TESTFILE, new int[] { 25430, 25431, 25432 });
+
+    // Make sources 0 and 2 reject events; only source 1 accepts them.
+    sources.get(0).setFail();
+    sources.get(2).setFail();
+
+    sendAndAssertMessages(50);
+
+    Assert.assertEquals(50, sources.get(1).appendCount.intValue());
+    Assert.assertEquals(0, sources.get(0).appendCount.intValue());
+    Assert.assertEquals(0, sources.get(2).appendCount.intValue());
+    sources.get(0).setOk();
+    sources.get(1).setFail(); // s0 should still be backed off
+    try {
+      send(1);
+      // nothing should be able to process right now
+      // NOTE(review): the message names EventDeliveryException, but the
+      // exception caught below is a FlumeException whose cause is checked.
+      Assert.fail("Expected EventDeliveryException");
+    } catch (FlumeException e) {
+      Assert.assertTrue(e.getCause() instanceof EventDeliveryException);
+    }
+    Thread.sleep(2500); // wait for s0 to no longer be backed off
+
+    sendAndAssertMessages(50);
+
+    // Source 1 keeps its earlier 50; the second batch of 50 went to source 0.
+    Assert.assertEquals(50, sources.get(0).appendCount.intValue());
+    Assert.assertEquals(50, sources.get(1).appendCount.intValue());
+    Assert.assertEquals(0, sources.get(2).appendCount.intValue());
+  }
+
+  /**
+   * Logs the given number of messages through the fixture logger without
+   * reading anything back from the channel.
+   *
+   * @param numberOfMsgs how many messages to log
+   */
+  private void send(int numberOfMsgs) throws EventDeliveryException {
+    int sent = 0;
+    while (sent < numberOfMsgs) {
+      String text = "This is log message number" + sent;
+      fixture.log(Level.toLevel(sent % 5), text);
+      sent++;
+    }
+  }
+
+  /**
+   * Logs the given number of messages one at a time and, after each one,
+   * takes an event from the channel and checks its body and its timestamp,
+   * log level, logger name and message encoding headers.
+   *
+   * @param numberOfMsgs how many messages to log and verify
+   * @throws IOException declared for the body decoding
+   */
+  private void sendAndAssertMessages(int numberOfMsgs) throws IOException {
+    for (int count = 0; count < numberOfMsgs; count++) {
+      int level = count % 5;
+      String msg = "This is log message number" + String.valueOf(count);
+      fixture.log(Level.toLevel(level), msg);
+
+      Transaction transaction = ch.getTransaction();
+      transaction.begin();
+      boolean committed = false;
+      try {
+        Event event = ch.take();
+        Assert.assertNotNull(event);
+        String body = new String(event.getBody(), "UTF8");
+        // Expected value first, so a failure message reads correctly.
+        Assert.assertEquals(msg, body);
+
+        Map<String, String> hdrs = event.getHeaders();
+
+        Assert.assertNotNull(hdrs.get(Log4jAvroHeaders.TIMESTAMP.toString()));
+
+        Assert.assertEquals(Level.toLevel(level),
+            Level.toLevel(hdrs.get(Log4jAvroHeaders.LOG_LEVEL.toString())));
+
+        Assert.assertEquals(fixture.getName(),
+            hdrs.get(Log4jAvroHeaders.LOGGER_NAME.toString()));
+
+        Assert.assertEquals("UTF8",
+            hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
+        // To confirm on console we actually got the body
+        System.out.println("Got body: " + body);
+        transaction.commit();
+        committed = true;
+      } finally {
+        // A failed assertion must not leave the transaction open: roll it
+        // back if it was not committed, and always close it.
+        try {
+          if (!committed) {
+            transaction.rollback();
+          }
+        } finally {
+          transaction.close();
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Creates and starts one CountingAvroSource per port, then configures
+   * log4j from the given properties file and obtains the fixture logger.
+   *
+   * @param log4jProps log4j properties file to load once the sources are up
+   * @param ports ports the sources are configured with, bound to 0.0.0.0
+   * @throws IOException if the properties file cannot be read
+   */
+  private void startSources(File log4jProps, int... ports) throws IOException {
+    for (int port : ports) {
+      CountingAvroSource source = new CountingAvroSource(port);
+      Context context = new Context();
+      context.put("port", String.valueOf(port));
+      context.put("bind", "0.0.0.0");
+      Configurables.configure(source, context);
+      sources.add(source);
+      source.setChannelProcessor(new ChannelProcessor(rcs));
+    }
+
+    for (Source source : sources) {
+      source.start();
+    }
+
+    // The properties file having Avro port info should be loaded only
+    // after the sources above have been started, else log4j tries to
+    // connect to a source before it is up, since log4j setup is completed
+    // before the @Before calls also.
+    // This will cause the test to fail even before it starts!
+
+    Properties props = new Properties();
+    FileReader reader = new FileReader(log4jProps);
+    try {
+      props.load(reader);
+    } finally {
+      // Release the file handle even if loading fails.
+      reader.close();
+    }
+    PropertyConfigurator.configure(props);
+    fixture = LogManager.getLogger(TestLoadBalancingLog4jAppender.class);
+  }
+
+  /**
+   * AvroSource that counts the events it is asked to append and can be
+   * switched into a mode where every append is answered with FAILED.
+   */
+  static class CountingAvroSource extends AvroSource {
+    AtomicInteger appendCount = new AtomicInteger();
+    volatile boolean isFail = false;
+    private final int sourcePort;
+
+    public CountingAvroSource(int port) {
+      this.sourcePort = port;
+    }
+
+    /** Lets appends through to the parent implementation again. */
+    public void setOk() {
+      isFail = false;
+    }
+
+    /** Makes every subsequent append return FAILED without counting it. */
+    public void setFail() {
+      isFail = true;
+    }
+
+    @Override
+    public String getName() {
+      return "testing..." + sourcePort;
+    }
+
+    @Override
+    public Status append(AvroFlumeEvent avroEvent) {
+      if (!isFail) {
+        appendCount.incrementAndGet();
+        return super.append(avroEvent);
+      }
+      return Status.FAILED;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) {
+      if (!isFail) {
+        appendCount.addAndGet(events.size());
+        return super.appendBatch(events);
+      }
+      return Status.FAILED;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties
 
b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties
new file mode 100644
index 0000000..6e8235e
--- /dev/null
+++ 
b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-backoff-log4jtest.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
+log4j.appender.out2.Selector = ROUND_ROBIN
+log4j.appender.out2.MaxBackoff = 30000
+log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties
 
b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties
new file mode 100644
index 0000000..fd43d19
--- /dev/null
+++ 
b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancing-rnd-log4jtest.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432 
localhost:25433 localhost:25434 localhost:25435 localhost:25436 localhost:25437 
localhost:25438 localhost:25439
+log4j.appender.out2.Selector = RANDOM
+log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties
 
b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties
new file mode 100644
index 0000000..618e504
--- /dev/null
+++ 
b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-loadbalancinglog4jtest.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+log4j.appender.out2.Hosts = localhost:25430 localhost:25431
+log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/102c5e07/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8eb3734..8a4ecda 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2491,6 +2491,67 @@ Sample log4j.properties file:
   log4j.logger.org.example.MyClass = DEBUG,flume
   #...
 
+Load Balancing Log4J Appender
+=============================
+
+Appends Log4j events to the Avro sources of a list of Flume agents. A client
+using this appender must have the flume-ng-sdk in the classpath (e.g.,
+flume-ng-sdk-1.4.0-SNAPSHOT.jar). This appender supports round-robin and
+random schemes for performing the load balancing. It also supports a
+configurable backoff timeout so that down agents are removed temporarily from
+the set of hosts. Required properties are in **bold**.
+
+=============  ===========  
==========================================================================
+Property Name  Default      Description
+=============  ===========  
==========================================================================
+**Hosts**      --           A space separated list of host:port
+                            at which Flume (through an AvroSource) is 
listening for events
+Selector       ROUND_ROBIN  Selection mechanism. Must be either ROUND_ROBIN,
+                            RANDOM or custom FQDN to class that inherits from 
LoadBalancingSelector.
+MaxBackoff     --           A long value representing the maximum amount of 
time in milliseconds
+                            the Load balancing client will backoff from a node 
that has failed to
+                            consume an event. Defaults to no backoff
+=============  ===========  
==========================================================================
+
+
+Sample log4j.properties file configured using defaults:
+
+.. code-block:: properties
+
+  #...
+  log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+  log4j.appender.out2.Hosts = localhost:25430 localhost:25431
+
+  # configure a class's logger to output to the flume appender
+  log4j.logger.org.example.MyClass = DEBUG,out2
+  #...
+
+Sample log4j.properties file configured using RANDOM load balancing:
+
+.. code-block:: properties
+
+  #...
+  log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+  log4j.appender.out2.Hosts = localhost:25430 localhost:25431
+  log4j.appender.out2.Selector = RANDOM
+
+  # configure a class's logger to output to the flume appender
+  log4j.logger.org.example.MyClass = DEBUG,out2
+  #...
+
+Sample log4j.properties file configured using backoff:
+
+.. code-block:: properties
+
+  #...
+  log4j.appender.out2 = 
org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
+  log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
+  log4j.appender.out2.Selector = ROUND_ROBIN
+  log4j.appender.out2.MaxBackoff = 30000
+
+  # configure a class's logger to output to the flume appender
+  log4j.logger.org.example.MyClass = DEBUG,out2
+  #...
 
 Security
 ========

Reply via email to