Modified: websites/staging/flume/trunk/content/FlumeUserGuide.html
==============================================================================
--- websites/staging/flume/trunk/content/FlumeUserGuide.html (original)
+++ websites/staging/flume/trunk/content/FlumeUserGuide.html Mon Oct 17 12:35:17 2016
@@ -7,7 +7,7 @@
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
- <title>Flume 1.6.0 User Guide — Apache Flume</title>
+ <title>Flume 1.7.0 User Guide — Apache Flume</title>
<link rel="stylesheet" href="_static/flume.css" type="text/css" />
<link rel="stylesheet" href="_static/pygments.css" type="text/css" />
@@ -26,7 +26,7 @@
<script type="text/javascript" src="_static/doctools.js"></script>
<link rel="top" title="Apache Flume" href="index.html" />
<link rel="up" title="Documentation" href="documentation.html" />
- <link rel="next" title="Flume 1.6.0 Developer Guide"
href="FlumeDeveloperGuide.html" />
+ <link rel="next" title="Flume 1.7.0 Developer Guide"
href="FlumeDeveloperGuide.html" />
<link rel="prev" title="Documentation" href="documentation.html" />
</head>
<body>
@@ -59,8 +59,8 @@
<div class="bodywrapper">
<div class="body">
- <div class="section" id="flume-1-6-0-user-guide">
-<h1>Flume 1.6.0 User Guide<a class="headerlink" href="#flume-1-6-0-user-guide"
title="Permalink to this headline">¶</a></h1>
+ <div class="section" id="flume-1-7-0-user-guide">
+<h1>Flume 1.7.0 User Guide<a class="headerlink" href="#flume-1-7-0-user-guide"
title="Permalink to this headline">¶</a></h1>
<div class="section" id="introduction">
<h2>Introduction<a class="headerlink" href="#introduction" title="Permalink to
this headline">¶</a></h2>
<div class="section" id="overview">
@@ -84,7 +84,7 @@ in the latest architecture.</p>
<div class="section" id="system-requirements">
<h3>System Requirements<a class="headerlink" href="#system-requirements"
title="Permalink to this headline">¶</a></h3>
<ol class="arabic simple">
-<li>Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended)</li>
+<li>Java Runtime Environment - Java 1.7 or later</li>
<li>Memory - Sufficient memory for configurations used by sources, channels or
sinks</li>
<li>Disk Space - Sufficient disk space for configurations used by channels or
sinks</li>
<li>Directory Permissions - Read/Write permissions for directories used by
agent</li>
@@ -248,6 +248,30 @@ OK</pre>
</div>
<p>Congratulations - you’ve successfully configured and deployed a Flume
agent! Subsequent sections cover agent configuration in much more detail.</p>
</div>
+<div class="section" id="logging-raw-data">
+<h4>Logging raw data<a class="headerlink" href="#logging-raw-data"
title="Permalink to this headline">¶</a></h4>
+<p>Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in many production environments, because it may leak sensitive data or security-related configurations, such as secret keys, into the Flume log files.
+By default, Flume will not log such information. On the other hand, if the
data pipeline is broken,
+Flume will attempt to provide clues for debugging the problem.</p>
+<p>One way to debug problems with event pipelines is to set up an additional
<a class="reference internal" href="#memory-channel">Memory Channel</a>
+connected to a <a class="reference internal" href="#logger-sink">Logger
Sink</a>, which will output all event data to the Flume logs.
+In some situations, however, this approach is insufficient.</p>
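+<p>Such a debugging setup might be wired as in the following sketch (illustrative fragments added to an existing agent configuration; the component names and the explicit replicating selector are assumptions):</p>
+<div class="highlight-none"><div class="highlight"><pre>a1.channels = c1 debugChannel
+a1.channels.debugChannel.type = memory
+a1.sinks = k1 debugSink
+a1.sinks.debugSink.type = logger
+a1.sinks.debugSink.channel = debugChannel
+a1.sources.r1.selector.type = replicating
+a1.sources.r1.channels = c1 debugChannel
+</pre></div>
+</div>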
+<p>In order to enable logging of event- and configuration-related data, some
Java system properties
+must be set in addition to log4j properties.</p>
+<p>To enable configuration-related logging, set the Java system property
+<tt class="docutils literal"><span
class="pre">-Dorg.apache.flume.log.printconfig=true</span></tt>. This can
either be passed on the command line or by
+setting this in the <tt class="docutils literal"><span
class="pre">JAVA_OPTS</span></tt> variable in <em>flume-env.sh</em>.</p>
+<p>To enable data logging, set the Java system property <tt class="docutils
literal"><span class="pre">-Dorg.apache.flume.log.rawdata=true</span></tt>
+in the same way described above. For most components, the log4j logging level
must also be set to
+DEBUG or TRACE to make event-specific logging appear in the Flume logs.</p>
+<p>Here is an example of enabling both configuration logging and raw data
logging while also
+setting the Log4j log level to DEBUG for console output:</p>
+<div class="highlight-none"><div class="highlight"><pre>$ bin/flume-ng agent
--conf conf --conf-file example.conf --name a1
-Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true
-Dorg.apache.flume.log.rawdata=true
+</pre></div>
+</div>
+</div>
<div class="section" id="zookeeper-based-configuration">
<h4>Zookeeper based Configuration<a class="headerlink"
href="#zookeeper-based-configuration" title="Permalink to this
headline">¶</a></h4>
<p>Flume supports Agent configurations via Zookeeper. <em>This is an
experimental feature.</em> The configuration file needs to be uploaded
@@ -1048,7 +1072,7 @@ via FLUME_CLASSPATH variable in flume-en
</tr>
<tr class="row-odd"><td><strong>connectionFactory</strong></td>
<td>–</td>
-<td>The JNDI name the connection factory shoulld appear as</td>
+<td>The JNDI name the connection factory should appear as</td>
</tr>
<tr class="row-even"><td><strong>providerURL</strong></td>
<td>–</td>
@@ -1150,9 +1174,9 @@ cases in which events may be duplicated
This is consistent with the guarantees offered by other Flume components.</p>
<table border="1" class="docutils">
<colgroup>
-<col width="6%" />
-<col width="4%" />
-<col width="89%" />
+<col width="18%" />
+<col width="10%" />
+<col width="72%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
@@ -1197,39 +1221,60 @@ This is consistent with the guarantees o
<td>basename</td>
<td>Header Key to use when appending basename of file to event header.</td>
</tr>
-<tr class="row-odd"><td>ignorePattern</td>
+<tr class="row-odd"><td>includePattern</td>
+<td>^.*$</td>
+<td>Regular expression specifying which files to include.
+It can be used together with <tt class="docutils literal"><span class="pre">ignorePattern</span></tt>.
+If a file matches both <tt class="docutils literal"><span
class="pre">ignorePattern</span></tt> and <tt class="docutils literal"><span
class="pre">includePattern</span></tt> regex,
+the file is ignored.</td>
+</tr>
+<tr class="row-even"><td>ignorePattern</td>
<td>^$</td>
-<td>Regular expression specifying which files to ignore (skip)</td>
+<td>Regular expression specifying which files to ignore (skip).
+It can be used together with <tt class="docutils literal"><span class="pre">includePattern</span></tt>.
+If a file matches both <tt class="docutils literal"><span
class="pre">ignorePattern</span></tt> and <tt class="docutils literal"><span
class="pre">includePattern</span></tt> regex,
+the file is ignored.</td>
</tr>
-<tr class="row-even"><td>trackerDir</td>
+<tr class="row-odd"><td>trackerDir</td>
<td>.flumespool</td>
<td>Directory to store metadata related to processing of files.
If this path is not an absolute path, then it is interpreted as relative to
the spoolDir.</td>
</tr>
-<tr class="row-odd"><td>consumeOrder</td>
+<tr class="row-even"><td>consumeOrder</td>
<td>oldest</td>
<td>In which order files in the spooling directory will be consumed <tt
class="docutils literal"><span class="pre">oldest</span></tt>,
<tt class="docutils literal"><span class="pre">youngest</span></tt> and <tt
class="docutils literal"><span class="pre">random</span></tt>. In case of <tt
class="docutils literal"><span class="pre">oldest</span></tt> and <tt
class="docutils literal"><span class="pre">youngest</span></tt>, the last
modified
time of the files will be used to compare the files. In case of a tie, the file
-with smallest laxicographical order will be consumed first. In case of <tt
class="docutils literal"><span class="pre">random</span></tt> any
+with smallest lexicographical order will be consumed first. In case of <tt
class="docutils literal"><span class="pre">random</span></tt> any
file will be picked randomly. When using <tt class="docutils literal"><span
class="pre">oldest</span></tt> and <tt class="docutils literal"><span
class="pre">youngest</span></tt> the whole
directory will be scanned to pick the oldest/youngest file, which might be
slow if there
are a large number of files, while using <tt class="docutils literal"><span
class="pre">random</span></tt> may cause old files to be consumed
very late if new files keep coming in the spooling directory.</td>
</tr>
-<tr class="row-even"><td>maxBackoff</td>
+<tr class="row-odd"><td>pollDelay</td>
+<td>500</td>
+<td>Delay (in milliseconds) used when polling for new files.</td>
+</tr>
+<tr class="row-even"><td>recursiveDirectorySearch</td>
+<td>false</td>
+<td>Whether to monitor sub directories for new files to read.</td>
+</tr>
+<tr class="row-odd"><td>maxBackoff</td>
<td>4000</td>
-<td>The maximum time (in millis) to wait between consecutive attempts to write
to the channel(s) if the channel is full. The source will start at a low
backoff and increase it exponentially each time the channel throws a
ChannelException, upto the value specified by this parameter.</td>
+<td>The maximum time (in millis) to wait between consecutive attempts to
+write to the channel(s) if the channel is full. The source will start at
+a low backoff and increase it exponentially each time the channel throws a
+ChannelException, up to the value specified by this parameter.</td>
</tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-even"><td>batchSize</td>
<td>100</td>
<td>Granularity at which to batch transfer to the channel</td>
</tr>
-<tr class="row-even"><td>inputCharset</td>
+<tr class="row-odd"><td>inputCharset</td>
<td>UTF-8</td>
<td>Character set used by deserializers that treat the input file as text.</td>
</tr>
-<tr class="row-odd"><td>decodeErrorPolicy</td>
+<tr class="row-even"><td>decodeErrorPolicy</td>
<td><tt class="docutils literal"><span class="pre">FAIL</span></tt></td>
<td>What to do when we see a non-decodable character in the input file.
<tt class="docutils literal"><span class="pre">FAIL</span></tt>: Throw an
exception and fail to parse the file.
@@ -1237,37 +1282,37 @@ very late if new files keep coming in th
typically Unicode U+FFFD.
<tt class="docutils literal"><span class="pre">IGNORE</span></tt>: Drop the
unparseable character sequence.</td>
</tr>
-<tr class="row-even"><td>deserializer</td>
+<tr class="row-odd"><td>deserializer</td>
<td><tt class="docutils literal"><span class="pre">LINE</span></tt></td>
<td>Specify the deserializer used to parse the file into events.
Defaults to parsing each line as an event. The class specified must implement
<tt class="docutils literal"><span
class="pre">EventDeserializer.Builder</span></tt>.</td>
</tr>
-<tr class="row-odd"><td>deserializer.*</td>
+<tr class="row-even"><td>deserializer.*</td>
<td> </td>
<td>Varies per event deserializer.</td>
</tr>
-<tr class="row-even"><td>bufferMaxLines</td>
+<tr class="row-odd"><td>bufferMaxLines</td>
<td>–</td>
<td>(Obsolete) This option is now ignored.</td>
</tr>
-<tr class="row-odd"><td>bufferMaxLineLength</td>
+<tr class="row-even"><td>bufferMaxLineLength</td>
<td>5000</td>
<td>(Deprecated) Maximum length of a line in the commit buffer. Use
deserializer.maxLineLength instead.</td>
</tr>
-<tr class="row-even"><td>selector.type</td>
+<tr class="row-odd"><td>selector.type</td>
<td>replicating</td>
<td>replicating or multiplexing</td>
</tr>
-<tr class="row-odd"><td>selector.*</td>
+<tr class="row-even"><td>selector.*</td>
<td> </td>
<td>Depends on the selector.type value</td>
</tr>
-<tr class="row-even"><td>interceptors</td>
+<tr class="row-odd"><td>interceptors</td>
<td>–</td>
<td>Space-separated list of interceptors</td>
</tr>
-<tr class="row-odd"><td>interceptors.*</td>
+<tr class="row-even"><td>interceptors.*</td>
<td> </td>
<td> </td>
</tr>
@@ -1383,11 +1428,125 @@ inefficient compared to <tt class="docut
</div>
</div>
</div>
+<div class="section" id="taildir-source">
+<h4>Taildir Source<a class="headerlink" href="#taildir-source"
title="Permalink to this headline">¶</a></h4>
+<div class="admonition note">
+<p class="first admonition-title">Note</p>
+<p class="last"><strong>This source is provided as a preview feature. It does
not work on Windows.</strong></p>
+</div>
+<p>Watch the specified files, and tail them in near real-time once new lines are detected appended to each file. If new lines are still being written, this source retries reading them while waiting for the write to complete.</p>
+<p>This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of each file to the given position file in JSON format. If Flume is stopped or goes down for some reason, it can restart tailing from the position recorded in the existing position file.</p>
+<p>In other use cases, this source can also start tailing from an arbitrary position for each file by using the given position file. When there is no position file at the specified path, it starts tailing from the first line of each file by default.</p>
+<p>Files are consumed in order of their modification time; the file with the oldest modification time is consumed first.</p>
+<p>This source does not rename, delete, or otherwise modify the files being tailed. It currently does not support tailing binary files; it reads text files line by line.</p>
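+<p>For illustration, a position file might contain entries of the following shape (an approximation of the JSON layout; the inode and position values are arbitrary):</p>
+<div class="highlight-none"><div class="highlight"><pre>[{"inode": 2496272, "pos": 12, "file": "/var/log/test1/example.log"}]
+</pre></div>
+</div>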
+<table border="1" class="docutils">
+<colgroup>
+<col width="19%" />
+<col width="16%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td><strong>channels</strong></td>
+<td>–</td>
+<td> </td>
+</tr>
+<tr class="row-odd"><td><strong>type</strong></td>
+<td>–</td>
+<td>The component type name, needs to be <tt class="docutils literal"><span
class="pre">TAILDIR</span></tt>.</td>
+</tr>
+<tr class="row-even"><td><strong>filegroups</strong></td>
+<td>–</td>
+<td>Space-separated list of file groups. Each file group indicates a set of
files to be tailed.</td>
+</tr>
+<tr class="row-odd"><td><strong>filegroups.<filegroupName></strong></td>
+<td>–</td>
+<td>Absolute path of the file group. Regular expressions (and not file system patterns) can be used for the filename only.</td>
+</tr>
+<tr class="row-even"><td>positionFile</td>
+<td>~/.flume/taildir_position.json</td>
+<td>File in JSON format to record the inode, the absolute path and the last
position of each tailing file.</td>
+</tr>
+<tr class="row-odd"><td>headers.<filegroupName>.<headerKey></td>
+<td>–</td>
+<td>Header value to set with the given header key. Multiple headers can be specified for one file group.</td>
+</tr>
+<tr class="row-even"><td>byteOffsetHeader</td>
+<td>false</td>
+<td>Whether to add the byte offset of a tailed line to a header called
‘byteoffset’.</td>
+</tr>
+<tr class="row-odd"><td>skipToEnd</td>
+<td>false</td>
+<td>Whether to skip to EOF for files that are not recorded in the position file.</td>
+</tr>
+<tr class="row-even"><td>idleTimeout</td>
+<td>120000</td>
+<td>Time (ms) after which an inactive file is closed. If new lines are appended to a closed file, this source automatically re-opens it.</td>
+</tr>
+<tr class="row-odd"><td>writePosInterval</td>
+<td>3000</td>
+<td>Interval (ms) at which the last position of each file is written to the position file.</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
+<td>100</td>
+<td>Max number of lines to read and send to the channel at a time. Using the
default is usually fine.</td>
+</tr>
+<tr class="row-odd"><td>backoffSleepIncrement</td>
+<td>1000</td>
+<td>The increment for time delay before reattempting to poll for new data,
when the last attempt did not find any new data.</td>
+</tr>
+<tr class="row-even"><td>maxBackoffSleep</td>
+<td>5000</td>
+<td>The max time delay between each reattempt to poll for new data, when the
last attempt did not find any new data.</td>
+</tr>
+<tr class="row-odd"><td>cachePatternMatching</td>
+<td>true</td>
+<td>Listing directories and applying the filename regex pattern may be time
consuming for directories
+containing thousands of files. Caching the list of matching files can improve
performance.
+The order in which files are consumed will also be cached.
+Requires that the file system keeps track of modification times with at least
a 1-second granularity.</td>
+</tr>
+<tr class="row-even"><td>fileHeader</td>
+<td>false</td>
+<td>Whether to add a header storing the absolute path filename.</td>
+</tr>
+<tr class="row-odd"><td>fileHeaderKey</td>
+<td>file</td>
+<td>Header key to use when appending absolute path filename to event
header.</td>
+</tr>
+</tbody>
+</table>
+<p>Example for agent named a1:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
+<span class="na">a1.channels</span> <span class="o">=</span> <span
class="s">c1</span>
+<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span
class="s">TAILDIR</span>
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span
class="s">c1</span>
+<span class="na">a1.sources.r1.positionFile</span> <span class="o">=</span>
<span class="s">/var/log/flume/taildir_position.json</span>
+<span class="na">a1.sources.r1.filegroups</span> <span class="o">=</span>
<span class="s">f1 f2</span>
+<span class="na">a1.sources.r1.filegroups.f1</span> <span class="o">=</span>
<span class="s">/var/log/test1/example.log</span>
+<span class="na">a1.sources.r1.headers.f1.headerKey1</span> <span
class="o">=</span> <span class="s">value1</span>
+<span class="na">a1.sources.r1.filegroups.f2</span> <span class="o">=</span>
<span class="s">/var/log/test2/.*log.*</span>
+<span class="na">a1.sources.r1.headers.f2.headerKey1</span> <span
class="o">=</span> <span class="s">value2</span>
+<span class="na">a1.sources.r1.headers.f2.headerKey2</span> <span
class="o">=</span> <span class="s">value2-2</span>
+<span class="na">a1.sources.r1.fileHeader</span> <span class="o">=</span>
<span class="s">true</span>
+</pre></div>
+</div>
+</div>
<div class="section" id="twitter-1-firehose-source-experimental">
<h4>Twitter 1% firehose Source (experimental)<a class="headerlink"
href="#twitter-1-firehose-source-experimental" title="Permalink to this
headline">¶</a></h4>
<div class="admonition warning">
<p class="first admonition-title">Warning</p>
-<p class="last">This source is hightly experimental and may change between
minor versions of Flume.
+<p class="last">This source is highly experimental and may change between
minor versions of Flume.
Use at your own risk.</p>
</div>
<p>Experimental source that connects via Streaming API to the 1% sample twitter
@@ -1430,7 +1589,7 @@ Required properties are in <strong>bold<
</tr>
<tr class="row-odd"><td><strong>accessTokenSecret</strong></td>
<td>–</td>
-<td>OAuth toekn secret</td>
+<td>OAuth token secret</td>
</tr>
<tr class="row-even"><td>maxBatchSize</td>
<td>1000</td>
@@ -1458,14 +1617,14 @@ Required properties are in <strong>bold<
</div>
<div class="section" id="kafka-source">
<h4>Kafka Source<a class="headerlink" href="#kafka-source" title="Permalink to
this headline">¶</a></h4>
-<p>Kafka Source is an Apache Kafka consumer that reads messages from a Kafka
topic.
+<p>Kafka Source is an Apache Kafka consumer that reads messages from Kafka
topics.
If you have multiple Kafka sources running, you can configure them with the
same Consumer Group
-so each will read a unique set of partitions for the topic.</p>
+so each will read a unique set of partitions for the topics.</p>
<table border="1" class="docutils">
<colgroup>
-<col width="21%" />
-<col width="8%" />
-<col width="71%" />
+<col width="19%" />
+<col width="6%" />
+<col width="75%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
@@ -1480,68 +1639,245 @@ so each will read a unique set of partit
</tr>
<tr class="row-odd"><td><strong>type</strong></td>
<td>–</td>
-<td>The component type name, needs to be <tt class="docutils literal"><span
class="pre">org.apache.flume.source.kafka,KafkaSource</span></tt></td>
+<td>The component type name, needs to be <tt class="docutils literal"><span
class="pre">org.apache.flume.source.kafka.KafkaSource</span></tt></td>
</tr>
-<tr class="row-even"><td><strong>zookeeperConnect</strong></td>
+<tr class="row-even"><td><strong>kafka.bootstrap.servers</strong></td>
<td>–</td>
-<td>URI of ZooKeeper used by Kafka cluster</td>
+<td>List of brokers in the Kafka cluster used by the source</td>
</tr>
-<tr class="row-odd"><td><strong>groupId</strong></td>
+<tr class="row-odd"><td>kafka.consumer.group.id</td>
<td>flume</td>
<td>Unique identifier of the consumer group. Setting the same id in multiple
sources or agents
indicates that they are part of the same consumer group</td>
</tr>
-<tr class="row-even"><td><strong>topic</strong></td>
+<tr class="row-even"><td><strong>kafka.topics</strong></td>
<td>–</td>
-<td>Kafka topic we’ll read messages from. At the time, this is a single
topic only.</td>
+<td>Comma-separated list of topics the kafka consumer will read messages
from.</td>
</tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-odd"><td><strong>kafka.topics.regex</strong></td>
+<td>–</td>
+<td>Regex that defines the set of topics the source is subscribed to. This property has higher priority than <tt class="docutils literal"><span class="pre">kafka.topics</span></tt> and overrides <tt class="docutils literal"><span class="pre">kafka.topics</span></tt> if it exists.</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
<td>1000</td>
<td>Maximum number of messages written to Channel in one batch</td>
</tr>
-<tr class="row-even"><td>batchDurationMillis</td>
+<tr class="row-odd"><td>batchDurationMillis</td>
<td>1000</td>
<td>Maximum time (in ms) before a batch is written to the Channel. The batch is written as soon as either the size limit or the time limit is reached.</td>
</tr>
-<tr class="row-odd"><td>backoffSleepIncrement</td>
+<tr class="row-even"><td>backoffSleepIncrement</td>
<td>1000</td>
<td>Initial and incremental wait time that is triggered when a Kafka Topic
appears to be empty.
Wait period will reduce aggressive pinging of an empty Kafka Topic. One
second is ideal for
ingestion use cases but a lower value may be required for low latency
operations with
interceptors.</td>
</tr>
-<tr class="row-even"><td>maxBackoffSleep</td>
+<tr class="row-odd"><td>maxBackoffSleep</td>
<td>5000</td>
<td>Maximum wait time that is triggered when a Kafka Topic appears to be
empty. Five seconds is
ideal for ingestion use cases but a lower value may be required for low
latency operations
with interceptors.</td>
</tr>
-<tr class="row-odd"><td>Other Kafka Consumer Properties</td>
-<td>–</td>
-<td>These properties are used to configure the Kafka Consumer. Any producer
property supported
-by Kafka can be used. The only requirement is to prepend the property name
with the prefix <tt class="docutils literal"><span
class="pre">kafka.</span></tt>.
-For example: kafka.consumer.timeout.ms
-Check <cite>Kafka documentation
<https://kafka.apache.org/08/configuration.html#consumerconfigs></cite>
for details</td>
+<tr class="row-even"><td>useFlumeEventFormat</td>
+<td>false</td>
+<td>By default events are taken as bytes from the Kafka topic directly into
the event body. Set to
+true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink, or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve any Flume headers sent on the producing side.</td>
+</tr>
+<tr class="row-odd"><td>migrateZookeeperOffsets</td>
+<td>true</td>
+<td>When no Kafka stored offset is found, look up the offsets in Zookeeper and
commit them to Kafka.
+This should be true to support seamless Kafka client migration from older
versions of Flume.
+Once migrated this can be set to false, though that should generally not be
required.
+If no Zookeeper offset is found, the Kafka configuration
kafka.consumer.auto.offset.reset
+defines how offsets are handled.
+Check <a class="reference external"
href="http://kafka.apache.org/documentation.html#newconsumerconfigs">Kafka
documentation</a> for details</td>
+</tr>
+<tr class="row-even"><td>kafka.consumer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if connecting to Kafka using some level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td><em>more consumer security props</em></td>
+<td> </td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference
external" href="http://kafka.apache.org/documentation.html#security">Kafka
security</a> for additional
+properties that need to be set on the consumer.</td>
+</tr>
+<tr class="row-even"><td>Other Kafka Consumer Properties</td>
+<td>–</td>
+<td>These properties are used to configure the Kafka Consumer. Any consumer
property supported
+by Kafka can be used. The only requirement is to prepend the property name
with the prefix
+<tt class="docutils literal"><span class="pre">kafka.consumer</span></tt>.
+For example: <tt class="docutils literal"><span
class="pre">kafka.consumer.auto.offset.reset</span></tt></td>
</tr>
</tbody>
</table>
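+<p>For instance, a passthrough consumer property might be set as follows (an illustrative sketch; the agent and source names are assumptions):</p>
+<div class="highlight-none"><div class="highlight"><pre>tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest
+</pre></div>
+</div>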
<div class="admonition note">
<p class="first admonition-title">Note</p>
<p class="last">The Kafka Source overrides two Kafka consumer parameters:
-auto.commit.enable is set to “false” by the source and we commit
every batch. For improved performance
-this can be set to “true”, however, this can lead to loss of data
-consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we
wait at most 10ms for the data to arrive
-setting this to a higher value can reduce CPU utilization (we’ll poll
Kafka in less of a tight loop), but also means
-higher latency in writing batches to channel (since we’ll wait longer
for data to arrive).</p>
+auto.commit.enable is set to &#8220;false&#8221; by the source, and every batch is committed. The Kafka Source guarantees an at-least-once message retrieval strategy, so duplicates can be present when the source starts.
+The Kafka Source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringDeserializer)
+and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modifying these parameters is not recommended.</p>
+</div>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="22%" />
+<col width="13%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>topic</td>
+<td>–</td>
+<td>Use kafka.topics</td>
+</tr>
+<tr class="row-odd"><td>groupId</td>
+<td>flume</td>
+<td>Use kafka.consumer.group.id</td>
+</tr>
+<tr class="row-even"><td>zookeeperConnect</td>
+<td>–</td>
+<td>No longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish a connection with the Kafka cluster.</td>
+</tr>
+</tbody>
+</table>
+<p>Example for topic subscription by a comma-separated topic list:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">tier1.sources.source1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">tier1.sources.source1.channels</span> <span
class="o">=</span> <span class="s">channel1</span>
+<span class="na">tier1.sources.source1.batchSize</span> <span
class="o">=</span> <span class="s">5000</span>
+<span class="na">tier1.sources.source1.batchDurationMillis</span> <span
class="o">=</span> <span class="s">2000</span>
+<span class="na">tier1.sources.source1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">tier1.sources.source1.kafka.topics</span> <span
class="o">=</span> <span class="s">test1, test2</span>
+<span class="na">tier1.sources.source1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">custom.g.id</span>
+</pre></div>
</div>
-<p>Example for agent named tier1:</p>
+<p>Example for topic subscription by regex:</p>
<div class="highlight-properties"><div class="highlight"><pre><span
class="na">tier1.sources.source1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.source.kafka.KafkaSource</span>
<span class="na">tier1.sources.source1.channels</span> <span
class="o">=</span> <span class="s">channel1</span>
-<span class="na">tier1.sources.source1.zookeeperConnect</span> <span
class="o">=</span> <span class="s">localhost:2181</span>
-<span class="na">tier1.sources.source1.topic</span> <span class="o">=</span>
<span class="s">test1</span>
-<span class="na">tier1.sources.source1.groupId</span> <span class="o">=</span>
<span class="s">flume</span>
-<span class="na">tier1.sources.source1.kafka.consumer.timeout.ms</span> <span
class="o">=</span> <span class="s">100</span>
+<span class="na">tier1.sources.source1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">tier1.sources.source1.kafka.topics.regex</span> <span
class="o">=</span> <span class="s">^topic[0-9]$</span>
+<span class="c"># the default kafka.consumer.group.id=flume is used</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Source:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the
parameter is named SSL, the actual protocol is a TLS implementation) can be
used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span
class="pre">kafka.consumer.security.protocol</span></tt> to any of the
following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication
with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data
encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional
authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external"
href="http://kafka.apache.org/documentation#security_overview">Kafka security
overview</a>
+and the jira for tracking this issue:
+<a class="reference external"
href="https://issues.apache.org/jira/browse/KAFKA-2561">KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Source:</strong></p>
+<p>Please read the steps described in <a class="reference external"
href="http://kafka.apache.org/documentation#security_configclients">Configuring
Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore
types.</p>
+<p>Example configuration with server side authentication and data
encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources.source1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">a1.sources.source1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sources.source1.kafka.topics</span> <span
class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sources.source1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.sources.source1.kafka.consumer.security.protocol</span>
<span class="o">=</span> <span class="s">SSL</span>
+<span
class="na">a1.sources.source1.kafka.consumer.ssl.truststore.location</span><span
class="o">=</span><span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.sources.source1.kafka.consumer.ssl.truststore.password</span><span
class="o">=</span><span class="s"><password to access the
truststore></span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span
class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm</span><span
class="o">=</span><span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server’s fully qualified domain
name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external"
href="https://tools.ietf.org/html/rfc6125#section-2.3">https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external"
href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client side authentication is also required, then the following should
additionally be added to the Flume agent configuration.
+Each Flume agent must have its own client certificate, which has to be trusted
by the Kafka brokers either
+individually or by its signature chain. A common example is to sign each
client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources.source1.kafka.consumer.ssl.keystore.location</span><span
class="o">=</span><span class="s">/path/to/client.keystore.jks</span>
+<span
class="na">a1.sources.source1.kafka.consumer.ssl.keystore.password</span><span
class="o">=</span><span class="s"><password to access the
keystore></span>
+</pre></div>
+</div>
+<p>If keystore and key use different password protection then <tt
class="docutils literal"><span class="pre">ssl.key.password</span></tt>
property will
+provide the required additional secret for the consumer keystore:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources.source1.kafka.consumer.ssl.key.password</span><span
class="o">=</span><span class="s"><password to access the key></span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Source:</strong></p>
+<p>To use Kafka source with a Kafka cluster secured with Kerberos, set the <tt
class="docutils literal"><span
class="pre">consumer.security.protocol</span></tt> property noted above for
the consumer.
+The Kerberos keytab and principal to be used with Kafka brokers are specified
in a JAAS file’s “KafkaClient” section. The “Client”
section describes the Zookeeper connection, if needed.
+See <a class="reference external"
href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig">Kafka
doc</a>
+for information on the JAAS file contents. The location of this JAAS file and
optionally the system-wide Kerberos configuration can be specified via
JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">JAVA_OPTS</span><span class="o">=</span><span
class="s">"$JAVA_OPTS
-Djava.security.krb5.conf=/path/to/krb5.conf"</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span
class="s">"$JAVA_OPTS
-Djava.security.auth.login.config=/path/to/flume_jaas.conf"</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources.source1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">a1.sources.source1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sources.source1.kafka.topics</span> <span
class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sources.source1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.sources.source1.kafka.consumer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.sources.source1.kafka.consumer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.sources.source1.kafka.consumer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sources.source1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">a1.sources.source1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sources.source1.kafka.topics</span> <span
class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sources.source1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.sources.source1.kafka.consumer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.sources.source1.kafka.consumer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.sources.source1.kafka.consumer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+<span
class="na">a1.sources.source1.kafka.consumer.ssl.truststore.location</span><span
class="o">=</span><span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.sources.source1.kafka.consumer.ssl.truststore.password</span><span
class="o">=</span><span class="s"><password to access the
truststore></span>
+</pre></div>
+</div>
+<p>Sample JAAS file. For reference on its contents, please see the client
config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of <a class="reference external"
href="http://kafka.apache.org/documentation#security_sasl_clientconfig">SASL
configuration</a>.
+Since the Kafka Source may also connect to Zookeeper for offset migration, the
“Client” section was also added to this example.
+It is not needed unless you require offset migration, or you require
this section for other secure components.
+Also please make sure that the operating system user of the Flume processes
has read privileges on the JAAS and keytab files.</p>
+<div class="highlight-javascript"><div class="highlight"><pre><span
class="nx">Client</span> <span class="p">{</span>
+ <span class="nx">com</span><span class="p">.</span><span
class="nb">sun</span><span class="p">.</span><span
class="nx">security</span><span class="p">.</span><span
class="nx">auth</span><span class="p">.</span><span
class="nx">module</span><span class="p">.</span><span
class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+ <span class="nx">useKeyTab</span><span class="o">=</span><span
class="kc">true</span>
+ <span class="nx">storeKey</span><span class="o">=</span><span
class="kc">true</span>
+ <span class="nx">keyTab</span><span class="o">=</span><span
class="s2">"/path/to/keytabs/flume.keytab"</span>
+ <span class="nx">principal</span><span class="o">=</span><span
class="s2">"flume/flumehost1.example.com@YOURKERBEROSREALM"</span><span
class="p">;</span>
+<span class="p">};</span>
+
+<span class="nx">KafkaClient</span> <span class="p">{</span>
+ <span class="nx">com</span><span class="p">.</span><span
class="nb">sun</span><span class="p">.</span><span
class="nx">security</span><span class="p">.</span><span
class="nx">auth</span><span class="p">.</span><span
class="nx">module</span><span class="p">.</span><span
class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+ <span class="nx">useKeyTab</span><span class="o">=</span><span
class="kc">true</span>
+ <span class="nx">storeKey</span><span class="o">=</span><span
class="kc">true</span>
+ <span class="nx">keyTab</span><span class="o">=</span><span
class="s2">"/path/to/keytabs/flume.keytab"</span>
+ <span class="nx">principal</span><span class="o">=</span><span
class="s2">"flume/flumehost1.example.com@YOURKERBEROSREALM"</span><span
class="p">;</span>
+<span class="p">};</span>
</pre></div>
</div>
</div>
@@ -1613,21 +1949,21 @@ Flume event and sent via the connected c
<span class="na">a1.channels</span> <span class="o">=</span> <span
class="s">c1</span>
<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span
class="s">netcat</span>
<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span
class="s">0.0.0.0</span>
-<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span
class="s">6666</span>
+<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span
class="s">6666</span>
<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span
class="s">c1</span>
</pre></div>
</div>
</div>
<div class="section" id="sequence-generator-source">
<h4>Sequence Generator Source<a class="headerlink"
href="#sequence-generator-source" title="Permalink to this headline">¶</a></h4>
-<p>A simple sequence generator that continuously generates events with a
counter
-that starts from 0 and increments by 1. Useful mainly for testing.
-Required properties are in <strong>bold</strong>.</p>
+<p>A simple sequence generator that continuously generates events with a
counter that starts from 0,
+increments by 1 and stops at totalEvents. Retries when it can’t send
events to the channel. Useful
+mainly for testing. Required properties are in <strong>bold</strong>.</p>
<table border="1" class="docutils">
<colgroup>
-<col width="20%" />
-<col width="16%" />
-<col width="64%" />
+<col width="19%" />
+<col width="21%" />
+<col width="60%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
@@ -1664,6 +2000,10 @@ Required properties are in <strong>bold<
<td>1</td>
<td> </td>
</tr>
+<tr class="row-odd"><td>totalEvents</td>
+<td>Long.MAX_VALUE</td>
+<td>Number of unique events sent by the source.</td>
+</tr>
</tbody>
</table>
<p>Example for agent named a1:</p>
@@ -2395,8 +2735,8 @@ required.</p>
<p>The following are the escape sequences supported:</p>
<table border="1" class="docutils">
<colgroup>
-<col width="10%" />
-<col width="90%" />
+<col width="15%" />
+<col width="85%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Alias</th>
@@ -2473,8 +2813,19 @@ required.</p>
<tr class="row-even"><td>%z</td>
<td>+hhmm numeric timezone (for example, -0400)</td>
</tr>
+<tr class="row-odd"><td>%[localhost]</td>
+<td>Substitute the hostname of the host where the agent is running</td>
+</tr>
+<tr class="row-even"><td>%[IP]</td>
+<td>Substitute the IP address of the host where the agent is running</td>
+</tr>
+<tr class="row-odd"><td>%[FQDN]</td>
+<td>Substitute the canonical hostname of the host where the agent is
running</td>
+</tr>
</tbody>
</table>
+<p>Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on
Java’s ability to obtain the
+hostname, which may fail in some networking environments.</p>
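+<p>For example, the host based escapes can be combined with the time based
escapes in a path specification (an illustrative value for a hypothetical HDFS
sink named k1):</p>
+<div class="highlight-properties"><div class="highlight"><pre>a1.sinks.k1.hdfs.path = /flume/events/%[FQDN]/%y-%m-%d
+</pre></div>
+</div>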
<p>The file in use will have the name mangled to include ”.tmp” at
the end. Once
the file is closed, this extension is removed. This allows excluding partially
complete files in the directory.
@@ -2662,8 +3013,7 @@ timestamp 11:54:34 AM, June 12, 2012 wil
Events are written using Hive transactions. As soon as a set of events are
committed to Hive, they become
immediately visible to Hive queries. Partitions to which flume will stream to
can either be pre-created
or, optionally, Flume can create them if they are missing. Fields from
incoming event data are mapped to
-corresponding columns in the Hive table. <strong>This sink is provided as a
preview feature and not recommended
-for use in production.</strong></p>
+corresponding columns in the Hive table.</p>
<table border="1" class="docutils">
<colgroup>
<col width="15%" />
@@ -2918,8 +3268,9 @@ accept tab separated input containing th
</div>
<div class="section" id="logger-sink">
<h4>Logger Sink<a class="headerlink" href="#logger-sink" title="Permalink to
this headline">¶</a></h4>
-<p>Logs event at INFO level. Typically useful for testing/debugging purpose.
-Required properties are in <strong>bold</strong>.</p>
+<p>Logs events at INFO level. Typically useful for testing/debugging purposes.
Required properties are
+in <strong>bold</strong>. This sink is the only exception that doesn’t
require the extra configuration
+explained in the <a class="reference internal"
href="#logging-raw-data">Logging raw data</a> section.</p>
<table border="1" class="docutils">
<colgroup>
<col width="20%" />
@@ -3243,9 +3594,9 @@ backslash, like this: “\n”)<
Required properties are in <strong>bold</strong>.</p>
<table border="1" class="docutils">
<colgroup>
-<col width="13%" />
+<col width="17%" />
<col width="5%" />
-<col width="82%" />
+<col width="78%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
@@ -3266,15 +3617,27 @@ Required properties are in <strong>bold<
<td>–</td>
<td>The directory where files will be stored</td>
</tr>
-<tr class="row-odd"><td>sink.rollInterval</td>
+<tr class="row-odd"><td>sink.pathManager</td>
+<td>DEFAULT</td>
+<td>The PathManager implementation to use.</td>
+</tr>
+<tr class="row-even"><td>sink.pathManager.extension</td>
+<td>–</td>
+<td>The file extension if the default PathManager is used.</td>
+</tr>
+<tr class="row-odd"><td>sink.pathManager.prefix</td>
+<td>–</td>
+<td>A character string to add to the beginning of the file name if the default
PathManager is used</td>
+</tr>
+<tr class="row-even"><td>sink.rollInterval</td>
<td>30</td>
<td>Roll the file every 30 seconds. Specifying 0 will disable rolling and
cause all events to be written to a single file.</td>
</tr>
-<tr class="row-even"><td>sink.serializer</td>
+<tr class="row-odd"><td>sink.serializer</td>
<td>TEXT</td>
<td>Other possible options include <tt class="docutils literal"><span
class="pre">avro_event</span></tt> or the FQCN of an implementation of
EventSerializer.Builder interface.</td>
</tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-even"><td>batchSize</td>
<td>100</td>
<td> </td>
</tr>
@@ -3827,13 +4190,14 @@ the kerberos principal</td>
<p>This is a Flume Sink implementation that can publish data to a
<a class="reference external" href="http://kafka.apache.org/">Kafka</a> topic.
One of the objectives is to integrate Flume
with Kafka so that pull based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.8.x series of
releases.</p>
+through various Flume sources. This currently supports Kafka 0.9.x series of
releases.</p>
+<p>This version of Flume no longer supports older versions (0.8.x) of
Kafka.</p>
<p>Required properties are marked in bold font.</p>
<table border="1" class="docutils">
<colgroup>
-<col width="20%" />
-<col width="12%" />
-<col width="68%" />
+<col width="18%" />
+<col width="10%" />
+<col width="72%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
@@ -3846,34 +4210,65 @@ through various Flume sources. This curr
<td>–</td>
<td>Must be set to <tt class="docutils literal"><span
class="pre">org.apache.flume.sink.kafka.KafkaSink</span></tt></td>
</tr>
-<tr class="row-odd"><td><strong>brokerList</strong></td>
+<tr class="row-odd"><td><strong>kafka.bootstrap.servers</strong></td>
<td>–</td>
<td>List of brokers Kafka-Sink will connect to, to get the list of topic
partitions.
This can be a partial list of brokers, but we recommend at least two for HA.
The format is a comma separated list of hostname:port</td>
</tr>
-<tr class="row-even"><td>topic</td>
+<tr class="row-even"><td>kafka.topic</td>
<td>default-flume-topic</td>
<td>The topic in Kafka to which the messages will be published. If this
parameter is configured,
messages will be published to this topic.
If the event header contains a “topic” field, the event will be
published to that topic
overriding the topic configured here.</td>
</tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-odd"><td>flumeBatchSize</td>
<td>100</td>
<td>How many messages to process in one batch. Larger batches improve
throughput while adding latency.</td>
</tr>
-<tr class="row-even"><td>requiredAcks</td>
+<tr class="row-even"><td>kafka.producer.acks</td>
<td>1</td>
<td>How many replicas must acknowledge a message before it’s considered
successfully written.
Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader
only), -1 (wait for all replicas)
Set this to -1 to avoid data loss in some cases of leader failure.</td>
</tr>
-<tr class="row-odd"><td>Other Kafka Producer Properties</td>
+<tr class="row-odd"><td>useFlumeEventFormat</td>
+<td>false</td>
+<td>By default events are put as bytes onto the Kafka topic directly from the
event body. Set to
+true to store events as the Flume Avro binary format. Used in conjunction with
the same property
+on the KafkaSource or with the parseAsFlumeEvent property on the Kafka
Channel, this will preserve
+any Flume headers for the producing side.</td>
+</tr>
+<tr class="row-even"><td>defaultPartitionId</td>
+<td>–</td>
+<td>Specifies a Kafka partition ID (integer) that all events passing through
this sink are sent to, unless
+overridden by <tt class="docutils literal"><span
class="pre">partitionIdHeader</span></tt>. By default, if this property is not
set, events will be
+distributed by the Kafka Producer’s partitioner - including by <tt
class="docutils literal"><span class="pre">key</span></tt> if specified (or by a
+partitioner specified by <tt class="docutils literal"><span
class="pre">kafka.partitioner.class</span></tt>).</td>
+</tr>
+<tr class="row-odd"><td>partitionIdHeader</td>
+<td>–</td>
+<td>When set, the sink will take the value of the field named using the value
of this property
+from the event header and send the message to the specified partition of the
topic. If the
+value represents an invalid partition, an EventDeliveryException will be
thrown. If the header value
+is present then this setting overrides <tt class="docutils literal"><span
class="pre">defaultPartitionId</span></tt>.</td>
+</tr>
+<tr class="row-even"><td>kafka.producer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some
level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td><em>more producer security props</em></td>
+<td> </td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference
external" href="http://kafka.apache.org/documentation.html#security">Kafka
security</a> for additional
+properties that need to be set on producer.</td>
+</tr>
+<tr class="row-even"><td>Other Kafka Producer Properties</td>
<td>–</td>
<td>These properties are used to configure the Kafka Producer. Any producer
property supported
-by Kafka can be used. The only requirement is to prepend the property name
with the prefix <tt class="docutils literal"><span
class="pre">kafka.</span></tt>.
-For example: kafka.producer.type</td>
+by Kafka can be used. The only requirement is to prepend the property name
with the prefix
+<tt class="docutils literal"><span class="pre">kafka.producer</span></tt>.
+For example: kafka.producer.linger.ms</td>
</tr>
</tbody>
</table>
@@ -3884,19 +4279,152 @@ If <tt class="docutils literal"><span cl
If <tt class="docutils literal"><span class="pre">key</span></tt> exists in
the headers, the key will used by Kafka to partition the data between the topic
partitions. Events with same key
will be sent to the same partition. If the key is null, events will be sent to
random partitions.</p>
</div>
+<p>The Kafka sink also provides defaults for the
key.serializer (org.apache.kafka.common.serialization.StringSerializer)
+and
value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer).
Modification of these parameters is not recommended.</p>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="22%" />
+<col width="13%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>brokerList</td>
+<td>–</td>
+<td>Use kafka.bootstrap.servers</td>
+</tr>
+<tr class="row-odd"><td>topic</td>
+<td>default-flume-topic</td>
+<td>Use kafka.topic</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
+<td>100</td>
+<td>Use flumeBatchSize</td>
+</tr>
+<tr class="row-odd"><td>requiredAcks</td>
+<td>1</td>
+<td>Use kafka.producer.acks</td>
+</tr>
+</tbody>
+</table>
<p>An example configuration of a Kafka sink is given below. Properties starting
-with the prefix <tt class="docutils literal"><span
class="pre">kafka</span></tt> (the last 3 properties) are used when
instantiating
-the Kafka producer. The properties that are passed when creating the Kafka
+with the prefix <tt class="docutils literal"><span
class="pre">kafka.producer</span></tt> are used when instantiating the Kafka
producer. The properties that are passed when creating the Kafka
producer are not limited to the properties given in this example.
-Also it’s possible include your custom properties here and access them
inside
+Also it is possible to include your custom properties here and access them
inside
the preprocessor through the Flume Context object passed in as a method
argument.</p>
-<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
-<span class="na">a1.sinks.k1.topic</span> <span class="o">=</span> <span
class="s">mytopic</span>
-<span class="na">a1.sinks.k1.brokerList</span> <span class="o">=</span> <span
class="s">localhost:9092</span>
-<span class="na">a1.sinks.k1.requiredAcks</span> <span class="o">=</span>
<span class="s">1</span>
-<span class="na">a1.sinks.k1.batchSize</span> <span class="o">=</span> <span
class="s">20</span>
-<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span
class="s">c1</span>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span
class="s">c1</span>
+<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
+<span class="na">a1.sinks.k1.kafka.topic</span> <span class="o">=</span> <span
class="s">mytopic</span>
+<span class="na">a1.sinks.k1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">a1.sinks.k1.flumeBatchSize</span> <span
class="o">=</span> <span class="s">20</span>
+<span class="na">a1.sinks.k1.kafka.producer.acks</span> <span
class="o">=</span> <span class="s">1</span>
+<span class="na">a1.sinks.k1.kafka.producer.linger.ms</span> <span
class="o">=</span> <span class="s">1</span>
+<span class="na">a1.sinks.k1.kafka.producer.compression.type</span> <span
class="o">=</span> <span class="s">snappy</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Sink:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the
parameter is named SSL, the actual protocol is a TLS implementation) can be
used from Kafka version 0.9.0.</p>
+<p>As of now, data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span
class="pre">kafka.producer.security.protocol</span></tt> to any of the
following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication
with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data
encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional
authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external"
href="http://kafka.apache.org/documentation#security_overview">Kafka security
overview</a>
+and the jira for tracking this issue:
+<a class="reference external"
href="https://issues.apache.org/jira/browse/KAFKA-2561">KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Sink:</strong></p>
+<p>Please read the steps described in <a class="reference external"
href="http://kafka.apache.org/documentation#security_configclients">Configuring
Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example
any of the following:
+security provider, cipher suites, enabled protocols, and truststore or keystore
types.</p>
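+<p>These fine tuning settings take the same <tt class="docutils literal"><span
class="pre">kafka.producer.</span></tt> prefix as the other producer properties.
An illustrative sketch (values are examples, not defaults), assuming a Kafka
sink named sink1 on agent a1:</p>
+<div class="highlight-properties"><div class="highlight"><pre>a1.sinks.sink1.kafka.producer.ssl.enabled.protocols = TLSv1.2
+a1.sinks.sink1.kafka.producer.ssl.truststore.type = JKS
+</pre></div>
+</div>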
+<p>Example configuration with server side authentication and data
encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.sink1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
+<span class="na">a1.sinks.sink1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sinks.sink1.kafka.topic</span> <span
class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sinks.sink1.kafka.producer.security.protocol</span>
<span class="o">=</span> <span class="s">SSL</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.ssl.truststore.location</span>
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.ssl.truststore.password</span>
<span class="o">=</span> <span class="s"><password to access the
truststore></span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span
class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm</span>
<span class="o">=</span> <span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server’s fully qualified domain
name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external"
href="https://tools.ietf.org/html/rfc6125#section-2.3">https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external"
href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client side authentication is also required, then the following should
additionally be added to the Flume agent configuration.
+Each Flume agent must have its own client certificate, which has to be trusted
by the Kafka brokers either
+individually or by its signature chain. A common example is to sign each
client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.sink1.kafka.producer.ssl.keystore.location</span>
<span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.ssl.keystore.password</span>
<span class="o">=</span> <span class="s"><password to access the
keystore></span>
+</pre></div>
+</div>
+<p>If keystore and key use different password protection then <tt
class="docutils literal"><span class="pre">ssl.key.password</span></tt>
property will
+provide the required additional secret for the producer keystore:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.kafka.producer.ssl.key.password</span> <span
class="o">=</span> <span class="s"><password to access the key></span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Sink:</strong></p>
+<p>To use Kafka sink with a Kafka cluster secured with Kerberos, set the <tt
class="docutils literal"><span
class="pre">producer.security.protocol</span></tt> property noted above for
the producer.
+The Kerberos keytab and principal to be used with Kafka brokers are specified
in a JAAS file’s “KafkaClient” section. The “Client”
section describes the Zookeeper connection, if needed.
+See <a class="reference external"
href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig">Kafka
doc</a>
+for information on the JAAS file contents. The location of this JAAS file and
optionally the system-wide Kerberos configuration can be specified via
JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">JAVA_OPTS</span><span class="o">=</span><span
class="s">"$JAVA_OPTS
-Djava.security.krb5.conf=/path/to/krb5.conf"</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span
class="s">"$JAVA_OPTS
-Djava.security.auth.login.config=/path/to/flume_jaas.conf"</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.sink1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
+<span class="na">a1.sinks.sink1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sinks.sink1.kafka.topic</span> <span
class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sinks.sink1.kafka.producer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.sinks.sink1.kafka.producer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.sinks.sink1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
+<span class="na">a1.sinks.sink1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sinks.sink1.kafka.topic</span> <span
class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sinks.sink1.kafka.producer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.sinks.sink1.kafka.producer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.ssl.truststore.location</span>
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.sinks.sink1.kafka.producer.ssl.truststore.password</span>
<span class="o">=</span> <span class="s"><password to access the
truststore></span>
+</pre></div>
+</div>
+<p>Sample JAAS file. For details on its contents, please see the client config sections for the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of <a class="reference external" href="http://kafka.apache.org/documentation#security_sasl_clientconfig">SASL configuration</a>.
+Unlike with the Kafka Source or Kafka Channel, a “Client” section is not required unless it is needed by other connecting components. Also please make sure
+that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.</p>
+<div class="highlight-javascript"><div class="highlight"><pre><span
class="nx">KafkaClient</span> <span class="p">{</span>
+ <span class="nx">com</span><span class="p">.</span><span
class="nb">sun</span><span class="p">.</span><span
class="nx">security</span><span class="p">.</span><span
class="nx">auth</span><span class="p">.</span><span
class="nx">module</span><span class="p">.</span><span
class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+ <span class="nx">useKeyTab</span><span class="o">=</span><span
class="kc">true</span>
+ <span class="nx">storeKey</span><span class="o">=</span><span
class="kc">true</span>
+ <span class="nx">keyTab</span><span class="o">=</span><span
class="s2">"/path/to/keytabs/flume.keytab"</span>
+ <span class="nx">principal</span><span class="o">=</span><span
class="s2">"flume/flumehost1.example.com@YOURKERBEROSREALM"</span><span
class="p">;</span>
+<span class="p">};</span>
</pre></div>
</div>
</div>
@@ -4101,16 +4629,29 @@ READ_COMMITTED, SERIALIZABLE, REPEATABLE
<h4>Kafka Channel<a class="headerlink" href="#kafka-channel" title="Permalink
to this headline">¶</a></h4>
<p>The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and
replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.</p>
-<p>The Kafka channel can be used for multiple scenarios:
-* With Flume source and sink - it provides a reliable and highly available
channel for events
-* With Flume source and interceptor but no sink - it allows writing Flume
events into a Kafka topic, for use by other apps
-* With Flume sink, but no source - it is a low-latency, fault tolerant way to
send events from Kafka to Flume sources such as HDFS, HBase or Solr</p>
+<p>The Kafka channel can be used for multiple scenarios:</p>
+<ol class="arabic simple">
+<li>With Flume source and sink - it provides a reliable and highly available
channel for events</li>
+<li>With Flume source and interceptor but no sink - it allows writing Flume
events into a Kafka topic, for use by other apps</li>
+<li>With Flume sink, but no source - it is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr</li>
+</ol>
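+<p>As a minimal sketch of scenario 2 above, a source writes into the channel&#8217;s Kafka topic with no Flume sink attached (component names and topic are illustrative; the source&#8217;s own type and settings are omitted):</p>
+<div class="highlight-properties"><div class="highlight"><pre>a1.sources = r1
+a1.channels = c1
+a1.sources.r1.channels = c1
+a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
+a1.channels.c1.kafka.topic = topic-for-other-apps
+a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092
+</pre></div>
+</div>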
+<p>This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.</p>
+<p>The configuration parameters are organized as follows:</p>
+<ol class="arabic simple">
+<li>Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type</li>
+<li>Configuration values related to Kafka or how the channel operates are prefixed with “kafka.” (these are analogous to the CommonClient Configs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is similar to how the HDFS sink operates</li>
+<li>Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer</li>
+<li>Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks</li>
+</ol>
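+<p>Putting the naming scheme together, a single channel definition mixes all three levels (a sketch; values are illustrative):</p>
+<div class="highlight-properties"><div class="highlight"><pre>a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
+a1.channels.k1.parseAsFlumeEvent = true
+a1.channels.k1.kafka.topic = channel1
+a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
+a1.channels.k1.kafka.consumer.group.id = flume
+a1.channels.k1.kafka.producer.acks = all
+</pre></div>
+</div>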
+<p>This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.</p>
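+<p>For instance, an older-style configuration such as the following still works, but uses properties listed as deprecated below (a sketch with illustrative values):</p>
+<div class="highlight-properties"><div class="highlight"><pre>a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
+a1.channels.c1.brokerList = kafka-1:9092
+a1.channels.c1.topic = channel1
+</pre></div>
+</div>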
<p>Required properties are in <strong>bold</strong>.</p>
<table border="1" class="docutils">
<colgroup>
-<col width="14%" />
-<col width="16%" />
-<col width="70%" />
+<col width="19%" />
+<col width="13%" />
+<col width="68%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Property Name</th>
@@ -4123,49 +4664,110 @@ replication, so in case an agent or a ka
<td>–</td>
<td>The component type name, needs to be <tt class="docutils literal"><span
class="pre">org.apache.flume.channel.kafka.KafkaChannel</span></tt></td>
</tr>
-<tr class="row-odd"><td><strong>brokerList</strong></td>
+<tr class="row-odd"><td><strong>kafka.bootstrap.servers</strong></td>
<td>–</td>
<td>List of brokers in the Kafka cluster used by the channel.
This can be a partial list of brokers, but we recommend at least two for HA.
The format is a comma-separated list of hostname:port</td>
</tr>
-<tr class="row-even"><td><strong>zookeeperConnect</strong></td>
-<td>–</td>
-<td>URI of ZooKeeper used by Kafka cluster
-The format is comma separated list of hostname:port. If chroot is used, it is
added once at the end.
-For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka</td>
-</tr>
-<tr class="row-odd"><td>topic</td>
+<tr class="row-even"><td>kafka.topic</td>
<td>flume-channel</td>
<td>Kafka topic which the channel will use</td>
</tr>
-<tr class="row-even"><td>groupId</td>
+<tr class="row-odd"><td>kafka.consumer.group.id</td>
<td>flume</td>
<td>Consumer group ID the channel uses to register with Kafka.
Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data.
Note that having non-channel consumers with the same ID can lead to data loss.</td>
</tr>
-<tr class="row-odd"><td>parseAsFlumeEvent</td>
+<tr class="row-even"><td>parseAsFlumeEvent</td>
<td>true</td>
<td>Expecting Avro datums with FlumeEvent schema in the channel.
-This should be true if Flume source is writing to the channel
-And false if other producers are writing into the topic that the channel is
using
-Flume source messages to Kafka can be parsed outside of Flume by using
+This should be true if a Flume source is writing to the channel and false if other producers are
+writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk
artifact</td>
</tr>
-<tr class="row-even"><td>readSmallestOffset</td>
+<tr class="row-odd"><td>migrateZookeeperOffsets</td>
+<td>true</td>
+<td>When no Kafka stored offset is found, look up the offsets in Zookeeper and
commit them to Kafka.
+This should be true to support seamless Kafka client migration from older
versions of Flume. Once migrated this can be set
+to false, though that should generally not be required. If no Zookeeper offset is found, the kafka.consumer.auto.offset.reset
+configuration defines how offsets are handled.</td>
+</tr>
+<tr class="row-even"><td>pollTimeout</td>
+<td>500</td>
+<td>The amount of time (in milliseconds) to wait in the “poll()” call of the consumer.
+<a class="reference external"
href="https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long">https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long</a>)</td>
+</tr>
+<tr class="row-odd"><td>defaultPartitionId</td>
+<td>–</td>
+<td>Specifies a Kafka partition ID (integer) for all events in this channel to
be sent to, unless
+overridden by <tt class="docutils literal"><span class="pre">partitionIdHeader</span></tt>. By default, if this property is not set, events will be
+distributed by the Kafka Producer’s partitioner - including by <tt
class="docutils literal"><span class="pre">key</span></tt> if specified (or by a
+partitioner specified by <tt class="docutils literal"><span
class="pre">kafka.partitioner.class</span></tt>).</td>
+</tr>
+<tr class="row-even"><td>partitionIdHeader</td>
+<td>–</td>
+<td>When set, the producer will take the value of the field named using the
value of this property
+from the event header and send the message to the specified partition of the
topic. If the
+value represents an invalid partition the event will not be accepted into the
channel. If the header value
+is present then this setting overrides <tt class="docutils literal"><span
class="pre">defaultPartitionId</span></tt>.</td>
+</tr>
+<tr class="row-odd"><td>kafka.consumer.auto.offset.reset</td>
+<td>latest</td>
+<td>What to do when there is no initial offset in Kafka or if the current
offset does not exist any more on the server
+(e.g. because that data has been deleted):
+earliest: automatically reset the offset to the earliest offset
+latest: automatically reset the offset to the latest offset
+none: throw exception to the consumer if no previous offset is found for the
consumer’s group
+anything else: throw exception to the consumer.</td>
+</tr>
+<tr class="row-even"><td>kafka.producer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some
level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td>kafka.consumer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Same as kafka.producer.security.protocol but for reading/consuming from
Kafka.</td>
+</tr>
+<tr class="row-even"><td><em>more producer/consumer security props</em></td>
+<td> </td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference
external" href="http://kafka.apache.org/documentation.html#security">Kafka
security</a> for additional
+properties that need to be set on producer/consumer.</td>
+</tr>
+</tbody>
+</table>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="19%" />
+<col width="15%" />
+<col width="66%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>brokerList</td>
+<td>–</td>
+<td>List of brokers in the Kafka cluster used by the channel.
+This can be a partial list of brokers, but we recommend at least two for HA.
+The format is a comma-separated list of hostname:port</td>
+</tr>
+<tr class="row-odd"><td>topic</td>
+<td>flume-channel</td>
+<td>Use kafka.topic</td>
+</tr>
+<tr class="row-even"><td>groupId</td>
+<td>flume</td>
+<td>Use kafka.consumer.group.id</td>
+</tr>
+<tr class="row-odd"><td>readSmallestOffset</td>
<td>false</td>
-<td>When set to true, the channel will read all data in the topic, starting
from the oldest event
-when false, it will read only events written after the channel started
-When “parseAsFlumeEvent” is true, this will be false. Flume source
will start prior to the sinks and this
-guarantees that events sent by source before sinks start will not be lost.</td>
-</tr>
-<tr class="row-odd"><td>Other Kafka Properties</td>
-<td>–</td>
-<td>These properties are used to configure the Kafka Producer and Consumer
used by the channel.
-Any property supported by Kafka can be used.
-The only requirement is to prepend the property name with the prefix <tt
class="docutils literal"><span class="pre">kafka.</span></tt>.
-For example: kafka.producer.type</td>
+<td>Use kafka.consumer.auto.offset.reset</td>
</tr>
</tbody>
</table>
@@ -4174,12 +4776,135 @@ For example: kafka.producer.type</td>
<p class="last">Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.</p>
</div>
<p>Example for agent named a1:</p>
-<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
-<span class="na">a1.channels.channel1.capacity</span> <span class="o">=</span>
<span class="s">10000</span>
-<span class="na">a1.channels.channel1.transactionCapacity</span> <span
class="o">=</span> <span class="s">1000</span>
-<span class="na">a1.channels.channel1.brokerList</span><span
class="o">=</span><span class="s">kafka-2:9092,kafka-3:9092</span>
-<span class="na">a1.channels.channel1.topic</span><span
class="o">=</span><span class="s">channel1</span>
-<span class="na">a1.channels.channel1.zookeeperConnect</span><span
class="o">=</span><span class="s">kafka-1:2181</span>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9092,kafka-2:9092,kafka-3:9092</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Channel:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span class="pre">kafka.producer|consumer.security.protocol</span></tt> to any of the following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication
with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data
encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional
authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external"
href="http://kafka.apache.org/documentation#security_overview">Kafka security
overview</a>
+and the jira for tracking this issue:
+<a class="reference external"
href="https://issues.apache.org/jira/browse/KAFKA-2561">KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Channel:</strong></p>
+<p>Please read the steps described in <a class="reference external" href="http://kafka.apache.org/documentation#security_configclients">Configuring Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.</p>
+<p>Example configuration with server-side authentication and data encryption:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span>
<span class="o">=</span> <span class="s">SSL</span>
+<span
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span>
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span>
<span class="o">=</span> <span class="s"><password to access the
truststore></span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span>
<span class="o">=</span> <span class="s">SSL</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span>
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.password</span>
<span class="o">=</span> <span class="s"><password to access the
truststore></span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm</span>
<span class="o">=</span> <span class="s">HTTPS</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm</span>
<span class="o">=</span> <span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server’s fully qualified domain
name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external"
href="https://tools.ietf.org/html/rfc6125#section-2.3">https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external"
href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client-side authentication is also required, then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either
+individually or by its signature chain. A common example is to sign each client certificate with a single Root CA,
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.kafka.producer.ssl.keystore.location</span>
<span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span
class="na">a1.channels.channel1.kafka.producer.ssl.keystore.password</span>
<span class="o">=</span> <span class="s"><password to access the
keystore></span>
+<span
class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.location</span>
<span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.password</span>
<span class="o">=</span> <span class="s"><password to access the
keystore></span>
+</pre></div>
+</div>
+<p>If the keystore and key use different password protection, then the <tt class="docutils literal"><span class="pre">ssl.key.password</span></tt> property will
+provide the required additional secret for both the consumer and producer keystores:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.kafka.producer.ssl.key.password</span> <span
class="o">=</span> <span class="s"><password to access the key></span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.key.password</span>
<span class="o">=</span> <span class="s"><password to access the
key></span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Channel:</strong></p>
+<p>To use the Kafka channel with a Kafka cluster secured with Kerberos, set the <tt class="docutils literal"><span class="pre">producer/consumer.security.protocol</span></tt> properties noted above for the producer and/or consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file’s “KafkaClient” section. The “Client” section describes the Zookeeper connection, if needed.
+See the <a class="reference external" href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig">Kafka doc</a>
+for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">JAVA_OPTS</span><span class="o">=</span><span
class="s">"$JAVA_OPTS
-Djava.security.krb5.conf=/path/to/krb5.conf"</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span
class="s">"$JAVA_OPTS
-Djava.security.auth.login.config=/path/to/flume_jaas.conf"</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+<span
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span>
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span>
<span class="o">=</span> <span class="s"><password to access the
truststore></span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span>
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span>
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span>
<span class="o">=</span> <span class="s">kafka</span>
+<span
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span>
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
[... 193 lines stripped ...]