Repository: nifi
Updated Branches:
  refs/heads/master 36b16c9cd -> 58e4fb576


NIFI-4008: Allow 0 or more records within a message. This closes #1891.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58e4fb57
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58e4fb57
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58e4fb57

Branch: refs/heads/master
Commit: 58e4fb576e457b3ce515a1b22ec383b606efd297
Parents: 36b16c9
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Mon Jun 5 21:40:53 2017 +0900
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Oct 2 15:40:29 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafkaRecord_0_10.java   |   4 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 151 +++++++++----------
 2 files changed, 75 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
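
In outline, the change drops the assumption that each Kafka message holds exactly one record: the reader is now drained until nextRecord() returns null, so a single message may contribute zero, one, or many records, and an empty message simply produces no output. The stand-alone sketch below illustrates only that drain-until-null control flow; the RecordSource interface and drain() helper are invented for the illustration and are not part of NiFi (the real code uses RecordReader, RecordSetWriter, and BundleTracker, as the diff below shows).

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class DrainSketch {

        // Hypothetical stand-in for NiFi's RecordReader: returns null once the message is exhausted.
        interface RecordSource<T> {
            T nextRecord() throws Exception;
        }

        // Drain zero or more records from a single message; an empty message yields an empty list.
        static <T> List<T> drain(final RecordSource<T> reader) throws Exception {
            final List<T> records = new ArrayList<>();
            for (T record = reader.nextRecord(); record != null; record = reader.nextRecord()) {
                records.add(record);
            }
            return records;
        }

        public static void main(final String[] args) throws Exception {
            // Simulate one message whose payload parses into three records, then end-of-message (null).
            final Iterator<String> payload = List.of("r1", "r2", "r3").iterator();
            final RecordSource<String> reader = () -> payload.hasNext() ? payload.next() : null;
            System.out.println(drain(reader)); // prints [r1, r2, r3]; an empty payload would print []
        }
    }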


http://git-wip-us.apache.org/repos/asf/nifi/blob/58e4fb57/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
index c44e25e..adb7a6f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
@@ -55,11 +55,11 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built 
against the Kafka 0.10.x Consumer API. "
-    + "The complementary NiFi processor for sending messages is 
PublishKafka_0_10. Please note that, at this time, the Processor assumes that "
+    + "The complementary NiFi processor for sending messages is 
PublishKafkaRecord_0_10. Please note that, at this time, the Processor assumes 
that "
     + "all records that are retrieved from a given partition have the same 
schema. If any of the Kafka messages are pulled but cannot be parsed or written 
with the "
     + "configured Record Reader or Record Writer, the contents of the message 
will be written to a separate FlowFile, and that FlowFile will be transferred 
to the "
     + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 
'success' relationship and may contain many individual messages within the 
single FlowFile. "
-    + "A 'record.count' attribute is added to indicate how many messages are 
contained in the FlowFile.")
+    + "A 'record.count' attribute is added to indicate how many records are 
contained in the FlowFile.")
 @Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", 
"Topic", "PubSub", "Consume", "0.10.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = "record.count", description = "The number of 
records received"),

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e4fb57/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 64af412..8dc13f4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -432,105 +433,99 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         bundleMap.put(bundleInfo, tracker);
     }
 
-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
-        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
-            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-    }
 
-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
-        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> messages, final TopicPartition topicPartition) {
+        RecordSetWriter writer = null;
 
-        FlowFile failureFlowFile = session.create();
-        final byte[] value = consumerRecord.value();
-        if (value != null) {
-            failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
-        }
-        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+        final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> handleParseFailure = (consumerRecord, e) -> {
+            // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+            // And continue to the next message.
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+            attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+            attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
-        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+            FlowFile failureFlowFile = session.create();
+            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
-        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+            final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+            session.getProvenanceReporter().receive(failureFlowFile, transitUri);
 
-        if (cause == null) {
-            logger.error(message);
-        } else {
-            logger.error(message, cause);
-        }
+            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+            logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+                    + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
 
-        session.adjustCounter("Parse Failures", 1, false);
-    }
-
-    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
-        RecordSetWriter writer = null;
+            session.adjustCounter("Parse Failures", 1, false);
+        };
 
         try {
-            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
-                final Record record;
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
                 try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                    final RecordReader reader = readerFactory.createRecordReader(Collections.EMPTY_MAP, in, logger);
-                    record = reader.nextRecord();
-                } catch (final Exception e) {
-                    handleParseFailure(consumerRecord, session, e);
-                    continue;
-                }
 
-                if (record == null) {
-                    handleParseFailure(consumerRecord, session, null);
-                    continue;
-                }
+                    final RecordReader reader;
+                    final Record firstRecord;
 
-                // Determine the bundle for this record.
-                final RecordSchema recordSchema = record.getSchema();
-                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
-
-                BundleTracker tracker = bundleMap.get(bundleInfo);
-                if (tracker == null) {
-                    FlowFile flowFile = session.create();
-                    final OutputStream rawOut = session.write(flowFile);
-
-                    final RecordSchema writeSchema;
                     try {
-                        writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+                        reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
+                        firstRecord = reader.nextRecord();
                     } catch (final Exception e) {
-                        logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+                        handleParseFailure.accept(consumerRecord, e);
+                        continue;
+                    }
 
+                    if (firstRecord == null) {
+                        // If the message doesn't contain any record, do nothing.
+                        continue;
+                    }
+
+                    // Determine the bundle for this record.
+                    final RecordSchema recordSchema = firstRecord.getSchema();
+                    final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
+
+                    BundleTracker tracker = bundleMap.get(bundleInfo);
+                    if (tracker == null) {
+                        FlowFile flowFile = session.create();
+                        final OutputStream rawOut = session.write(flowFile);
+
+                        final RecordSchema writeSchema;
                         try {
-                            rollback(topicPartition);
-                        } catch (final Exception rollbackException) {
-                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+                        } catch (final Exception e) {
+                            logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                            try {
+                                rollback(topicPartition);
+                            } catch (final Exception rollbackException) {
+                                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            }
+
+                            yield();
+                            throw new ProcessException(e);
                         }
 
-                        yield();
-                        throw new ProcessException(e);
-                    }
-
-                    writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                    writer.beginRecordSet();
+                        writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                        writer.beginRecordSet();
 
-                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                    tracker.updateFlowFile(flowFile);
-                    bundleMap.put(bundleInfo, tracker);
-                } else {
-                    writer = tracker.recordWriter;
-                }
+                        tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                        tracker.updateFlowFile(flowFile);
+                        bundleMap.put(bundleInfo, tracker);
+                    } else {
+                        writer = tracker.recordWriter;
+                    }
 
-                try {
-                    writer.write(record);
-                } catch (final RuntimeException re) {
-                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                    continue;
+                    try {
+                        for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
+                            writer.write(record);
+                            tracker.incrementRecordCount(1L);
+                            session.adjustCounter("Records Received", 1, false);
+                        }
+                    } catch (Exception e) {
+                        // Transfer it to 'parse failure' and continue to the next message.
+                        handleParseFailure.accept(consumerRecord, e);
+                    }
                 }
-
-                tracker.incrementRecordCount(1L);
             }
-
-            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
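
One refactoring detail worth calling out: the two private handleParseFailure overloads are folded into a java.util.function.BiConsumer declared inside writeRecordData, so the failure handler captures session and topicPartition from the enclosing scope instead of taking them as parameters. The stand-alone sketch below shows only that capture pattern; the topic value and the routeToFailure name are invented for the illustration and are not NiFi APIs.

    import java.util.List;
    import java.util.function.BiConsumer;

    public class CaptureSketch {

        public static void main(final String[] args) {
            final String topic = "events"; // stands in for the captured topicPartition
            final List<String> messages = List.of("ok-1", "bad", "ok-2");

            // The lambda captures 'topic' from the enclosing scope, so callers only pass the
            // failing message and the exception, mirroring handleParseFailure.accept(record, e).
            final BiConsumer<String, Exception> routeToFailure = (message, e) ->
                    System.err.println("Routing '" + message + "' from " + topic + " to failure: " + e.getMessage());

            for (final String message : messages) {
                try {
                    if (message.startsWith("bad")) {
                        throw new IllegalArgumentException("unparseable payload");
                    }
                    System.out.println("parsed " + message);
                } catch (final Exception e) {
                    routeToFailure.accept(message, e); // route this one to failure and continue with the next
                }
            }
        }
    }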
 
