NIFI-413: Formatted code to fix checkstyle failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f2f90560 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f2f90560 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f2f90560 Branch: refs/heads/NIFI-680 Commit: f2f90560557de70ef4404672ce53c4593995a5f1 Parents: f5226ad Author: Mark Payne <marka...@hotmail.com> Authored: Mon Jun 22 12:05:02 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon Jun 22 12:05:02 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/processors/kafka/PutKafka.java | 319 ++++++++++--------- .../nifi/processors/kafka/TestPutKafka.java | 79 ++--- 2 files changed, 203 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f2f90560/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index e572622..d83c7bf 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -61,7 +61,7 @@ import org.apache.nifi.util.LongHolder; import scala.actors.threadpool.Arrays; @SupportsBatching -@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") public class PutKafka extends AbstractProcessor { @@ -69,13 +69,13 @@ public class PutKafka extends AbstractProcessor { private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to" - + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); + + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" - + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" - + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes"); + + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" + + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes"); public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after" - + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" - + " in data loss."); + + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + + " in data loss."); /** * AllowableValue for a Producer Type that synchronously sends messages to Kafka @@ -86,7 +86,7 @@ public class PutKafka extends AbstractProcessor { * AllowableValue for a Producer Type that asynchronously sends messages to Kafka */ public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." - + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); + + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); /** * AllowableValue for sending messages to Kafka without compression @@ -103,150 +103,156 @@ public class PutKafka extends AbstractProcessor { */ public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() - .name("Known Brokers") - .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") - .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) - .expressionLanguageSupported(false) - .build(); + .name("Known Brokers") + .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic of interest") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Topic Name") + .description("The Kafka Topic of interest") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("Kafka Key") - .description("The Key to use for the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Kafka Key") + .description("The Key to use for the Message") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") - .required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); + .name("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. " - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. " + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate Kafka message.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data to buffer in memory before sending to Kafka") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1 MB") - .build(); + .name("Max Buffer Size") + .description("The maximum amount of data to buffer in memory before sending to Kafka") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1 MB") + .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); + .name("Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() - .name("Producer Type") - .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") - .required(true) - .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) - .build(); - public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Batch Size") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." - + " The producer will wait until either this number of messages are ready" - + " to send or \"Queue Buffering Max Time\" is reached.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("200").build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() - .name("Queue Buffering Max Time") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" - + " will try to batch together 100ms of messages to send at once. This will improve" - + " throughput but adds message delivery latency due to the buffering.") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("5 secs").build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() - .name("Queue Buffer Max Count") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The maximum number of unsent messages that can be queued up in the producer when" - + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000").build(); - public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() - .name("Queue Enqueue Timeout") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The amount of time to block before dropping messages when running in " - + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" - + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" - + " be enqueued immediately or dropped if the queue is full (the producer send call will" - + " never block). If not set, the producer will block indefinitely and never willingly" - + " drop a send.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression Codec") - .description("This parameter allows you to specify the compression codec for all" - + " data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) - .defaultValue(COMPRESSION_CODEC_NONE.getValue()).build(); - public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() - .name("Compressed Topics") - .description("This parameter allows you to set whether compression should be turned on" - + " for particular topics. If the compression codec is anything other than" - + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." - + " If the list of compressed topics is empty, then enable the specified" - + " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," - + " compression is disabled for all topics") - .required(false).build(); + .name("Producer Type") + .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") + .required(true) + .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) + .build(); + public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() + .name("Async Batch Size") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." + + " The producer will wait until either this number of messages are ready" + + " to send or \"Queue Buffering Max Time\" is reached.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("200") + .build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() + .name("Queue Buffering Max Time") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" + + " will try to batch together 100ms of messages to send at once. This will improve" + + " throughput but adds message delivery latency due to the buffering.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs") + .build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() + .name("Queue Buffer Max Count") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The maximum number of unsent messages that can be queued up in the producer when" + + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); + public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() + .name("Queue Enqueue Timeout") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The amount of time to block before dropping messages when running in " + + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" + + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" + + " be enqueued immediately or dropped if the queue is full (the producer send call will" + + " never block). If not set, the producer will block indefinitely and never willingly" + + " drop a send.") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("Compression Codec") + .description("This parameter allows you to specify the compression codec for all" + + " data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + .defaultValue(COMPRESSION_CODEC_NONE.getValue()) + .build(); + public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() + .name("Compressed Topics") + .description("This parameter allows you to set whether compression should be turned on" + + " for particular topics. If the compression codec is anything other than" + + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." + + " If the list of compressed topics is empty, then enable the specified" + + " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," + + " compression is disabled for all topics") + .required(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") - .build(); + .name("success") + .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>(); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final PropertyDescriptor clientName = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) - .build(); + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); final List<PropertyDescriptor> props = new ArrayList<>(); props.add(SEED_BROKERS); @@ -269,13 +275,14 @@ public class PutKafka extends AbstractProcessor { @Override public Collection<ValidationResult> customValidate(final ValidationContext context) { - final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); + final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); if (batchMessages > bufferMaxMessages) { - errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); + errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false) + .explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); } return errors; @@ -311,23 +318,23 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); - if(queueBufferingMillis != null) { - properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); + final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); + if (queueBufferingMillis != null) { + properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); } properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); - Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - if(queueEnqueueTimeoutMillis != null) { - properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); + final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + if (queueEnqueueTimeoutMillis != null) { + properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); } - String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); properties.setProperty("compression.codec", compressionCodec); - - String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); - if(compressedTopics != null) { - properties.setProperty("compressed.topics", compressedTopics); + + final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); + if (compressedTopics != null) { + properties.setProperty("compressed.topics", compressedTopics); } return new ProducerConfig(properties); @@ -338,7 +345,7 @@ public class PutKafka extends AbstractProcessor { } private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) { - Producer<byte[], byte[]> producer = producers.poll(); + final Producer<byte[], byte[]> producer = producers.poll(); return producer == null ? createProducer(context) : producer; } @@ -348,7 +355,7 @@ public class PutKafka extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); + final FlowFile flowFile = session.get(); if (flowFile == null) { return; } @@ -356,7 +363,7 @@ public class PutKafka extends AbstractProcessor { final long start = System.nanoTime(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); if (delimiter != null) { delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); @@ -389,9 +396,9 @@ public class PutKafka extends AbstractProcessor { session.getProvenanceReporter().send(flowFile, "kafka://" + topic); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); } catch (final Exception e) { - getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, e}); + getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); session.transfer(flowFile, REL_FAILURE); error = true; } finally { @@ -426,7 +433,7 @@ public class PutKafka extends AbstractProcessor { int nextByte; try (final InputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { // read until we're out of data. while (!streamFinished) { @@ -514,7 +521,7 @@ public class PutKafka extends AbstractProcessor { final long nanos = System.nanoTime() - start; session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[]{messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); } catch (final ProcessException pe) { error = true; @@ -524,7 +531,7 @@ public class PutKafka extends AbstractProcessor { final long offset = lastMessageOffset.get(); if (offset == 0L) { // all of the messages failed to send. Route FlowFile to failure - getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[]{flowFile, pe.getCause()}); + getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); session.transfer(flowFile, REL_FAILURE); } else { // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. @@ -532,8 +539,8 @@ public class PutKafka extends AbstractProcessor { final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into" - + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[]{ - messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause()}); + + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] { + messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); session.transfer(successfulMessages, REL_SUCCESS); session.transfer(failedMessages, REL_FAILURE); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f2f90560/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 5d1eacf..750d406 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -212,7 +212,7 @@ public class TestPutKafka { } private void keyValuePutExecute(final TestRunner runner) { - final Map<String, String> attributes = new HashMap<>(); + final Map<String, String> attributes = new HashMap<>(); attributes.put("kafka.topic", "test"); attributes.put("kafka.key", "key3"); @@ -229,32 +229,32 @@ public class TestPutKafka { final MockFlowFile mff = mffs.get(0); assertTrue(Arrays.equals(data, mff.toByteArray())); - } + } @Test public void testProducerConfigDefault() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check the codec - CompressionCodec codec = config.compressionCodec(); + final CompressionCodec codec = config.compressionCodec(); assertTrue(codec instanceof kafka.message.NoCompressionCodec$); // Check compressed topics - Seq<String> compressedTopics = config.compressedTopics(); + final Seq<String> compressedTopics = config.compressedTopics(); assertEquals(0, compressedTopics.size()); // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType); } @@ -262,10 +262,10 @@ public class TestPutKafka { @Test public void testProducerConfigAsyncWithCompression() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); @@ -273,33 +273,33 @@ public class TestPutKafka { runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check that the codec is snappy - CompressionCodec codec = config.compressionCodec(); + final CompressionCodec codec = config.compressionCodec(); assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$); // Check compressed topics - Seq<String> compressedTopics = config.compressedTopics(); + final Seq<String> compressedTopics = config.compressedTopics(); assertEquals(3, compressedTopics.size()); assertTrue(compressedTopics.contains("topic01")); assertTrue(compressedTopics.contains("topic02")); assertTrue(compressedTopics.contains("topic03")); // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals("async", actualProducerType); } - + @Test public void testProducerConfigAsyncQueueThresholds() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); @@ -308,27 +308,27 @@ public class TestPutKafka { runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check that the queue thresholds were properly translated assertEquals(7000, config.queueBufferingMaxMs()); assertEquals(535, config.queueBufferingMaxMessages()); assertEquals(200, config.queueEnqueueTimeoutMs()); - + // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals("async", actualProducerType); } - + @Test public void testProducerConfigInvalidBatchSize() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); @@ -339,28 +339,28 @@ public class TestPutKafka { runner.assertNotValid(); } - + @Test public void testProducerConfigAsyncDefaultEnqueueTimeout() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); // Do not set QUEUE_ENQUEUE_TIMEOUT - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check that the enqueue timeout defaults to -1 assertEquals(-1, config.queueEnqueueTimeoutMs()); // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals("async", actualProducerType); } @@ -391,12 +391,13 @@ public class TestPutKafka { public MockProducer getProducer() { return producer; } - + /** * Exposed for test verification */ + @Override public ProducerConfig createConfig(final ProcessContext context) { - return super.createConfig(context); + return super.createConfig(context); } }