NIFI-2322, NIFI-2423, NIFI-2412 Kafka improvements
- Fixed KafkaConsumer's connection block when broker is not available
- Fixed Serializer/Deserializer configs in both Consume/Publish Kafka
- Added sensitive properties for SSL key/trust stores

NIFI-2322 fixed tests


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c39a127e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c39a127e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c39a127e

Branch: refs/heads/master
Commit: c39a127ec8e0d27efa6cedb2c5837310dccd3d6a
Parents: 8d380dc
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Fri Jul 29 09:06:47 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Fri Aug 5 14:14:38 2016 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/AbstractKafkaProcessor.java    | 29 +++++++++++
 .../processors/kafka/pubsub/ConsumeKafka.java   | 52 ++++++++++++++++++--
 .../processors/kafka/pubsub/PublishKafka.java   |  8 ++-
 .../kafka/pubsub/ConsumeKafkaTest.java          |  2 +
 4 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
index 8bae304..04f9365 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
@@ -115,6 +115,31 @@ abstract class AbstractKafkaProcessor<T extends Closeable> 
extends AbstractSessi
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor SSL_KEY_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("ssl.key.password")
+            .displayName("SSL Key Password")
+            .description("The password of the private key in the key store 
file. Corresponds to Kafka's 'ssl.key.password' property.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+    static final PropertyDescriptor SSL_KEYSTORE_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("ssl.keystore.password")
+            .displayName("SSK Keystore Password")
+            .description("The store password for the key store file. 
Corresponds to Kafka's 'ssl.keystore.password' property.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+    static final PropertyDescriptor SSL_TRUSTSTORE_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("ssl.truststore.password")
+            .displayName("SSL Truststore Password")
+            .description("The password for the trust store file. Corresponds 
to Kafka's 'ssl.truststore.password' property.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
     static final Builder MESSAGE_DEMARCATOR_BUILDER = new 
PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -141,6 +166,10 @@ abstract class AbstractKafkaProcessor<T extends Closeable> 
extends AbstractSessi
         SHARED_DESCRIPTORS.add(CLIENT_ID);
         SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
         SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
+        SHARED_DESCRIPTORS.add(SSL_KEY_PASSWORD);
+        SHARED_DESCRIPTORS.add(SSL_KEYSTORE_PASSWORD);
+        SHARED_DESCRIPTORS.add(SSL_TRUSTSTORE_PASSWORD);
+
         SHARED_RELATIONSHIPS.add(REL_SUCCESS);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 2ed2db9..2bc1cfb 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
@@ -228,10 +231,24 @@ public class ConsumeKafka extends 
AbstractKafkaProcessor<Consumer<byte[], byte[]
                 : null;
         this.topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
         this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-
         Properties kafkaProperties = this.buildKafkaProperties(context);
-        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+
+        /*
+         * Since we are using unconventional way to validate if connectivity to
+         * broker is possible we need a mechanism to be able to disable it.
+         * 'check.connection' property will serve as such mechanism
+         */
+        if (!kafkaProperties.getProperty("check.connection").equals("false")) {
+            this.checkIfInitialConnectionPossible();
+        }
+
+        System.out.println(kafkaProperties);
+        if 
(!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+            kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        }
+        if 
(!kafkaProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+            
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        }
 
         KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(kafkaProperties);
         consumer.subscribe(Collections.singletonList(this.topic));
@@ -239,6 +256,35 @@ public class ConsumeKafka extends 
AbstractKafkaProcessor<Consumer<byte[], byte[]
     }
 
     /**
+     * Checks via brute force if it is possible to establish connection to at
+     * least one broker. If not this method will throw {@link 
ProcessException}.
+     */
+    private void checkIfInitialConnectionPossible(){
+        String[] br = this.brokers.split(",");
+        boolean connectionPossible = false;
+        for (int i = 0; i < br.length && !connectionPossible; i++) {
+            String hostPortCombo = br[i];
+            String[] hostPort = hostPortCombo.split(":");
+            Socket client = null;
+            try {
+                client = new Socket();
+                client.connect(new InetSocketAddress(hostPort[0].trim(), 
Integer.parseInt(hostPort[1].trim())), 10000);
+                connectionPossible = true;
+            } catch (Exception e) {
+                this.logger.error("Connection to '" + hostPortCombo + "' is 
not possible", e);
+            } finally {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    // ignore
+                }
+            }
+        }
+        if (!connectionPossible){
+            throw new ProcessException("Connection to " + this.brokers + " is 
not possible. See logs for more details");
+        }
+    }
+    /**
      * Will release flow file. Releasing of the flow file in the context of 
this
      * operation implies the following:
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 6703c04..059c5f3 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -225,8 +225,12 @@ public class PublishKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session) {
         Properties kafkaProperties = this.buildKafkaProperties(context);
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        if 
(!kafkaProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+            kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        }
+        if 
(!kafkaProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+            kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        }
         this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
this.getLogger());
         return publisher;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 374a91b..7daac98 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -84,6 +84,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
         runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty("check.connection", "false");
 
         byte[][] values = new byte[][] { 
"Hello-1".getBytes(StandardCharsets.UTF_8),
                 "Hello-2".getBytes(StandardCharsets.UTF_8), 
"Hello-3".getBytes(StandardCharsets.UTF_8) };
@@ -130,6 +131,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
         runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
+        runner.setProperty("check.connection", "false");
 
         byte[][] values = new byte[][] { 
"Hello-1".getBytes(StandardCharsets.UTF_8),
                 "Hi-2".getBytes(StandardCharsets.UTF_8) };

Reply via email to