NIFI-733: Make use of Client Name, Zookeeper Timeout, Kafka Timeout properties 
and add new Group ID property


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/786bc1d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/786bc1d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/786bc1d6

Branch: refs/heads/master
Commit: 786bc1d61260e2d8558747ca206d360aebdd1994
Parents: 063afe2
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jul 3 09:04:52 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Jul 3 09:04:52 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java  | 28 +++++++++++++++-----
 1 file changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/786bc1d6/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 1b63a46..26590df 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -129,6 +129,7 @@ public class GetKafka extends AbstractProcessor {
             .expressionLanguageSupported(false)
             .defaultValue("\\n")
             .build();
+
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
             .name("Client Name")
             .description("Client Name to use when communicating with Kafka")
@@ -136,6 +137,13 @@ public class GetKafka extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
+    public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
+        .name("Group ID")
+        .description("A Group ID is used to identify consumers that are within the same consumer group")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -152,9 +160,13 @@ public class GetKafka extends AbstractProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
-                .fromPropertyDescriptor(CLIENT_NAME)
-                .defaultValue("NiFi-" + getIdentifier())
-                .build();
+            .fromPropertyDescriptor(CLIENT_NAME)
+            .defaultValue("NiFi-" + getIdentifier())
+            .build();
+        final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(GROUP_ID)
+            .defaultValue(getIdentifier())
+            .build();
 
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(ZOOKEEPER_CONNECTION_STRING);
@@ -163,6 +175,7 @@ public class GetKafka extends AbstractProcessor {
         props.add(BATCH_SIZE);
         props.add(MESSAGE_DEMARCATOR);
         props.add(clientNameWithDefault);
+        props.add(groupIdWithDefault);
         props.add(KAFKA_TIMEOUT);
         props.add(ZOOKEEPER_TIMEOUT);
         return props;
@@ -184,10 +197,13 @@ public class GetKafka extends AbstractProcessor {
 
         final Properties props = new Properties();
         props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
-        props.setProperty("group.id", getIdentifier());
+        props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
+        props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
         props.setProperty("auto.commit.enable", "true"); // just be explicit
         props.setProperty("auto.offset.reset", "smallest");
+        props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+        props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
 
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
@@ -236,7 +252,7 @@ public class GetKafka extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
+        final ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
         if (iterator == null) {
             return;
         }
@@ -293,7 +309,7 @@ public class GetKafka extends AbstractProcessor {
                 }
 
                 // add the message to the FlowFile's contents
-                final boolean firstMessage = (msgCount == 0);
+                final boolean firstMessage = msgCount == 0;
                 flowFile = session.append(flowFile, new OutputStreamCallback() {
                     @Override
                     public void process(final OutputStream out) throws IOException {

Reply via email to