Repository: kafka
Updated Branches:
  refs/heads/0.10.0 056a78dff -> c6e9717df


KAFKA-3421: Connect developer guide update and several fixes

This is a follow-up to KAFKA-3421 that updates the Connect developer guide to 
include configuration validation. It also includes a couple of minor fixes.

Author: Liquan Pei <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ewen Cheslack-Postava 
<[email protected]>

Closes #1366 from Ishiihara/connect-dev-doc

(cherry picked from commit 527b98d82f5142ab6a5efc26e84f6b0a21aec062)
Signed-off-by: Ewen Cheslack-Postava <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6e9717d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6e9717d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6e9717d

Branch: refs/heads/0.10.0
Commit: c6e9717dfd3b9a9c16c32cb7ac64abc7ce3ebe6c
Parents: 056a78d
Author: Liquan Pei <[email protected]>
Authored: Thu May 12 18:14:37 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Thu May 12 18:15:43 2016 -0700

----------------------------------------------------------------------
 config/connect-distributed.properties | 14 ++++++--
 docs/connect.html                     | 54 +++++++++++++++++++++---------
 2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e9717d/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties 
b/config/connect-distributed.properties
index b25339f..931b853 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -18,6 +18,7 @@
 # These are defaults. This file just demonstrates how to override some 
settings.
 bootstrap.servers=localhost:9092
 
+# unique name for the cluster, used in forming the Connect cluster group. Note 
that this must not conflict with consumer group IDs
 group.id=connect-cluster
 
 # The converters specify the format of data in Kafka and how to translate it 
into Connect data. Every Connect user will
@@ -36,8 +37,15 @@ 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
+# Topic to use for storing offsets. This topic should have many partitions and 
be replicated.
 offset.storage.topic=connect-offsets
-# Flush much faster than normal, which is useful for testing/debugging
-offset.flush.interval.ms=10000
+
+# Topic to use for storing connector and task configurations; note that this 
should be a single partition, highly replicated topic.
+# You may need to manually create the topic to ensure a single partition for the 
config topic, as auto-created topics may have multiple partitions.
 config.storage.topic=connect-configs
-status.storage.topic=connect-status
\ No newline at end of file
+
+# Topic to use for storing statuses. This topic can have multiple partitions 
and should be replicated.
+status.storage.topic=connect-status
+
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000
\ No newline at end of file
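
The comments added to this properties file name four settings that matter in distributed mode: group.id and the three storage topics. As a minimal sketch, the plain-Java check below loads such a file and reports keys that are missing or storage topics that share a name. The class WorkerConfigCheck and its check() method are hypothetical and not part of Kafka Connect, and treating a shared topic name as a problem is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical pre-flight check for a distributed worker properties file.
// The class and method names are illustrative and not part of Kafka Connect.
class WorkerConfigCheck {

    // The settings that the commented properties file describes.
    static final String[] REQUIRED = {
        "group.id", "config.storage.topic", "offset.storage.topic", "status.storage.topic"
    };

    // Returns a list of problems; an empty list means the basic checks passed.
    static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        Set<String> topics = new HashSet<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add(key + " is not set");
            } else if (key.endsWith(".topic") && !topics.add(value.trim())) {
                // Assumption of this sketch: each storage topic gets its own name.
                problems.add(key + " reuses topic name " + value.trim());
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Same keys and values as config/connect-distributed.properties in this commit.
        String text = String.join("\n",
            "group.id=connect-cluster",
            "offset.storage.topic=connect-offsets",
            "config.storage.topic=connect-configs",
            "status.storage.topic=connect-status");
        Properties props = new Properties();
        props.load(new StringReader(text));
        System.out.println("Problems found: " + check(props));
    }
}
```

The check only inspects the file. It cannot confirm partition counts or replication factors, which still have to be verified against the broker when the topics are created manually.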

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e9717d/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index a362dde..4ba406e 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -53,15 +53,17 @@ Distributed mode handles automatic balancing of work, 
allows you to scale up (or
 &gt; bin/connect-distributed.sh config/connect-distributed.properties
 </pre>
 
-The difference is in the class which is started and the configuration 
parameters which change how the Kafka Connect process decides where to store 
configurations, how to assign work, and where to store offsets. In particular, 
the following configuration parameters are critical to set before starting your 
cluster:
+The difference is in the class which is started and the configuration 
parameters which change how the Kafka Connect process decides where to store 
configurations, how to assign work, and where to store offsets and task 
statuses. In distributed mode, Kafka Connect stores the offsets, configs and 
task statuses in Kafka topics. It is recommended to manually create the topics 
for offsets, configs and statuses in order to achieve the desired number of 
partitions and replication factors. If the topics are not yet created when 
starting Kafka Connect, they will be auto-created with the default number of 
partitions and replication factor, which may not be best suited to their usage.
 
+In particular, the following configuration parameters are critical to set 
before starting your cluster:
 <ul>
     <li><code>group.id</code> (default <code>connect-cluster</code>) - unique 
name for the cluster, used in forming the Connect cluster group; note that this 
<b>must not conflict</b> with consumer group IDs</li>
-    <li><code>config.storage.topic</code> (default 
<code>connect-configs</code>) - topic to use for storing connector and task 
configurations; note that this should be a single partition, highly replicated 
topic</li>
-    <li><code>offset.storage.topic</code> (default 
<code>connect-offsets</code>) - topic to use for ; this topic should have many 
partitions and be replicated</li>
+    <li><code>config.storage.topic</code> (default 
<code>connect-configs</code>) - topic to use for storing connector and task 
configurations; note that this should be a single partition, highly replicated 
topic. You may need to manually create the topic to ensure a single partition for 
the config topic, as auto-created topics may have multiple partitions.</li>
+    <li><code>offset.storage.topic</code> (default 
<code>connect-offsets</code>) - topic to use for storing offsets; this topic 
should have many partitions and be replicated</li>
+    <li><code>status.storage.topic</code> (default 
<code>connect-status</code>) - topic to use for storing statuses; this topic 
can have multiple partitions and should be replicated</li>
 </ul>
 
-Note that in distributed mode the connector configurations are not passed on 
the command line. Instead, use the REST API described below to create, modify, 
and destroy connectors.
+Note that in distributed mode the connector configurations are not passed on 
the command line. Instead, use the REST API described below to create, modify, 
and destroy connectors. 
 
 
 <h4><a id="connect_configuring" href="#connect_configuring">Configuring 
Connectors</a></h4>
@@ -158,7 +160,7 @@ The easiest method to fill in is 
<code>getTaskClass()</code>, which defines the
 
 <pre>
 @Override
-public Class<? extends Task> getTaskClass() {
+public Class&lt;? extends Task&gt; getTaskClass() {
     return FileStreamSourceTask.class;
 }
 </pre>
@@ -179,7 +181,7 @@ public void stop() {
 }
 </pre>
 
-Finally, the real core of the implementation is in 
<code>getTaskConfigs()</code>. In this case we're only
+Finally, the real core of the implementation is in 
<code>getTaskConfigs()</code>. In this case we are only
 handling a single file, so even though we may be permitted to generate more 
tasks as per the
 <code>maxTasks</code> argument, we return a list with only one entry:
 
@@ -225,7 +227,7 @@ public class FileStreamSourceTask extends 
SourceTask&lt;Object, Object&gt; {
 
     @Override
     public synchronized void stop() {
-        stream.close()
+        stream.close();
     }
 </pre>
 
@@ -241,8 +243,8 @@ public List&lt;SourceRecord&gt; poll() throws 
InterruptedException {
         while (streamValid(stream) &amp;&amp; records.isEmpty()) {
             LineAndOffset line = readToNextLine(stream);
             if (line != null) {
-                Map<String, Object> sourcePartition = 
Collections.singletonMap("filename", filename);
-                Map<String, Object> sourceOffset = 
Collections.singletonMap("position", streamOffset);
+                Map&lt;String, Object&gt; sourcePartition = 
Collections.singletonMap("filename", filename);
+                Map&lt;String, Object&gt; sourceOffset = 
Collections.singletonMap("position", streamOffset);
                 records.add(new SourceRecord(sourcePartition, sourceOffset, 
topic, Schema.STRING_SCHEMA, line));
             } else {
                 Thread.sleep(1);
@@ -267,11 +269,13 @@ The previous section described how to implement a simple 
<code>SourceTask</code>
 
 <pre>
 public abstract class SinkTask implements Task {
-public void initialize(SinkTaskContext context) { ... }
-
-public abstract void put(Collection&lt;SinkRecord&gt; records);
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
 
-public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
+    public abstract void put(Collection&lt;SinkRecord&gt; records);
+     
+    public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
 </pre>
 
 The <code>SinkTask</code> documentation contains full details, but this 
interface is nearly as simple as the <code>SourceTask</code>. The 
<code>put()</code> method should contain most of the implementation, accepting 
sets of <code>SinkRecords</code>, performing any required translation, and 
storing them in the destination system. This method does not need to ensure the 
data has been fully written to the destination system before returning. In 
fact, in many cases internal buffering will be useful so an entire batch of 
records can be sent at once, reducing the overhead of inserting events into the 
downstream data store. The <code>SinkRecords</code> contain essentially the 
same information as <code>SourceRecords</code>: Kafka topic, partition, offset 
and the event key and value.
@@ -305,8 +309,8 @@ Kafka Connect is intended to define bulk data copying jobs, 
such as copying an e
 Source connectors need to monitor the source system for changes, e.g. table 
additions/deletions in a database. When they pick up changes, they should 
notify the framework via the <code>ConnectorContext</code> object that 
reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
 
 <pre>
-if (inputsChanged())
-    this.context.requestTaskReconfiguration();
+    if (inputsChanged())
+        this.context.requestTaskReconfiguration();
 </pre>
 
 The framework will promptly request new configuration information and update 
the tasks, allowing them to gracefully commit their progress before 
reconfiguring them. Note that in the <code>SourceConnector</code> this 
monitoring is currently left up to the connector implementation. If an extra 
thread is required to perform this monitoring, the connector must allocate it 
itself.
@@ -315,6 +319,26 @@ Ideally this code for monitoring changes would be isolated 
to the <code>Connecto
 
 <code>SinkConnectors</code> usually only have to handle the addition of 
streams, which may translate to new entries in their outputs (e.g., a new 
database table). The framework manages any changes to the Kafka input, such as 
when the set of input topics changes because of a regex subscription. 
<code>SinkTasks</code> should expect new input streams, which may require 
creating new resources in the downstream system, such as a new table in a 
database. The trickiest situation to handle in these cases may be conflicts 
between multiple <code>SinkTasks</code> seeing a new input stream for the first 
time and simultaneously trying to create the new resource. 
<code>SinkConnectors</code>, on the other hand, will generally require no 
special code for handling a dynamic set of streams.
 
+<h4><a id="connect_configs" href="#connect_configs">Connect Configuration 
Validation</a></h4>
+
+Kafka Connect allows you to validate connector configurations before 
submitting a connector to be executed and can provide feedback about errors and 
recommended values. To take advantage of this, connector developers need to 
provide an implementation of <code>config()</code> to expose the configuration 
definition to the framework.
+
+The following code in <code>FileStreamSourceConnector</code> defines the 
configuration and exposes it to the framework.
+
+<pre>
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to 
publish data to");
+
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+</pre>
+
+The <code>ConfigDef</code> class is used for specifying the set of expected 
configurations. For each configuration, you can specify the name, the type, the 
default value, the documentation, the group information, the order in the 
group, the width of the configuration value and the name suitable for display 
in the UI. In addition, you can provide special validation logic used for single 
configuration validation by implementing the <code>Validator</code> interface. 
Moreover, there may be dependencies between configurations: for example, the 
valid values and visibility of a configuration may change according to the 
values of other configurations. To handle this, <code>ConfigDef</code> allows 
you to specify the dependents of a configuration and to provide an 
implementation of <code>Recommender</code> to get valid values and set 
visibility of a configuration given the current configuration values.
+
+Also, the <code>validate()</code> method in <code>Connector</code> provides a 
default validation implementation which returns a list of allowed 
configurations together with configuration errors and recommended values for 
each configuration. However, it does not use the recommended values for 
configuration validation. You may provide an override of the default 
implementation for customized configuration validation, which may use the 
recommended values.
+
 <h4><a id="connect_schemas" href="#connect_schemas">Working with 
Schemas</a></h4>
 
 The FileStream connectors are good examples because they are simple, but they 
also have trivially structured data -- each line is just a string. Almost all 
practical connectors will need schemas with more complex data formats.
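
The configuration validation section added in this diff describes per-key results that carry errors and recommended values, where the recommendations may depend on the values of other configurations. The plain-Java model below is a rough sketch of that idea only. It does not use the Kafka ConfigDef, Validator or Recommender types, and every class and method name in it (ValidationSketch, Key, Result, validate) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of per-key validation. NOT the Kafka ConfigDef API:
// every class and method name here is hypothetical.
class ValidationSketch {

    // Outcome for one configuration key: its value, any errors, and recommended values.
    static final class Result {
        final String name;
        final String value;
        final List<String> errors = new ArrayList<>();
        final List<String> recommended = new ArrayList<>();

        Result(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    // One expected configuration: a name plus a recommender that may inspect other values.
    static final class Key {
        final String name;
        final Function<Map<String, String>, List<String>> recommender;

        Key(String name, Function<Map<String, String>, List<String>> recommender) {
            this.name = name;
            this.recommender = recommender;
        }
    }

    // Validates every expected key; a missing or blank value is reported as an error.
    static Map<String, Result> validate(List<Key> keys, Map<String, String> props) {
        Map<String, Result> results = new LinkedHashMap<>();
        for (Key key : keys) {
            String value = props.get(key.name);
            Result result = new Result(key.name, value);
            if (value == null || value.trim().isEmpty()) {
                result.errors.add("Missing required configuration \"" + key.name + "\"");
            }
            result.recommended.addAll(key.recommender.apply(props));
            results.put(key.name, result);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("file", current -> Collections.<String>emptyList()));
        // The recommended topic depends on the current value of "file".
        keys.add(new Key("topic", current -> {
            List<String> values = new ArrayList<>();
            if (current.get("file") != null) {
                values.add("file-" + current.get("file"));
            }
            return values;
        }));

        Map<String, String> props = new LinkedHashMap<>();
        props.put("file", "test.txt");
        for (Result r : validate(keys, props).values()) {
            System.out.println(r.name + " errors=" + r.errors + " recommended=" + r.recommended);
        }
    }
}
```

As in the default behavior the section describes, the recommended values here are reported alongside the errors but are not themselves used to reject a value.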
