NIFI-2322, NIFI-2423, NIFI-2412 Kafka improvements - Fixed KafkaConsumer's connection block when broker is not available - Fixed Serializer/Deserializer configs in both Consume/Publish Kafka - Added sensitive properties for SSL key/trust stores
NIFI-2322 fixed tests Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c39a127e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c39a127e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c39a127e Branch: refs/heads/master Commit: c39a127ec8e0d27efa6cedb2c5837310dccd3d6a Parents: 8d380dc Author: Oleg Zhurakousky <o...@suitcase.io> Authored: Fri Jul 29 09:06:47 2016 -0400 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Fri Aug 5 14:14:38 2016 -0400 ---------------------------------------------------------------------- .../kafka/pubsub/AbstractKafkaProcessor.java | 29 +++++++++++ .../processors/kafka/pubsub/ConsumeKafka.java | 52 ++++++++++++++++++-- .../processors/kafka/pubsub/PublishKafka.java | 8 ++- .../kafka/pubsub/ConsumeKafkaTest.java | 2 + 4 files changed, 86 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java index 8bae304..04f9365 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java @@ -115,6 +115,31 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi 
.expressionLanguageSupported(true) .build(); + static final PropertyDescriptor SSL_KEY_PASSWORD = new PropertyDescriptor.Builder() + .name("ssl.key.password") + .displayName("SSL Key Password") + .description("The password of the private key in the key store file. Corresponds to Kafka's 'ssl.key.password' property.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .sensitive(true) + .build(); + static final PropertyDescriptor SSL_KEYSTORE_PASSWORD = new PropertyDescriptor.Builder() + .name("ssl.keystore.password") + .displayName("SSK Keystore Password") + .description("The store password for the key store file. Corresponds to Kafka's 'ssl.keystore.password' property.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .sensitive(true) + .build(); + static final PropertyDescriptor SSL_TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder() + .name("ssl.truststore.password") + .displayName("SSL Truststore Password") + .description("The password for the trust store file. 
Corresponds to Kafka's 'ssl.truststore.password' property.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .sensitive(true) + .build(); + static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder() .name("message-demarcator") .displayName("Message Demarcator") @@ -141,6 +166,10 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi SHARED_DESCRIPTORS.add(CLIENT_ID); SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL); SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE); + SHARED_DESCRIPTORS.add(SSL_KEY_PASSWORD); + SHARED_DESCRIPTORS.add(SSL_KEYSTORE_PASSWORD); + SHARED_DESCRIPTORS.add(SSL_TRUSTSTORE_PASSWORD); + SHARED_RELATIONSHIPS.add(REL_SUCCESS); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index 2ed2db9..2bc1cfb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -18,6 +18,8 @@ package org.apache.nifi.processors.kafka.pubsub; import java.io.IOException; import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -48,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import 
org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; @@ -228,10 +231,24 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[] : null; this.topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue(); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - Properties kafkaProperties = this.buildKafkaProperties(context); - kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + + /* + * Since we are using unconventional way to validate if connectivity to + * broker is possible we need a mechanism to be able to disable it. + * 'check.connection' property will serve as such mechanism + */ + if (!kafkaProperties.getProperty("check.connection").equals("false")) { + this.checkIfInitialConnectionPossible(); + } + + System.out.println(kafkaProperties); + if (!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + if (!kafkaProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties); consumer.subscribe(Collections.singletonList(this.topic)); @@ -239,6 +256,35 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[] } /** + * Checks via brute force if it is possible to establish connection to at + * least one broker. 
If not this method will throw {@link ProcessException}. + */ + private void checkIfInitialConnectionPossible(){ + String[] br = this.brokers.split(","); + boolean connectionPossible = false; + for (int i = 0; i < br.length && !connectionPossible; i++) { + String hostPortCombo = br[i]; + String[] hostPort = hostPortCombo.split(":"); + Socket client = null; + try { + client = new Socket(); + client.connect(new InetSocketAddress(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim())), 10000); + connectionPossible = true; + } catch (Exception e) { + this.logger.error("Connection to '" + hostPortCombo + "' is not possible", e); + } finally { + try { + client.close(); + } catch (IOException e) { + // ignore + } + } + } + if (!connectionPossible){ + throw new ProcessException("Connection to " + this.brokers + " is not possible. See logs for more details"); + } + } + /** * Will release flow file. Releasing of the flow file in the context of this * operation implies the following: * http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 6703c04..059c5f3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -225,8 +225,12 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> { @Override protected KafkaPublisher 
buildKafkaResource(ProcessContext context, ProcessSession session) { Properties kafkaProperties = this.buildKafkaProperties(context); - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + if (!kafkaProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + } + if (!kafkaProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + } this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger()); return publisher; http://git-wip-us.apache.org/repos/asf/nifi/blob/c39a127e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 374a91b..7daac98 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -84,6 +84,7 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); 
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); + runner.setProperty("check.connection", "false"); byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8), "Hello-2".getBytes(StandardCharsets.UTF_8), "Hello-3".getBytes(StandardCharsets.UTF_8) }; @@ -130,6 +131,7 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah"); + runner.setProperty("check.connection", "false"); byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8), "Hi-2".getBytes(StandardCharsets.UTF_8) };