Author: hshreedharan
Date: Mon Jun 1 19:49:44 2015
New Revision: 1682982
URL: http://svn.apache.org/r1682982
Log:
FLUME-2702. Update site for Flume 1.6.0
Added:
flume/site/trunk/content/sphinx/releases/1.6.0.rst (with props)
Modified:
flume/site/trunk/content/sphinx/FlumeDeveloperGuide.rst
flume/site/trunk/content/sphinx/FlumeUserGuide.rst
flume/site/trunk/content/sphinx/download.rst
flume/site/trunk/content/sphinx/index.rst
flume/site/trunk/content/sphinx/releases/index.rst
Modified: flume/site/trunk/content/sphinx/FlumeDeveloperGuide.rst
URL: http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/FlumeDeveloperGuide.rst?rev=1682982&r1=1682981&r2=1682982&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/FlumeDeveloperGuide.rst (original)
+++ flume/site/trunk/content/sphinx/FlumeDeveloperGuide.rst Mon Jun 1 19:49:44 2015
@@ -15,7 +15,7 @@
======================================
-Flume 1.5.2 Developer Guide
+Flume 1.6.0 Developer Guide
======================================
Introduction
@@ -277,6 +277,116 @@ properties:
request-timeout = 20000 # Must be >=1000 (default: 20000)
+Secure RPC client - Thrift
+''''''''''''''''''''''''''
+
+As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication.
+The client needs to use the ``getThriftInstance`` method of ``SecureRpcClientFactory``
+to get hold of a ``SecureThriftRpcClient``. ``SecureThriftRpcClient`` extends
+``ThriftRpcClient``, which implements the ``RpcClient`` interface. The Kerberos
+authentication code resides in the flume-ng-auth module, which must be on the
+classpath when using the ``SecureRpcClientFactory``. Both the client
+principal and the client keytab should be passed in as parameters through the
+properties; they are the credentials the client uses to authenticate
+against the Kerberos KDC. In addition, the server principal of the destination
+Thrift source that this client connects to should also be provided.
+The following example shows how to use the ``SecureRpcClientFactory``
+within a user's data-generating application:
+
+.. code-block:: java
+
+    import org.apache.flume.Event;
+    import org.apache.flume.EventDeliveryException;
+    import org.apache.flume.event.EventBuilder;
+    import org.apache.flume.api.SecureRpcClientFactory;
+    import org.apache.flume.api.RpcClientConfigurationConstants;
+    import org.apache.flume.api.RpcClient;
+    import java.nio.charset.Charset;
+    import java.util.Properties;
+
+    public class MyApp {
+      public static void main(String[] args) {
+        MySecureRpcClientFacade client = new MySecureRpcClientFacade();
+        // Initialize client with the remote Flume agent's host, port
+        Properties props = new Properties();
+        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
+        props.setProperty("hosts", "h1");
+        props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));
+
+        // Initialize client with the kerberos authentication related properties
+        props.setProperty("kerberos", "true");
+        props.setProperty("client-principal", "flumeclient/[email protected]");
+        props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
+        props.setProperty("server-principal", "flume/[email protected]");
+        client.init(props);
+
+        // Send 10 events to the remote Flume agent. That agent should be
+        // configured to listen with a ThriftSource in kerberos mode.
+        String sampleData = "Hello Flume!";
+        for (int i = 0; i < 10; i++) {
+          client.sendDataToFlume(sampleData);
+        }
+
+        client.cleanUp();
+      }
+    }
+
+    class MySecureRpcClientFacade {
+      private RpcClient client;
+      private Properties properties;
+
+      public void init(Properties properties) {
+        // Setup the RPC connection
+        this.properties = properties;
+        // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
+        this.client = SecureRpcClientFactory.getThriftInstance(properties);
+      }
+
+      public void sendDataToFlume(String data) {
+        // Create a Flume Event object that encapsulates the sample data
+        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
+
+        // Send the event
+        try {
+          client.append(event);
+        } catch (EventDeliveryException e) {
+          // clean up and recreate the client
+          client.close();
+          client = null;
+          client = SecureRpcClientFactory.getThriftInstance(properties);
+        }
+      }
+
+      public void cleanUp() {
+        // Close the RPC connection
+        client.close();
+      }
+    }
+
+The remote ``ThriftSource`` should be started in kerberos mode.
+Below is an example Flume agent configuration that's waiting for a connection
+from MyApp:
+
+.. code-block:: properties
+
+ a1.channels = c1
+ a1.sources = r1
+ a1.sinks = k1
+
+ a1.channels.c1.type = memory
+
+ a1.sources.r1.channels = c1
+ a1.sources.r1.type = thrift
+ a1.sources.r1.bind = 0.0.0.0
+ a1.sources.r1.port = 41414
+ a1.sources.r1.kerberos = true
+ a1.sources.r1.agent-principal = flume/[email protected]
+ a1.sources.r1.agent-keytab = /tmp/flume.keytab
+
+
+ a1.sinks.k1.channel = c1
+ a1.sinks.k1.type = logger
+
Failover Client
'''''''''''''''
@@ -450,7 +560,7 @@ sources, sinks, and channels are allowed
is a special embedded source and events should be sent to the source
via the put, putAll methods on the EmbeddedAgent object. Only File Channel
and Memory Channel are allowed as channels while Avro Sink is the only
-supported sink.
+supported sink. Interceptors are also supported by the embedded agent.
Note: The embedded agent has a dependency on hadoop-core.jar.
@@ -459,18 +569,29 @@ full Agent. The following is an exhausti
Required properties are in **bold**.
-==================== ================ ==============================================
-Property Name        Default          Description
-==================== ================ ==============================================
-source.type          embedded         The only available source is the embedded source.
-**channel.type**     --               Either ``memory`` or ``file`` which correspond to MemoryChannel and FileChannel respectively.
-channel.*            --               Configuration options for the channel type requested, see MemoryChannel or FileChannel user guide for an exhaustive list.
-**sinks**            --               List of sink names
-**sink.type**        --               Property name must match a name in the list of sinks. Value must be ``avro``
-sink.*               --               Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port.
-**processor.type**   --               Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
-processor.*          --               Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.
-==================== ================ ==============================================
+===================== ================ ======================================================================
+Property Name         Default          Description
+===================== ================ ======================================================================
+source.type           embedded         The only available source is the embedded source.
+**channel.type**      --               Either ``memory`` or ``file`` which correspond
+                                       to MemoryChannel and FileChannel respectively.
+channel.*             --               Configuration options for the channel type requested,
+                                       see MemoryChannel or FileChannel user guide for an exhaustive list.
+**sinks**             --               List of sink names
+**sink.type**         --               Property name must match a name in the list of sinks.
+                                       Value must be ``avro``
+sink.*                --               Configuration options for the sink.
+                                       See AvroSink user guide for an exhaustive list,
+                                       however note AvroSink requires at least hostname and port.
+**processor.type**    --               Either ``failover`` or ``load_balance`` which correspond
+                                       to FailoverSinkProcessor and LoadBalancingSinkProcessor respectively.
+processor.*           --               Configuration options for the sink processor selected.
+                                       See FailoverSinkProcessor and LoadBalancingSinkProcessor
+                                       user guide for an exhaustive list.
+source.interceptors   --               Space-separated list of interceptors
+source.interceptors.* --               Configuration options for individual interceptors
+                                       specified in the source.interceptors property
+===================== ================ ======================================================================
Below is an example of how to use the agent:
@@ -487,6 +608,10 @@ Below is an example of how to use the ag
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port", "5565");
properties.put("processor.type", "load_balance");
+ properties.put("source.interceptors", "i1");
+ properties.put("source.interceptors.i1.type", "static");
+ properties.put("source.interceptors.i1.key", "key1");
+ properties.put("source.interceptors.i1.value", "value1");
EmbeddedAgent agent = new EmbeddedAgent("myagent");
Modified: flume/site/trunk/content/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/FlumeUserGuide.rst?rev=1682982&r1=1682981&r2=1682982&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/FlumeUserGuide.rst (original)
+++ flume/site/trunk/content/sphinx/FlumeUserGuide.rst Mon Jun 1 19:49:44 2015
@@ -15,7 +15,7 @@
======================================
-Flume 1.5.2 User Guide
+Flume 1.6.0 User Guide
======================================
Introduction
@@ -234,6 +234,31 @@ The original Flume terminal will output
Congratulations - you've successfully configured and deployed a Flume agent!
Subsequent sections cover agent configuration in much more detail.
+
+Zookeeper based Configuration
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Flume supports Agent configurations via Zookeeper. *This is an experimental feature.*
+The configuration file needs to be uploaded to Zookeeper, under a configurable prefix.
+The configuration file is stored in the Zookeeper node data.
+The following is how the Zookeeper node tree would look for agents a1 and a2:
+
+.. code-block:: properties
+
+ - /flume
+ |- /a1 [Agent config file]
+ |- /a2 [Agent config file]
+
+Once the configuration file is uploaded, start the agent with the following options::
+
+    $ bin/flume-ng agent --conf conf -z zkhost:2181,zkhost1:2181 -p /flume --name a1 -Dflume.root.logger=INFO,console
+
+================== ================ =========================================================================
+Argument Name      Default          Description
+================== ================ =========================================================================
+**z**              --               Zookeeper connection string. Comma separated list of hostname:port
+**p**              /flume           Base Path in Zookeeper to store Agent configurations
+================== ================ =========================================================================
+
Installing third-party plugins
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -707,7 +732,7 @@ keystore-password -- T
keystore-type      JKS              The type of the Java keystore. This can be "JKS" or "PKCS12".
exclude-protocols  SSLv3            Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilter           false            Set this to true to enable ipFiltering for netty
-ipFilter.rules     --               Define N netty ipFilter pattern rules with this config.
+ipFilterRules      --               Define N netty ipFilter pattern rules with this config.
================== ================ ===================================================
Example for agent named a1:
@@ -721,15 +746,15 @@ Example for agent named a1:
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
-Example of ipFilter.rules
+Example of ipFilterRules
-ipFilter.rules defines N netty ipFilters separated by a comma a pattern rule must be in this format.
+ipFilterRules defines N netty ipFilters separated by a comma. A pattern rule must be in this format.
<'allow' or deny>:<'ip' or 'name' for computer name>:<pattern>
or
allow/deny:ip/name:pattern
-example: ipFilter.rules=allow:ip:127.*,allow:name:localhost,deny:ip:*
+example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
Note that the first rule to match will apply, as the example below shows for a client on the localhost
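
The first-match behaviour of ipFilterRules can be modelled with a small, self-contained Java sketch. It is a simplified, hypothetical model: the class and method names are illustrative, ``*`` is treated as a plain wildcard, and it is not Netty's or Flume's actual rule engine. What happens when no rule matches is not stated above, so the sketch reports an explicit "no-match" instead of assuming a default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model of ipFilterRules evaluation; not Netty's implementation.
class IpFilterRulesSketch {

  // One parsed rule: <allow|deny>:<ip|name>:<pattern>
  static final class Rule {
    final boolean allow;
    final boolean matchIp;
    final Pattern pattern;

    Rule(boolean allow, boolean matchIp, Pattern pattern) {
      this.allow = allow;
      this.matchIp = matchIp;
      this.pattern = pattern;
    }
  }

  // Converts a '*' wildcard pattern into an anchored regular expression.
  static Pattern wildcard(String glob) {
    StringBuilder regex = new StringBuilder();
    for (char c : glob.toCharArray()) {
      regex.append(c == '*' ? ".*" : Pattern.quote(String.valueOf(c)));
    }
    return Pattern.compile(regex.toString());
  }

  // Parses a comma-separated ipFilterRules value into an ordered rule list.
  static List<Rule> parse(String rules) {
    List<Rule> parsed = new ArrayList<>();
    for (String rule : rules.split(",")) {
      String[] parts = rule.trim().split(":", 3);
      if (parts.length != 3) {
        throw new IllegalArgumentException("Malformed rule: " + rule);
      }
      parsed.add(new Rule("allow".equals(parts[0]), "ip".equals(parts[1]), wildcard(parts[2])));
    }
    return parsed;
  }

  // The first rule whose pattern matches decides; later rules are ignored.
  static String evaluate(List<Rule> rules, String ip, String hostName) {
    for (Rule rule : rules) {
      String subject = rule.matchIp ? ip : hostName;
      if (rule.pattern.matcher(subject).matches()) {
        return rule.allow ? "allow" : "deny";
      }
    }
    return "no-match";
  }

  public static void main(String[] args) {
    List<Rule> rules = parse("allow:ip:127.*,allow:name:localhost,deny:ip:*");
    System.out.println(evaluate(rules, "127.0.0.1", "localhost")); // allow
    System.out.println(evaluate(rules, "10.0.0.5", "build-host"));  // deny
  }
}
```

Reordering the rules changes the outcome: with ``deny:name:localhost`` placed first, the same localhost client would be denied even if a later rule allows it.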
@@ -742,6 +767,9 @@ Thrift Source
Listens on Thrift port and receives events from external Thrift client streams.
When paired with the built-in ThriftSink on another (previous hop) Flume agent,
it can create tiered collection topologies.
+The Thrift source can be configured to start in secure mode by enabling Kerberos authentication.
+agent-principal and agent-keytab are the properties used by the
+Thrift source to authenticate to the Kerberos KDC.
Required properties are in **bold**.
================== =========== ===================================================
@@ -756,6 +784,14 @@ selector.type
selector.*
interceptors       --          Space separated list of interceptors
interceptors.*
+ssl                false       Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
+keystore           --          This is the path to a Java keystore file. Required for SSL.
+keystore-password  --          The password for the Java keystore. Required for SSL.
+keystore-type      JKS         The type of the Java keystore. This can be "JKS" or "PKCS12".
+exclude-protocols  SSLv3       Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
+kerberos           false       Set to true to enable Kerberos authentication. In Kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode will accept connections only from Thrift clients that have Kerberos enabled and are successfully authenticated to the Kerberos KDC.
+agent-principal    --          The Kerberos principal used by the Thrift Source to authenticate to the Kerberos KDC.
+agent-keytab       --          The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the Kerberos KDC.
================== =========== ===================================================
Example for agent named a1:
@@ -793,6 +829,7 @@ restartThrottle 10000 Amount of
restart         false        Whether the executed cmd should be restarted if it dies
logStdErr       false        Whether the command's stderr should be logged
batchSize       20           The max number of lines to read and send to the channel at a time
+batchTimeout    3000         Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type   replicating  replicating or multiplexing
selector.*                   Depends on the selector.type value
interceptors    --           Space-separated list of interceptors
@@ -841,9 +878,9 @@ invoked directly. Common values for 'sh
.. code-block:: properties
- agent_foo.sources.tailsource-1.type = exec
- agent_foo.sources.tailsource-1.shell = /bin/bash -c
- agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
+ a1.sources.tailsource-1.type = exec
+ a1.sources.tailsource-1.shell = /bin/bash -c
+ a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source
~~~~~~~~~~~
@@ -986,54 +1023,13 @@ Example for an agent named agent-1:
.. code-block:: properties
- agent-1.channels = ch-1
- agent-1.sources = src-1
+ a1.channels = ch-1
+ a1.sources = src-1
- agent-1.sources.src-1.type = spooldir
- agent-1.sources.src-1.channels = ch-1
- agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
- agent-1.sources.src-1.fileHeader = true
-
-Twitter 1% firehose Source (experimental)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-.. warning::
- This source is hightly experimental and may change between minor versions of Flume.
- Use at your own risk.
-
-Experimental source that connects via Streaming API to the 1% sample twitter
-firehose, continously downloads tweets, converts them to Avro format and
-sends Avro events to a downstream Flume sink. Requires the consumer and
-access tokens and secrets of a Twitter developer account.
-Required properties are in **bold**.
-
-====================== =========== ===================================================
-Property Name          Default     Description
-====================== =========== ===================================================
-**channels**           --
-**type**               --          The component type name, needs to be ``org.apache.flume.source.twitter.TwitterSource``
-**consumerKey**        --          OAuth consumer key
-**consumerSecret**     --          OAuth consumer secret
-**accessToken**        --          OAuth access token
-**accessTokenSecret**  --          OAuth toekn secret
-maxBatchSize           1000        Maximum number of twitter messages to put in a single batch
-maxBatchDurationMillis 1000        Maximum number of milliseconds to wait before closing a batch
-====================== =========== ===================================================
-
-Example for agent named a1:
-
-.. code-block:: properties
-
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
- a1.sources.r1.channels = c1
- a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
- a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
- a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
- a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
- a1.sources.r1.maxBatchSize = 10
- a1.sources.r1.maxBatchDurationMillis = 200
+ a1.sources.src-1.type = spooldir
+ a1.sources.src-1.channels = ch-1
+ a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
+ a1.sources.src-1.fileHeader = true
Event Deserializers
'''''''''''''''''''
@@ -1094,6 +1090,95 @@ Property Name Default
deserializer.maxBlobLength 100000000          The maximum number of bytes to read and buffer for a given request
========================== ================== =======================================================================
+Twitter 1% firehose Source (experimental)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. warning::
+ This source is highly experimental and may change between minor versions of Flume.
+ Use at your own risk.
+
+Experimental source that connects via Streaming API to the 1% sample twitter
+firehose, continuously downloads tweets, converts them to Avro format and
+sends Avro events to a downstream Flume sink. Requires the consumer and
+access tokens and secrets of a Twitter developer account.
+Required properties are in **bold**.
+
+====================== =========== ===================================================
+Property Name          Default     Description
+====================== =========== ===================================================
+**channels**           --
+**type**               --          The component type name, needs to be ``org.apache.flume.source.twitter.TwitterSource``
+**consumerKey**        --          OAuth consumer key
+**consumerSecret**     --          OAuth consumer secret
+**accessToken**        --          OAuth access token
+**accessTokenSecret**  --          OAuth token secret
+maxBatchSize           1000        Maximum number of twitter messages to put in a single batch
+maxBatchDurationMillis 1000        Maximum number of milliseconds to wait before closing a batch
+====================== =========== ===================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+ a1.sources = r1
+ a1.channels = c1
+ a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
+ a1.sources.r1.channels = c1
+ a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
+ a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
+ a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
+ a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
+ a1.sources.r1.maxBatchSize = 10
+ a1.sources.r1.maxBatchDurationMillis = 200
+
+Kafka Source
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
+If you have multiple Kafka sources running, you can configure them with the same Consumer Group
+so each will read a unique set of partitions for the topic.
+
+
+
+=============================== =========== ===================================================
+Property Name                   Default     Description
+=============================== =========== ===================================================
+**channels**                    --
+**type**                        --          The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**zookeeperConnect**            --          URI of ZooKeeper used by Kafka cluster
+**groupId**                     flume       Unique identifier of consumer group. Setting the same id in multiple sources or agents
+                                            indicates that they are part of the same consumer group
+**topic**                       --          Kafka topic we'll read messages from. At this time, this is a single topic only.
+batchSize                       1000        Maximum number of messages written to Channel in one batch
+batchDurationMillis             1000        Maximum time (in ms) before a batch will be written to Channel.
+                                            The batch will be written whenever the first of size and time will be reached.
+Other Kafka Consumer Properties --          These properties are used to configure the Kafka Consumer. Any consumer property supported
+                                            by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
+                                            For example: kafka.consumer.timeout.ms
+                                            Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_ for details
+=============================== =========== ===================================================
+
+.. note:: The Kafka Source overrides two Kafka consumer parameters:
+
+          * ``auto.commit.enable`` is set to "false" by the source and we commit every batch. For improved
+            performance this can be set to "true"; however, this can lead to loss of data.
+          * ``consumer.timeout.ms`` is set to 10ms, so when we check Kafka for new data we wait at most 10ms
+            for the data to arrive. Setting this to a higher value can reduce CPU utilization (we'll poll
+            Kafka in less of a tight loop), but also means higher latency in writing batches to the channel
+            (since we'll wait longer for data to arrive).
+
+
+Example for agent named tier1:
+
+.. code-block:: properties
+
+ tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+ tier1.sources.source1.channels = channel1
+ tier1.sources.source1.zookeeperConnect = localhost:2181
+ tier1.sources.source1.topic = test1
+ tier1.sources.source1.groupId = flume
+ tier1.sources.source1.kafka.consumer.timeout.ms = 100
+
+
+
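The batchSize / batchDurationMillis rule ("the batch will be written whenever the first of size and time will be reached") can be sketched in Java as follows. This is a hypothetical, simplified model: the class name, the injected clock and the ``tick()`` method are illustrative assumptions rather than KafkaSource internals, and the list of flushed batches stands in for a channel write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical size-or-time batcher; not the KafkaSource implementation.
class SizeOrTimeBatcher {
  private final int batchSize;
  private final long batchDurationMillis;
  private final LongSupplier clock; // injected so the timing logic is testable
  private final List<String> buffer = new ArrayList<>();
  private final List<List<String>> flushed = new ArrayList<>();
  private long batchStart;

  SizeOrTimeBatcher(int batchSize, long batchDurationMillis, LongSupplier clock) {
    this.batchSize = batchSize;
    this.batchDurationMillis = batchDurationMillis;
    this.clock = clock;
  }

  // Adds a message; the first message of a batch starts its timer.
  void offer(String message) {
    if (buffer.isEmpty()) {
      batchStart = clock.getAsLong();
    }
    buffer.add(message);
    maybeFlush();
  }

  // Called periodically (e.g. after a poll returns nothing) so that a
  // partial batch is still written once batchDurationMillis has elapsed.
  void tick() {
    if (!buffer.isEmpty()) {
      maybeFlush();
    }
  }

  // Flushes when either limit is reached, whichever comes first.
  private void maybeFlush() {
    boolean sizeReached = buffer.size() >= batchSize;
    boolean timeReached = clock.getAsLong() - batchStart >= batchDurationMillis;
    if (sizeReached || timeReached) {
      flushed.add(new ArrayList<>(buffer)); // stand-in for writing to the channel
      buffer.clear();
    }
  }

  List<List<String>> flushedBatches() { return flushed; }

  int pending() { return buffer.size(); }

  public static void main(String[] args) {
    long[] now = {0};
    SizeOrTimeBatcher b = new SizeOrTimeBatcher(3, 1000, () -> now[0]);
    b.offer("a"); b.offer("b"); b.offer("c"); // size limit reached
    b.offer("d");                             // starts a new batch at t=0
    now[0] = 1500;
    b.tick();                                 // time limit reached
    System.out.println(b.flushedBatches());   // [[a, b, c], [d]]
  }
}
```

Raising batchDurationMillis trades latency for fuller batches, which is the same trade-off the note above describes for ``consumer.timeout.ms``.
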
NetCat Source
~~~~~~~~~~~~~
@@ -1182,8 +1267,13 @@ Property Name Default Descriptio
**host**         --           Host name or IP address to bind to
**port**         --           Port # to bind to
eventSize        2500         Maximum size of a single event line, in bytes
-keepFields       false        Setting this to true will preserve the Priority,
+keepFields       none         Setting this to 'all' will preserve the Priority,
                              Timestamp and Hostname in the body of the event.
+                              A space-separated list of fields to include
+                              is allowed as well. Currently, the following
+                              fields can be included: priority, version,
+                              timestamp, hostname. The values 'true' and 'false'
+                              have been deprecated in favor of 'all' and 'none'.
selector.type    replicating  replicating or multiplexing
selector.*                    Depends on the selector.type value
interceptors     --           Space-separated list of interceptors
@@ -1220,8 +1310,13 @@ Property Name Default
**host**              --                Host name or IP address to bind to.
**ports**             --                Space-separated list (one or more) of ports to bind to.
eventSize             2500              Maximum size of a single event line, in bytes.
-keepFields            false             Setting this to true will preserve the
+keepFields            none              Setting this to 'all' will preserve the
                                        Priority, Timestamp and Hostname in
                                        the body of the event.
+                                        A space-separated list of fields to include
+                                        is allowed as well. Currently, the following
+                                        fields can be included: priority, version,
+                                        timestamp, hostname. The values 'true' and 'false'
+                                        have been deprecated in favor of 'all' and 'none'.
portHeader            --                If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
charset.default       UTF-8             Default character set used while parsing syslog events into strings.
charset.port.<port>   --                Character set is configurable on a per-port basis.
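
The keepFields values described above can be interpreted as in the following Java sketch. It encodes assumptions about the parsing rules rather than Flume's own syslog code: 'all' and 'true' are treated as every listed field (including version, which the description of 'all' does not mention explicitly), 'none' and 'false' as no fields, and anything else as a space-separated field list. The class and enum names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical interpretation of the keepFields setting; not Flume's parser.
class KeepFieldsSketch {

  enum Field { PRIORITY, VERSION, TIMESTAMP, HOSTNAME }

  static Set<Field> parse(String keepFields) {
    String value = keepFields == null ? "" : keepFields.trim().toLowerCase(Locale.ROOT);
    // 'none' is the default; 'false' is the deprecated spelling of 'none'
    if (value.isEmpty() || value.equals("none") || value.equals("false")) {
      return EnumSet.noneOf(Field.class);
    }
    // 'true' is the deprecated spelling of 'all' (assumed here to mean every field)
    if (value.equals("all") || value.equals("true")) {
      return EnumSet.allOf(Field.class);
    }
    // Otherwise: a space-separated list such as "timestamp hostname"
    Set<Field> fields = EnumSet.noneOf(Field.class);
    for (String name : value.split("\\s+")) {
      // Field.valueOf throws IllegalArgumentException for unknown names
      fields.add(Field.valueOf(name.toUpperCase(Locale.ROOT)));
    }
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(parse("all"));                // [PRIORITY, VERSION, TIMESTAMP, HOSTNAME]
    System.out.println(parse("hostname timestamp")); // [TIMESTAMP, HOSTNAME]
    System.out.println(parse("false"));              // []
  }
}
```
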
@@ -1374,6 +1469,37 @@ Property Name Default
handler.maxBlobLength 100000000          The maximum number of bytes to read and buffer for a given request
===================== ================== ============================================================================
+Stress Source
+~~~~~~~~~~~~~
+
+StressSource is an internal load-generating source implementation which is very useful for
+stress tests. It allows the user to configure the size of the event payload, with empty headers.
+The user can configure the total number of events to be sent as well as the maximum number of
+successfully delivered events.
+
+Required properties are in **bold**.
+
+=================== =========== ===================================================
+Property Name       Default     Description
+=================== =========== ===================================================
+**type**            --          The component type name, needs to be ``org.apache.flume.source.StressSource``
+size                500         Payload size of each Event. Unit: **byte**
+maxTotalEvents      -1          Maximum number of Events to be sent
+maxSuccessfulEvents -1          Maximum number of Events successfully sent
+batchSize           1           Number of Events to be sent in one batch
+=================== =========== ===================================================
+
+Example for agent named **a1**:
+
+.. code-block:: properties
+
+ a1.sources = stresssource-1
+ a1.channels = memoryChannel-1
+ a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
+ a1.sources.stresssource-1.size = 10240
+ a1.sources.stresssource-1.maxTotalEvents = 1000000
+ a1.sources.stresssource-1.channels = memoryChannel-1
+
Legacy Sources
~~~~~~~~~~~~~~
@@ -1485,15 +1611,16 @@ Flume should use ScribeSource based on T
For deployment of Scribe please follow the guide from Facebook.
Required properties are in **bold**.
-============== =========== ==============================================
-Property Name  Default     Description
-============== =========== ==============================================
-**type**       --          The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
-port           1499        Port that Scribe should be connected
-workerThreads  5           Handing threads number in Thrift
+==================== =========== ==============================================
+Property Name        Default     Description
+==================== =========== ==============================================
+**type**             --          The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
+port                 1499        Port that Scribe should be connected
+maxReadBufferBytes   16384000    Thrift Default FrameBuffer Size
+workerThreads        5           Number of handling threads in Thrift
selector.type
selector.*
-============== =========== ==============================================
+==================== =========== ==============================================
Example for agent named a1:
@@ -1537,12 +1664,14 @@ Alias Description
%B locale's long month name (January, February, ...)
%c locale's date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
+%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
+%n month without padding (1..12)
%M minute (00..59)
%p locale's equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
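
The difference between the padded and unpadded escapes (``%d`` vs ``%e``, ``%m`` vs ``%n``) can be illustrated with ``java.time``. The sketch below is a hypothetical mini-expander covering only six of the aliases in this table; it is not Flume's own escape-sequence code and ignores every other alias.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical expander for a handful of escape sequences; not Flume's code.
class EscapeSketch {

  static final Map<Character, DateTimeFormatter> ESCAPES = Map.of(
      'd', DateTimeFormatter.ofPattern("dd"),  // day of month (01)
      'e', DateTimeFormatter.ofPattern("d"),   // day of month without padding (1)
      'm', DateTimeFormatter.ofPattern("MM"),  // month (01..12)
      'n', DateTimeFormatter.ofPattern("M"),   // month without padding (1..12)
      'H', DateTimeFormatter.ofPattern("HH"),  // hour (00..23)
      'M', DateTimeFormatter.ofPattern("mm")); // minute (00..59)

  // Replaces each known %x alias with the formatted timestamp field.
  static String expand(String path, LocalDateTime time) {
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < path.length(); i++) {
      char c = path.charAt(i);
      if (c == '%' && i + 1 < path.length() && ESCAPES.containsKey(path.charAt(i + 1))) {
        out.append(ESCAPES.get(path.charAt(i + 1)).format(time));
        i++; // skip the alias character
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    LocalDateTime t = LocalDateTime.of(2015, 6, 1, 19, 49, 44);
    System.out.println(expand("/flume/%m/%d/%H%M", t)); // /flume/06/01/1949
    System.out.println(expand("/flume/%n/%e", t));      // /flume/6/1
  }
}
```
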
@@ -1599,10 +1728,10 @@ hdfs.roundValue 1 Ro
hdfs.roundUnit          second        The unit of the round down value - ``second``, ``minute`` or ``hour``.
hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp  false         Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
-hdfs.closeTries        0             Number of times the sink must try to close a file. If set to 1, this sink will not re-try a failed close
+hdfs.closeTries        0             Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename
                                      (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension.
-                                      If set to 0, the sink will try to close the file until the file is eventually closed
-                                      (there is no limit on the number of times it would try).
+                                      If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try).
+                                      The file may still remain open if the close call fails, but the data will be intact; in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval      180           Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode,
                                      so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not
                                      attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.
@@ -1630,6 +1759,149 @@ The above configuration will round down
timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become
``/flume/events/2012-06-12/1150/00``.
+Hive Sink
+~~~~~~~~~
+
+This sink streams events containing delimited text or JSON data directly into a Hive table or partition.
+Events are written using Hive transactions. As soon as a set of events is committed to Hive, they become
+immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created
+or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to
+corresponding columns in the Hive table. **This sink is provided as a preview feature and not recommended
+for use in production.**
+
+====================== ============ ======================================================================
+Name                   Default      Description
+====================== ============ ======================================================================
+**channel**            --
+**type**               --           The component type name, needs to be ``hive``
+**hive.metastore**     --           Hive metastore URI (e.g. thrift://a.b.com:9083)
+**hive.database**      --           Hive database name
+**hive.table**         --           Hive table name
+hive.partition         --           Comma-separated list of partition values identifying the partition to write to. May contain escape
+                                    sequences. E.g: If the table is partitioned by (continent: string, country: string, time: string)
+                                    then 'Asia,India,2014-02-26-01-21' will indicate continent=Asia,country=India,time=2014-02-26-01-21
+hive.txnsPerBatchAsk   100          Hive grants a *batch of transactions* instead of single transactions to streaming clients like Flume.
+                                    This setting configures the number of desired transactions per Transaction Batch. Data from all
+                                    transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events
+                                    in each transaction in the batch. This setting in conjunction with batchSize provides control over the
+                                    size of each file. Note that eventually Hive will transparently compact these files into larger files.
+heartBeatInterval      240          (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring.
+                                    Set this value to 0 to disable heartbeats.
+autoCreatePartitions   true         Flume will automatically create the necessary Hive partitions to stream to
+batchSize              15000        Max number of events written to Hive in a single Hive transaction
+maxOpenConnections     500          Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
+callTimeout            10000        (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
+**serializer**                      Serializer is responsible for parsing out fields from the event and mapping them to columns in the Hive table.
+                                    Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
+roundUnit              minute       The unit of the round down value - ``second``, ``minute`` or ``hour``.
+roundValue             1            Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
+timeZone               Local Time   Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
+useLocalTimeStamp      false        Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
+====================== ============ ======================================================================
+
+The following serializers are provided for the Hive sink:
+
+**JSON**: Handles UTF-8 encoded JSON (strict syntax) events and requires no configuration. Object names
+in the JSON are mapped directly to columns with the same name in the Hive table.
+Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the SerDe of the Hive table.
+This serializer requires HCatalog to be installed.
+
+**DELIMITED**: Handles simple delimited textual events.
+Internally uses LazySimpleSerDe but is independent of the SerDe of the Hive table.
+
+========================== ============ ======================================================================
+Name                       Default      Description
+========================== ============ ======================================================================
+serializer.delimiter       ,            (Type: string) The field delimiter in the incoming data. To use special
+                                        characters, surround them with double quotes like "\\t"
+**serializer.fieldnames**  --           The mapping from input fields to columns in the Hive table. Specified as a
+                                        comma-separated list (no spaces) of Hive table column names, identifying
+                                        the input fields in order of their occurrence. To skip fields, leave the
+                                        column name unspecified. E.g. 'time,,ip,message' indicates that the 1st, 3rd
+                                        and 4th fields in the input map to the time, ip and message columns in the Hive table.
+serializer.serdeSeparator  Ctrl-A       (Type: character) Customizes the separator used by the underlying SerDe. There
+                                        can be a gain in efficiency if the fields in serializer.fieldnames are in the
+                                        same order as the table columns, serializer.delimiter is the same as
+                                        serializer.serdeSeparator, and the number of fields in serializer.fieldnames
+                                        is less than or equal to the number of table columns, because the fields in the
+                                        incoming event body then do not need to be reordered to match the order of the table columns.
+                                        Use single quotes for special characters like '\\t'.
+                                        Ensure that input fields do not contain this character. NOTE: If serializer.delimiter
+                                        is a single character, preferably set this to the same character.
+========================== ============ ======================================================================
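The ``serializer.fieldnames`` mapping can be illustrated with a small, self-contained Java sketch. It is not the Hive sink's DELIMITED serializer: the class ``FieldnamesMapping`` and its ``mapFields`` method are hypothetical, and the sketch only shows how positions in a delimited body pair up with column names, where an empty name means "skip this field". The sample input mirrors the tab-delimited ``id,,msg`` example configuration later in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration of serializer.fieldnames semantics; not the Flume/Hive implementation.
final class FieldnamesMapping {

    /**
     * Pairs each field of a delimited event body with the column name at the same
     * position in fieldnames. An empty column name skips the field at that position.
     */
    static Map<String, String> mapFields(String body, String delimiter, String fieldnames) {
        // limit -1 keeps empty entries, so "id,,msg" yields ["id", "", "msg"].
        String[] columns = fieldnames.split(",", -1);
        // Pattern.quote treats the delimiter literally (e.g. "|" is not a regex alternation).
        String[] fields = body.split(Pattern.quote(delimiter), -1);
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.length && i < fields.length; i++) {
            if (!columns[i].isEmpty()) {
                row.put(columns[i], fields[i]);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // Tab-delimited input with fieldnames = id,,msg: the second field is dropped.
        System.out.println(mapFields("1\tignored\thello", "\t", "id,,msg")); // {id=1, msg=hello}
    }
}
```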
+
+
+The following are the escape sequences supported:
+
+========= =================================================
+Alias     Description
+========= =================================================
+%{host}   Substitute value of event header named "host". Arbitrary header names are supported.
+%t        Unix time in milliseconds
+%a        locale's short weekday name (Mon, Tue, ...)
+%A        locale's full weekday name (Monday, Tuesday, ...)
+%b        locale's short month name (Jan, Feb, ...)
+%B        locale's long month name (January, February, ...)
+%c        locale's date and time (Thu Mar 3 23:05:25 2005)
+%d        day of month (01)
+%D        date; same as %m/%d/%y
+%H        hour (00..23)
+%I        hour (01..12)
+%j        day of year (001..366)
+%k        hour ( 0..23)
+%m        month (01..12)
+%M        minute (00..59)
+%p        locale's equivalent of am or pm
+%s        seconds since 1970-01-01 00:00:00 UTC
+%S        second (00..59)
+%y        last two digits of year (00..99)
+%Y        year (2010)
+%z        +hhmm numeric timezone (for example, -0400)
+========= =================================================
+
+
+.. note:: For all of the time-related escape sequences, a header with the key
+          "timestamp" must exist among the headers of the event (unless ``useLocalTimeStamp`` is set to ``true``). One way to add
+          this automatically is to use the TimestampInterceptor.
+
+Example Hive table:
+
+.. code-block:: sql
+
+ create table weblogs ( id int , msg string )
+ partitioned by (continent string, country string, time string)
+ clustered by (id) into 5 buckets
+ stored as orc;
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+ a1.channels = c1
+ a1.channels.c1.type = memory
+ a1.sinks = k1
+ a1.sinks.k1.type = hive
+ a1.sinks.k1.channel = c1
+ a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
+ a1.sinks.k1.hive.database = logsdb
+ a1.sinks.k1.hive.table = weblogs
+ a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
+ a1.sinks.k1.useLocalTimeStamp = false
+ a1.sinks.k1.round = true
+ a1.sinks.k1.roundValue = 10
+ a1.sinks.k1.roundUnit = minute
+ a1.sinks.k1.serializer = DELIMITED
+ a1.sinks.k1.serializer.delimiter = "\t"
+ a1.sinks.k1.serializer.serdeSeparator = '\t'
+ a1.sinks.k1.serializer.fieldnames = id,,msg
+
+
+The above configuration will round down the timestamp to the nearest 10-minute boundary. For example, an event with
+timestamp header set to 11:54:34 AM, June 12, 2012 and 'country' header set to 'india' will evaluate to the
+partition (continent='asia',country='india',time='12-06-12-11-50'), since ``%y`` expands to the last two digits of the year.
+The serializer is configured to accept tab-separated input containing three fields and to skip the second field.
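The rounding and escape-sequence substitution in this example can be reproduced with a short Java sketch. It is an approximation written for illustration, not Flume's own escape-sequence code: ``PartitionResolver`` is a hypothetical name, only ``%{header}``, ``%Y``, ``%y``, ``%m``, ``%d``, ``%H`` and ``%M`` are handled, ``roundUnit`` is assumed to be ``minute``, and UTC is used so that the result does not depend on the local time zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of round-down plus escape expansion; not Flume's own implementation.
final class PartitionResolver {

    private static final Pattern HEADER = Pattern.compile("%\\{([^}]+)\\}");

    /** Rounds epochMillis down to a multiple of roundValue minutes (roundUnit = minute). */
    static long roundDownMinutes(long epochMillis, int roundValue) {
        long bucketMillis = roundValue * 60_000L;
        return epochMillis - Math.floorMod(epochMillis, bucketMillis);
    }

    /** Expands %Y %y %m %d %H %M and %{header} in spec (a small subset of the escape table). */
    static String resolve(String spec, Map<String, String> headers, long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        // Time escapes first, so that substituted header values are never re-expanded.
        String out = spec
                .replace("%Y", String.format("%04d", t.getYear()))
                .replace("%y", String.format("%02d", t.getYear() % 100))
                .replace("%m", String.format("%02d", t.getMonthValue()))
                .replace("%d", String.format("%02d", t.getDayOfMonth()))
                .replace("%H", String.format("%02d", t.getHour()))
                .replace("%M", String.format("%02d", t.getMinute()));
        // Then %{name} -> value of the named event header (empty string if missing).
        Matcher m = HEADER.matcher(out);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            m.appendReplacement(sb, Matcher.quoteReplacement(headers.getOrDefault(m.group(1), "")));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        long ts = ZonedDateTime.of(2012, 6, 12, 11, 54, 34, 0, utc).toInstant().toEpochMilli();
        long rounded = roundDownMinutes(ts, 10); // 11:54:34 -> 11:50:00
        System.out.println(resolve("asia,%{country},%y-%m-%d-%H-%M", Map.of("country", "india"), rounded, utc));
        // prints asia,india,12-06-12-11-50
    }
}
```

With ``%Y`` in place of ``%y``, the same sketch would produce a four-digit year (``2012-06-12-11-50``).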
+
+
Logger Sink
~~~~~~~~~~~
@@ -1641,6 +1913,7 @@ Property Name Default Description
============== ======= ===========================================
**channel**    --
**type**       --      The component type name, needs to be ``logger``
+maxBytesToLog  16      Maximum number of bytes of the Event body to log
============== ======= ===========================================
Example for agent named a1:
@@ -1701,6 +1974,12 @@ This sink forms one half of Flume's tier
sent to this sink are turned into Thrift events and sent to the configured
hostname / port pair. The events are taken from the configured Channel in
batches of the configured batch size.
+
+Thrift sink can be configured to start in secure mode by enabling Kerberos authentication.
+To communicate with a Thrift source started in secure mode, the Thrift sink should also
+operate in secure mode. client-principal and client-keytab are the properties used by the
+Thrift sink to authenticate to the Kerberos KDC. The server-principal represents the
+principal of the Thrift source this sink is configured to connect to in secure mode.
Required properties are in **bold**.
========================== ======= ==============================================
@@ -1714,6 +1993,15 @@ batch-size 100 nu
connect-timeout            20000   Amount of time (ms) to allow for the first (handshake) request.
request-timeout            20000   Amount of time (ms) to allow for requests after the first.
connection-reset-interval  none    Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
+ssl                        false   Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type"
+truststore                 --      The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
+truststore-password        --      The password for the specified truststore.
+truststore-type            JKS     The type of the Java truststore. This can be "JKS" or another supported Java truststore type.
+exclude-protocols          SSLv3   Space-separated list of SSL/TLS protocols to exclude
+kerberos                   false   Set to true to enable Kerberos authentication. In Kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a Kerberos-enabled Thrift Source.
+client-principal           --      The Kerberos principal used by the Thrift Sink to authenticate to the Kerberos KDC.
+client-keytab              --      The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the Kerberos KDC.
+server-principal           --      The Kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect.
========================== ======= ==============================================
Example for agent named a1:
@@ -2011,7 +2299,9 @@ Property Name Default
**type**         --                                                                       The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink``
**hostNames**    --                                                                       Comma-separated list of hostname:port. If the port is not present, the default port '9300' will be used.
indexName        flume                                                                    The name of the index to which the date will be appended. Example 'flume' -> 'flume-yyyy-MM-dd'
+                                                                                          Arbitrary header substitution is supported, e.g. %{header} is replaced with the value of the named event header.
indexType        logs                                                                     The type to index the document to, defaults to 'logs'
+                                                                                          Arbitrary header substitution is supported, e.g. %{header} is replaced with the value of the named event header.
clusterName      elasticsearch                                                            Name of the ElasticSearch cluster to connect to
batchSize        100                                                                      Number of events to be written per txn.
ttl              --                                                                       TTL in days, when set will cause the expired documents to be deleted automatically,
@@ -2024,6 +2314,10 @@ serializer org.apache.flume.sink.
serializer.*     --                                                                       Properties to be passed to the serializer.
================ ======================================================================== =======================================================================================================
+.. note:: Header substitution is a handy way to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event.
+          Use this feature with caution, as the event submitter now has control of the indexName and indexType.
+          Furthermore, if the Elasticsearch REST client is used, the event submitter also has control of the URL path used.
+
Example for agent named a1:
.. code-block:: properties
@@ -2040,18 +2334,13 @@ Example for agent named a1:
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
-Kite Dataset Sink (experimental)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-.. warning::
- This source is experimental and may change between minor versions of Flume.
- Use at your own risk.
+Kite Dataset Sink
+~~~~~~~~~~~~~~~~~
-Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/kite-data/guide.html>`_.
+Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/guide/>`_.
This sink will deserialize the body of each incoming event and store the
-resulting record in a Kite Dataset. It determines target Dataset by opening a
-repository URI, ``kite.repo.uri``, and loading a Dataset by name,
-``kite.dataset.name``.
+resulting record in a Kite Dataset. It determines the target Dataset by loading a
+dataset by URI.
The only supported serialization is avro, and the record schema must be passed
in the event headers, using either ``flume.avro.schema.literal`` with the JSON
@@ -2065,20 +2354,104 @@ Note 2: In some cases, file rolling may
has been exceeded. However, this delay will not exceed 5 seconds. In most
cases, the delay is negligible.
-======================= ======= ===========================================================
-Property Name           Default Description
-======================= ======= ===========================================================
-**channel**             --
-**type**                --      Must be org.apache.flume.sink.kite.DatasetSink
-**kite.repo.uri**       --      URI of the repository to open
-**kite.dataset.name**   --      Name of the Dataset where records will be written
-kite.batchSize          100     Number of records to process in each batch
-kite.rollInterval       30      Maximum wait time (seconds) before data files are released
-auth.kerberosPrincipal  --      Kerberos user principal for secure authentication to HDFS
-auth.kerberosKeytab     --      Kerberos keytab location (local FS) for the principal
-auth.proxyUser          --      The effective user for HDFS actions, if different from
-                                the kerberos principal
-======================= ======= ===========================================================
+============================ ======= ===========================================================
+Property Name                Default Description
+============================ ======= ===========================================================
+**channel**                  --
+**type**                     --      Must be org.apache.flume.sink.kite.DatasetSink
+**kite.dataset.uri**         --      URI of the dataset to open
+kite.repo.uri                --      URI of the repository to open
+                                     (deprecated; use kite.dataset.uri instead)
+kite.dataset.namespace       --      Namespace of the Dataset where records will be written
+                                     (deprecated; use kite.dataset.uri instead)
+kite.dataset.name            --      Name of the Dataset where records will be written
+                                     (deprecated; use kite.dataset.uri instead)
+kite.batchSize               100     Number of records to process in each batch
+kite.rollInterval            30      Maximum wait time (seconds) before data files are released
+kite.flushable.commitOnBatch true    If ``true``, the Flume transaction will be committed and the
+                                     writer will be flushed on each batch of ``kite.batchSize``
+                                     records. This setting only applies to flushable datasets. When
+                                     ``true``, it's possible for temp files with committed data to be
+                                     left in the dataset directory. These files need to be recovered
+                                     by hand for the data to be visible to DatasetReaders.
+kite.syncable.syncOnBatch    true    Controls whether the sink will also sync data when committing
+                                     the transaction. This setting only applies to syncable datasets.
+                                     Syncing guarantees that data will be written on stable storage
+                                     on the remote system while flushing only guarantees that data
+                                     has left Flume's client buffers. When the
+                                     ``kite.flushable.commitOnBatch`` property is set to ``false``,
+                                     this property must also be set to ``false``.
+kite.entityParser            avro    Parser that turns Flume ``Events`` into Kite entities.
+                                     Valid values are ``avro`` and the fully-qualified class name
+                                     of an implementation of the ``EntityParser.Builder`` interface.
+kite.failurePolicy           retry   Policy that handles non-recoverable errors such as a missing
+                                     ``Schema`` in the ``Event`` header. The default value, ``retry``,
+                                     will fail the current batch and try again, which matches the old
+                                     behavior. Other valid values are ``save``, which will write the
+                                     raw ``Event`` to the ``kite.error.dataset.uri`` dataset, and the
+                                     fully-qualified class name of an implementation of the
+                                     ``FailurePolicy.Builder`` interface.
+kite.error.dataset.uri       --      URI of the dataset where failed events are saved when
+                                     ``kite.failurePolicy`` is set to ``save``. **Required** when
+                                     the ``kite.failurePolicy`` is set to ``save``.
+auth.kerberosPrincipal       --      Kerberos user principal for secure authentication to HDFS
+auth.kerberosKeytab          --      Kerberos keytab location (local FS) for the principal
+auth.proxyUser               --      The effective user for HDFS actions, if different from
+                                     the kerberos principal
+============================ ======= ===========================================================
+
+
+Kafka Sink
+~~~~~~~~~~
+This is a Flume Sink implementation that can publish data to a
+`Kafka <http://kafka.apache.org/>`_ topic. One of the objectives is to integrate Flume
+with Kafka so that pull-based processing systems can process the data coming
+through various Flume sources. This currently supports the Kafka 0.8.x series of releases.
+
+Required properties are marked in bold font.
+
+
+=============================== =================== =============================================================================================
+Property Name                   Default             Description
+=============================== =================== =============================================================================================
+**type**                        --                  Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
+**brokerList**                  --                  List of brokers the Kafka Sink will connect to, to get the list of topic partitions.
+                                                    This can be a partial list of brokers, but we recommend at least two for HA.
+                                                    The format is a comma-separated list of hostname:port.
+topic                           default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured,
+                                                    messages will be published to this topic.
+                                                    If the event header contains a "topic" field, the event will be published to that topic,
+                                                    overriding the topic configured here.
+batchSize                       100                 How many messages to process in one batch. Larger batches improve throughput while adding latency.
+requiredAcks                    1                   How many replicas must acknowledge a message before it's considered successfully written.
+                                                    Accepted values are 0 (never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas).
+                                                    Set this to -1 to avoid data loss in some cases of leader failure.
+Other Kafka Producer Properties --                  These properties are used to configure the Kafka Producer. Any producer property supported
+                                                    by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
+                                                    For example: kafka.producer.type
+=============================== =================== =============================================================================================
+
+.. note:: Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
+          If ``topic`` exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink.
+          If ``key`` exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key
+          will be sent to the same partition. If the key is null, events will be sent to random partitions.
+
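The same-key/same-partition guarantee in the note above follows from choosing the partition as a pure function of the key. The following Java sketch illustrates the idea with a hash-modulo scheme; it is a simplified stand-in written for this example, not Kafka's or Flume's partitioner, and ``KeyPartitioner`` is a hypothetical name.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical hash-modulo partitioner illustrating key-based partitioning; not Kafka's code.
final class KeyPartitioner {

    /** Same key -> same partition; a null key picks a random partition. */
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Clear the sign bit: Math.abs(Integer.MIN_VALUE) would still be negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int first = partitionFor("user-42", 8);
        int second = partitionFor("user-42", 8);
        System.out.println(first == second); // true: the key alone determines the partition
    }
}
```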
+An example configuration of a Kafka sink is given below. Properties starting
+with the prefix ``kafka.`` are used when instantiating the Kafka producer.
+The properties that are passed when creating the Kafka producer are not limited
+to the properties given in this example.
+It is also possible to include your custom properties here and access them inside
+the preprocessor through the Flume Context object passed in as a method
+argument.
+
+.. code-block:: properties
+
+ a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
+ a1.sinks.k1.topic = mytopic
+ a1.sinks.k1.brokerList = localhost:9092
+ a1.sinks.k1.requiredAcks = 1
+ a1.sinks.k1.batchSize = 20
+ a1.sinks.k1.channel = c1
Custom Sink
~~~~~~~~~~~
@@ -2189,6 +2562,60 @@ Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
+Kafka Channel
+~~~~~~~~~~~~~
+
+The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and
+replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.
+
+The Kafka channel can be used for multiple scenarios:
+
+* With Flume source and sink: it provides a reliable and highly available channel for events
+* With Flume source and interceptor but no sink: it allows writing Flume events into a Kafka topic, for use by other apps
+* With Flume sink, but no source: it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
+
+Required properties are in **bold**.
+
+====================== ========================== ===============================================================================================================
+Property Name          Default                    Description
+====================== ========================== ===============================================================================================================
+**type**               --                         The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
+**brokerList**         --                         List of brokers in the Kafka cluster used by the channel.
+                                                  This can be a partial list of brokers, but we recommend at least two for HA.
+                                                  The format is a comma-separated list of hostname:port.
+**zookeeperConnect**   --                         URI of ZooKeeper used by the Kafka cluster.
+                                                  The format is a comma-separated list of hostname:port. If chroot is used, it is added once at the end.
+                                                  For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka
+topic                  flume-channel              Kafka topic which the channel will use
+groupId                flume                      Consumer group ID the channel uses to register with Kafka.
+                                                  Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data.
+                                                  Note that having non-channel consumers with the same ID can lead to data loss.
+parseAsFlumeEvent      true                       Expecting Avro datums with FlumeEvent schema in the channel.
+                                                  This should be true if a Flume source is writing to the channel,
+                                                  and false if other producers are writing into the topic that the channel is using.
+                                                  Flume source messages to Kafka can be parsed outside of Flume by using
+                                                  org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact.
+readSmallestOffset     false                      When set to true, the channel will read all data in the topic, starting from the oldest event;
+                                                  when false, it will read only events written after the channel started.
+                                                  When "parseAsFlumeEvent" is true, this will be false. Flume source will start prior to the sinks and this
+                                                  guarantees that events sent by the source before the sinks start will not be lost.
+Other Kafka Properties --                         These properties are used to configure the Kafka Producer and Consumer used by the channel.
+                                                  Any property supported by Kafka can be used.
+                                                  The only requirement is to prepend the property name with the prefix ``kafka.``.
+                                                  For example: kafka.producer.type
+====================== ========================== ===============================================================================================================
+
+.. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+ a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+ a1.channels.channel1.capacity = 10000
+ a1.channels.channel1.transactionCapacity = 1000
+ a1.channels.channel1.brokerList = kafka-2:9092,kafka-3:9092
+ a1.channels.channel1.topic = channel1
+ a1.channels.channel1.zookeeperConnect = kafka-1:2181
File Channel
~~~~~~~~~~~~
@@ -2211,6 +2638,7 @@ capacity
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
+checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey -- Key name used to encrypt new data
encryption.cipherProvider -- Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider -- Key provider type, supported types: JCEKSFILE
@@ -2533,7 +2961,12 @@ that so long as one is available events
The failover mechanism works by relegating failed sinks to a pool where
they are assigned a cool down period, increasing with sequential failures
before they are retried. Once a sink successfully sends an event, it is
-restored to the live pool.
+restored to the live pool. The Sinks have a priority associated with them:
+the larger the number, the higher the priority. If a Sink fails while sending an Event,
+the Sink with the next highest priority is tried next for sending Events.
+For example, a sink with priority 100 is activated before a Sink with priority
+80. If no priority is specified, the priority is determined based on the order in which
+the Sinks are specified in the configuration.
To configure, set a sink groups processor to ``failover`` and set
priorities for all individual sinks. All specified priorities must
@@ -2547,8 +2980,9 @@ Property Name Defau
================================= =========== ===================================================================================
**sinks**                         --          Space-separated list of sinks that are participating in the group
**processor.type**                ``default`` The component type name, needs to be ``failover``
-**processor.priority.<sinkName>** --          <sinkName> must be one of the sink instances associated with the current sink group
-processor.maxpenalty              30000       (in millis)
+**processor.priority.<sinkName>** --          Priority value. <sinkName> must be one of the sink instances associated with the current sink group.
+                                              A Sink with a higher priority value gets activated earlier. A larger absolute value indicates higher priority.
+processor.maxpenalty              30000       The maximum backoff period for the failed Sink (in millis)
================================= =========== ===================================================================================
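The selection and cool-down rules described in this section can be modeled with a short Java sketch. It is a simplified illustration under stated assumptions, not Flume's ``FailoverSinkProcessor``: ``FailoverModel`` is a hypothetical class, the 1-second base penalty that doubles with each consecutive failure is an assumed policy, and the fallback to configuration order when no priorities are set is not modeled. As in the table above, ``processor.maxpenalty`` caps the penalty.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of failover selection and backoff; not Flume's FailoverSinkProcessor.
final class FailoverModel {

    static final class SinkState {
        final String name;
        final int priority;
        int sequentialFailures;
        long coolDownUntil; // epoch millis; the sink is skipped until then

        SinkState(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private static final long BASE_PENALTY_MILLIS = 1000; // assumed base penalty
    private final List<SinkState> sinks = new ArrayList<>();
    private final long maxPenaltyMillis;

    FailoverModel(long maxPenaltyMillis) {
        this.maxPenaltyMillis = maxPenaltyMillis;
    }

    void add(String name, int priority) {
        sinks.add(new SinkState(name, priority));
    }

    /** Highest-priority sink that is not cooling down at time now, or null if none. */
    SinkState select(long now) {
        return sinks.stream()
                .filter(s -> s.coolDownUntil <= now)
                .max(Comparator.comparingInt(s -> s.priority))
                .orElse(null);
    }

    /** The penalty doubles with each consecutive failure, capped at maxPenaltyMillis. */
    void onFailure(SinkState s, long now) {
        s.sequentialFailures++;
        int shift = Math.min(s.sequentialFailures, 30); // avoid overflowing the shift
        s.coolDownUntil = now + Math.min(maxPenaltyMillis, BASE_PENALTY_MILLIS << shift);
    }

    /** A successful send restores the sink to the live pool. */
    void onSuccess(SinkState s) {
        s.sequentialFailures = 0;
        s.coolDownUntil = 0;
    }

    public static void main(String[] args) {
        FailoverModel model = new FailoverModel(30_000); // processor.maxpenalty default
        model.add("k1", 5);
        model.add("k2", 10);
        SinkState active = model.select(0);
        System.out.println(active.name);              // k2: higher priority wins
        model.onFailure(active, 0);                   // first failure: 2 s cool-down
        System.out.println(model.select(0).name);     // k1 while k2 cools down
        System.out.println(model.select(2_000).name); // k2 is eligible again
    }
}
```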
Example for agent named a1:
@@ -2836,6 +3270,45 @@ Sample flume.conf file:
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
+Search and Replace Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor provides simple string-based search-and-replace functionality
+based on Java regular expressions. Group capture with backreferences ($1, $2, ...) is also available.
+This interceptor uses the same rules as in the Java Matcher.replaceAll() method.
+
+================ ======= ========================================================================
+Property Name    Default Description
+================ ======= ========================================================================
+**type**         --      The component type name has to be ``search_replace``
+searchPattern    --      The pattern to search for and replace.
+replaceString    --      The replacement string.
+charset          UTF-8   The charset of the event body. Assumed by default to be UTF-8.
+================ ======= ========================================================================
+
+Example configuration:
+
+.. code-block:: properties
+
+ a1.sources.avroSrc.interceptors = search-replace
+ a1.sources.avroSrc.interceptors.search-replace.type = search_replace
+
+ # Remove leading alphanumeric characters (and underscores) in an event body.
+ a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
+ a1.sources.avroSrc.interceptors.search-replace.replaceString =
+
+Another example:
+
+.. code-block:: properties
+
+ a1.sources.avroSrc.interceptors = search-replace
+ a1.sources.avroSrc.interceptors.search-replace.type = search_replace
+
+ # Use grouping operators to reorder and munge words on a line.
+ a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
+ a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
+
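Since the interceptor follows the rules of Java's ``Matcher.replaceAll()``, the effect of both examples can be previewed with the standard library alone. The snippet below does not use the interceptor class; ``SearchReplaceDemo`` and the sample event bodies are hypothetical.

```java
import java.util.regex.Pattern;

// Applies the examples' searchPattern/replaceString with java.util.regex directly.
final class SearchReplaceDemo {

    static String apply(String body, String searchPattern, String replaceString) {
        // Same semantics as Matcher.replaceAll(): $1, $2 refer to captured groups.
        return Pattern.compile(searchPattern).matcher(body).replaceAll(replaceString);
    }

    public static void main(String[] args) {
        System.out.println(apply("The quick brown fox jumped over the lazy dog",
                "The quick brown ([a-z]+) jumped over the lazy ([a-z]+)",
                "The hungry $2 ate the careless $1"));
        // prints: The hungry dog ate the careless fox

        // First example: the leading run of [A-Za-z0-9_] is removed, leaving " rest of line".
        System.out.println(apply("abc_123 rest of line", "^[A-Za-z0-9_]+", ""));
    }
}
```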
+
Regex Filtering Interceptor
~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2947,7 +3420,7 @@ Log4J Appender
Appends Log4j events to a flume agent's avro source. A client using this
appender must have the flume-ng-sdk in the classpath (eg,
-flume-ng-sdk-1.5.2.jar).
+flume-ng-sdk-1.6.0.jar).
Required properties are in **bold**.
===================== =======
==================================================================================
@@ -3011,7 +3484,7 @@ Load Balancing Log4J Appender
Appends Log4j events to a list of flume agents' avro sources. A client using this
appender must have the flume-ng-sdk in the classpath (eg,
-flume-ng-sdk-1.5.2.jar). This appender supports a round-robin and random
+flume-ng-sdk-1.6.0.jar). This appender supports a round-robin and random
scheme for performing the load balancing. It also supports a configurable backoff
timeout so that down agents are removed temporarily from the set of hosts
Required properties are in **bold**.
@@ -3076,9 +3549,14 @@ Sample log4j.properties file configured
Security
========
-The HDFS sink supports Kerberos authentication if the underlying HDFS is
-running in secure mode. Please refer to the HDFS Sink section for
-configuring the HDFS sink Kerberos-related options.
+The HDFS sink, HBase sink, Thrift source, Thrift sink and Kite Dataset sink all support
+Kerberos authentication. Please refer to the corresponding sections for
+configuring the Kerberos-related options.
+
+The Flume agent authenticates to the Kerberos KDC as a single principal, which is
+used by all components that require Kerberos authentication. The principal and
+keytab configured for the Thrift source, Thrift sink, HDFS sink, HBase sink and Kite Dataset sink
+must be the same; otherwise the component will fail to start.
Monitoring
==========
@@ -3087,6 +3565,16 @@ Monitoring in Flume is still a work in p
Several Flume components report metrics to the JMX platform MBean server. These
metrics can be queried using Jconsole.
+JMX Reporting
+-------------
+
+JMX Reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable in
+flume-env.sh, like::
+
+  export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
+
+NOTE: The sample above disables security. To enable security, please refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
+
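Besides JConsole, the port opened above can be queried programmatically through the standard ``javax.management.remote`` API. The following Java sketch is illustrative: ``FlumeJmxQuery`` is a hypothetical class, it assumes an agent listening on ``localhost:5445`` as in the sample ``JAVA_OPTS``, and the ``org.apache.flume.*`` domain pattern is an assumption about how Flume names its MBeans.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical JMX client for an agent started with -Dcom.sun.management.jmxremote.port=5445.
final class FlumeJmxQuery {

    /** Standard RMI connector URL of a JVM that exposes remote JMX on host:port. */
    static JMXServiceURL serviceUrl(String host, int port) {
        try {
            return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Names of MBeans whose domain matches org.apache.flume.* (assumed naming scheme). */
    static Set<ObjectName> flumeMBeans(String host, int port)
            throws IOException, MalformedObjectNameException {
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            return connection.queryNames(new ObjectName("org.apache.flume.*:*"), null);
        }
    }

    public static void main(String[] args) throws MalformedObjectNameException {
        try {
            for (ObjectName name : flumeMBeans("localhost", 5445)) {
                System.out.println(name);
            }
        } catch (IOException e) {
            // No agent running (or JMX not enabled): report instead of failing.
            System.err.println("No JMX agent reachable at " + serviceUrl("localhost", 5445) + ": " + e);
        }
    }
}
```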
Ganglia Reporting
-----------------
Flume can also report these metrics to
@@ -3237,6 +3725,32 @@ metrics as long values.
}
+Tools
+=====
+
+File Channel Integrity Tool
+---------------------------
+
+File Channel Integrity tool verifies the integrity of individual Events in the File channel
+and removes corrupted Events.
+
+The tool can be run as follows::
+
+  $ bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir
+
+where datadir is the comma-separated list of data directories to be verified.
+
+The following options are available:
+
+======================= ====================================================================
+Option Name             Description
+======================= ====================================================================
+h/help                  Displays help
+**l/dataDirs**          Comma-separated list of data directories which the tool must verify
+======================= ====================================================================
+
+
+
Topology Design Considerations
==============================
Modified: flume/site/trunk/content/sphinx/download.rst
URL:
http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/download.rst?rev=1682982&r1=1682981&r2=1682982&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/download.rst (original)
+++ flume/site/trunk/content/sphinx/download.rst Mon Jun 1 19:49:44 2015
@@ -10,8 +10,8 @@ originals on the main distribution serve
.. csv-table::
- "Apache Flume binary (tar.gz)", `apache-flume-1.5.2-bin.tar.gz
<http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz>`_,
`apache-flume-1.5.2-bin.tar.gz.md5
<http://www.us.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz.md5>`_,
`apache-flume-1.5.2-bin.tar.gz.asc
<http://www.us.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz.asc>`_
- "Apache Flume source (tar.gz)", `apache-flume-1.5.2-src.tar.gz
<http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-src.tar.gz>`_,
`apache-flume-1.5.2-src.tar.gz.md5
<http://www.us.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-src.tar.gz.md5>`_,
`apache-flume-1.5.2-src.tar.gz.asc
<http://www.us.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-src.tar.gz.asc>`_
+ "Apache Flume binary (tar.gz)", `apache-flume-1.6.0-bin.tar.gz
<http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz>`_,
`apache-flume-1.6.0-bin.tar.gz.md5
<http://www.us.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz.md5>`_,
`apache-flume-1.6.0-bin.tar.gz.asc
<http://www.us.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz.asc>`_
+ "Apache Flume source (tar.gz)", `apache-flume-1.6.0-src.tar.gz
<http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-src.tar.gz>`_,
`apache-flume-1.6.0-src.tar.gz.md5
<http://www.us.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-src.tar.gz.md5>`_,
`apache-flume-1.6.0-src.tar.gz.asc
<http://www.us.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-src.tar.gz.asc>`_
It is essential that you verify the integrity of the downloaded files using
the PGP or MD5 signatures. Please read
`Verifying Apache HTTP Server Releases
<http://httpd.apache.org/dev/verification.html>`_ for more information on
@@ -23,9 +23,9 @@ as well as the asc signature file for th
Then verify the signatures using::
% gpg --import KEYS
- % gpg --verify apache-flume-1.5.2-src.tar.gz.asc
+ % gpg --verify apache-flume-1.6.0-src.tar.gz.asc
-Apache Flume 1.5.2 is signed by Hari Shreedharan 77FFC9AB
+Apache Flume 1.6.0 is signed by Johny Rufus 2C79120F
Alternatively, you can verify the MD5 or SHA1 signatures of the files. A
program called md5, md5sum, or shasum is included in many
Unix distributions for this purpose.
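The MD5 comparison described above can also be done programmatically. The Java sketch below is illustrative only and is not part of Flume: the class name `Md5Check` and its methods are hypothetical. Because the exact layout of the `.md5` files is not shown here, it takes the expected digest as a command-line argument (copied from the `.md5` file) and ignores case and whitespace when comparing. It uses only `java.security.MessageDigest` from the JDK. An MD5 match only confirms integrity. The `gpg --verify` step is what confirms the release manager signed the file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: computes a file's MD5 digest and compares it with an expected hex string. */
public class Md5Check {

    /** Returns the lowercase hex MD5 digest of an in-memory byte array. */
    static String md5Hex(byte[] data) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        return toHex(md.digest(data));
    }

    /** Streams the file through MD5 so a large tarball is never fully loaded into memory. */
    static String md5Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        return toHex(md.digest());
    }

    /** Encodes each byte as two lowercase hex digits. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Compares digests, ignoring case and any whitespace in the expected value. */
    static boolean matches(String actualHex, String expectedHex) {
        String normalized = expectedHex.replaceAll("\\s+", "").toLowerCase();
        return actualHex.equalsIgnoreCase(normalized);
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: java Md5Check <file> <expected-md5-hex>");
            System.exit(2);
        }
        String actual = md5Hex(Paths.get(args[0]));
        boolean ok = matches(actual, args[1]);
        System.out.println((ok ? "OK " : "MISMATCH ") + actual);
        System.exit(ok ? 0 : 1);
    }
}
```

For example, `java Md5Check apache-flume-1.6.0-bin.tar.gz <digest-from-.md5-file>` prints `OK` followed by the computed digest and exits with status 0 when the values match, or prints `MISMATCH` and exits with status 1 otherwise.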
Modified: flume/site/trunk/content/sphinx/index.rst
URL:
http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/index.rst?rev=1682982&r1=1682981&r2=1682982&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/index.rst (original)
+++ flume/site/trunk/content/sphinx/index.rst Mon Jun 1 19:49:44 2015
@@ -33,6 +33,38 @@ application.
.. raw:: html
+ <h3>May 20, 2015 - Apache Flume 1.6.0 Released</h3>
+
+The Apache Flume team is pleased to announce the release of Flume 1.6.0.
+
+Flume is a distributed, reliable, and available service for efficiently
+collecting, aggregating, and moving large amounts of streaming event data.
+
+Version 1.6.0 is the ninth Flume release as an Apache top-level project.
+Flume 1.6.0 is stable, production-ready software, and is backwards-compatible
+with previous versions of the Flume 1.x codeline.
+
+Several months of active development went into this release: 105 patches were
+committed since 1.5.2, representing many features, enhancements, and bug fixes.
+While the full change log can be found on the 1.6.0 release page (link below),
+here are a few new feature highlights:
+
+ * Flume Sink and Source for Apache Kafka
+ * A new channel that uses Kafka
+ * Hive Sink based on the new Hive Streaming support
+ * End-to-end authentication in Flume
+ * Simple regex search-and-replace interceptor
+
+The full change log and documentation are available on the
+`Flume 1.6.0 release page <releases/1.6.0.html>`__.
+
+This release can be downloaded from the Flume `Download <download.html>`__
+page.
+
+Your contributions, feedback, help and support make Flume better!
+For more information on how to report problems or contribute,
+please visit our `Get Involved <getinvolved.html>`__ page.
+
+The Apache Flume Team
+
+.. raw:: html
+
<h3>November 18, 2014 - Apache Flume 1.5.2 Released</h3>
The Apache Flume team is pleased to announce the release of Flume 1.5.2