Repository: nifi
Updated Branches:
  refs/heads/master 6d16fdf17 -> 6937a6cf6
NIFI-3953: This closes #1837. Allow multiple schemas on same kafka topic/partition for ConsumeKafkaRecord_0_10

Also, updated record writers to ensure that they write the schema as appropriate if not using a RecordSet.

Updated ConsumeKafkaRecord to allow for multiple schemas to be on same topic and partition

Signed-off-by: joewitt <joew...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6937a6cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6937a6cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6937a6cf

Branch: refs/heads/master
Commit: 6937a6cf64c2f9e437b96550259575488eb284ec
Parents: 6d16fdf
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon May 22 12:02:18 2017 -0400
Committer: joewitt <joew...@apache.org>
Committed: Mon May 22 14:37:53 2017 -0400

----------------------------------------------------------------------
 .../serialization/AbstractRecordSetWriter.java  |   4 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 179 ++++++++++---------
 .../processors/kafka/pubsub/PublisherLease.java |   1 +
 .../avro/WriteAvroResultWithExternalSchema.java |   8 +-
 .../org/apache/nifi/csv/WriteCSVResult.java     |   6 +
 .../org/apache/nifi/json/WriteJsonResult.java   |   7 +
 6 files changed, 120 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
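The heart of this change is ConsumerLease.writeRecordData() in the 0.10 Kafka bundle: rather than deriving a single schema from the first message of each poll, the lease now keeps one FlowFile and one RecordSetWriter per RecordSchema it encounters, so a topic/partition may freely mix schemas. Below is a condensed, illustrative sketch of that lookup-or-create pattern; the class and method are hypothetical, only calls that appear in the diff further down are relied on, and for brevity the record's own schema is handed to createWriter where the real code first asks the writer factory for the write schema.

    import java.io.OutputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriterFactory;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.util.Tuple;

    class SchemaDemuxSketch {
        // One FlowFile + RecordSetWriter per schema encountered while draining a poll.
        private final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();

        void writeRecord(final ProcessSession session, final RecordSetWriterFactory writerFactory,
                         final ComponentLog logger, final Record record) throws Exception {
            final RecordSchema schema = record.getSchema();

            Tuple<FlowFile, RecordSetWriter> tuple = writers.get(schema);
            if (tuple == null) {
                // First record with this schema: open a new FlowFile and begin a record set on it.
                final FlowFile flowFile = session.create();
                final OutputStream rawOut = session.write(flowFile);
                final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, rawOut);
                writer.beginRecordSet();
                tuple = new Tuple<>(flowFile, writer);
                writers.put(schema, tuple);
            }

            tuple.getValue().write(record);
        }
    }

Once the poll has been drained, each writer is finished and its FlowFile transferred (or removed if nothing was written for that schema), which is what the ConsumerLease hunk below does.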

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 6ce9138..4de5ce3 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -68,7 +68,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
         return recordCount;
     }

-    protected final boolean isRecordSetActive() {
+    protected final boolean isActiveRecordSet() {
         return activeRecordSet;
     }

@@ -84,7 +84,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {

     @Override
     public final WriteResult finishRecordSet() throws IOException {
-        if (!isRecordSetActive()) {
+        if (!isActiveRecordSet()) {
             throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
         }


http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 563ece6..effd2e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -21,22 +21,18 @@ import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.RE
 import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
 import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;

 import javax.xml.bind.DatatypeConverter;

@@ -57,11 +53,10 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.Tuple;

 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -411,94 +406,116 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
             return;
         }

-        FlowFile flowFile = session.create();
-        try {
-            final RecordSchema schema;
+        final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();

-            try {
-                schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
-            } catch (final Exception e) {
-                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+        // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
+        // We don't want to create a new FlowFile for each record that we receive, so we will just create
+        // a "temporary flowfile" that will be removed in the finally block below and use that to pass to
+        // the createRecordReader method.
+        final FlowFile tempFlowFile = session.create();
+        try {
+            final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
+            while (itr.hasNext()) {
+                final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
+                final InputStream in = new ByteArrayInputStream(consumerRecord.value());

+                final Record record;
                 try {
-                    rollback(topicPartition);
-                } catch (final Exception rollbackException) {
-                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                }
+                    final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
+                    record = reader.nextRecord();
+                } catch (final Exception e) {
+                    // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+                    final Map<String, String> attributes = new HashMap<>();
+                    attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+                    attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+                    attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());

-                yield();
-                throw new ProcessException(e);
-            }
+                    FlowFile failureFlowFile = session.create();
+                    failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+                    failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);

-            final FlowFile ff = flowFile;
-            final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
-            final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
+                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+                    session.getProvenanceReporter().receive(failureFlowFile, transitUri);

-            flowFile = session.write(flowFile, rawOut -> {
-                final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
+                    session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+                    logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+                        + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);

-                final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
-                final RecordSet recordSet = new RecordSet() {
-                    @Override
-                    public RecordSchema getSchema() throws IOException {
-                        return emptySchema;
-                    }
+                    session.adjustCounter("Parse Failures", 1, false);
+                    continue;
+                }
+
+                final RecordSchema recordSchema = record.getSchema();

-                    @Override
-                    public Record next() throws IOException {
-                        while (itr.hasNext()) {
-                            final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
-
-                            final InputStream in = new ByteArrayInputStream(consumerRecord.value());
-                            try {
-                                final RecordReader reader = readerFactory.createRecordReader(ff, in, logger);
-                                final Record record = reader.nextRecord();
-                                return record;
-                            } catch (final Exception e) {
-                                final Map<String, String> attributes = new HashMap<>();
-                                attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-                                attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
-                                attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
-                                FlowFile failureFlowFile = session.create();
-                                failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
-                                failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-
-                                session.transfer(failureFlowFile, REL_PARSE_FAILURE);
-                                logger.error("Failed to parse message from Kafka using the configured Record Reader. "
" - + "Will route message as its own FlowFile to the 'parse.failure' relationship", e); - - session.adjustCounter("Parse Failures", 1, false); - } + Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema); + if (tuple == null) { + FlowFile flowFile = session.create(); + final OutputStream rawOut = session.write(flowFile); + + final RecordSchema writeSchema; + try { + writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value())); + } catch (final Exception e) { + logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e); + + try { + rollback(topicPartition); + } catch (final Exception rollbackException) { + logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException); } - return null; + yield(); + throw new ProcessException(e); } - }; - try (final OutputStream out = new BufferedOutputStream(rawOut); - final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) { - writeResult.set(writer.write(recordSet)); - mimeTypeRef.set(writer.getMimeType()); - } catch (final Exception e) { - logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e); + final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut); + writer.beginRecordSet(); - try { - rollback(topicPartition); - } catch (final Exception rollbackException) { - logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException); - } + tuple = new Tuple<>(flowFile, writer); + writers.put(recordSchema, tuple); + } + + final RecordSetWriter writer = tuple.getValue(); + writer.write(record); + } + } catch (final Exception e) { + logger.error("Failed to properly receive messages from Kafka. 
+            logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+            try {
+                rollback(topicPartition);
+            } catch (final Exception rollbackException) {
+                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+            }
+
+            throw new ProcessException(e);
+        } finally {
+            session.remove(tempFlowFile);
+        }
+
+        for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+            FlowFile flowFile = tuple.getKey();
+            final RecordSetWriter writer = tuple.getValue();

-                    yield();
-                    throw new ProcessException(e);
+            final WriteResult writeResult;
+            try {
+                writeResult = writer.finishRecordSet();
+                writer.close();
+            } catch (final Exception e) {
+                logger.error("Failed to finish writing records to Content Repository", e);
+                try {
+                    rollback(topicPartition);
+                } catch (final Exception rollbackException) {
+                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
                 }
-            });
+                throw new ProcessException(e);
             }

-            final WriteResult result = writeResult.get();
-            if (result.getRecordCount() > 0) {
-                final Map<String, String> attributes = new HashMap<>(result.getAttributes());
-                attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
-                attributes.put("record.count", String.valueOf(result.getRecordCount()));
+            final int recordCount = writeResult.getRecordCount();
+            if (recordCount > 0) {
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.putAll(writeResult.getAttributes());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.put("record.count", String.valueOf(recordCount));
                 attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
                 attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
@@ -509,17 +526,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
                 final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
                 session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);

-                session.adjustCounter("Records Received", result.getRecordCount(), false);
+                session.adjustCounter("Records Received", recordCount, false);
                 session.transfer(flowFile, REL_SUCCESS);
             } else {
                 session.remove(flowFile);
             }
-        } catch (final Exception e) {
-            session.remove(flowFile);
-            throw e;
         }
     }

+
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
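One behavior worth calling out from the ConsumerLease change above: a message that cannot be parsed no longer fails the whole poll; it is copied into its own FlowFile, tagged with its Kafka coordinates, and routed to 'parse.failure' while the loop moves on to the next message. A condensed sketch of that routing step follows; the helper class and method are hypothetical, literal attribute keys stand in for the KafkaProcessorUtils constants, and the relationship is passed in rather than referenced statically.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;

    class ParseFailureRoutingSketch {
        // Copy one unparseable Kafka message into its own FlowFile, tag it with its
        // coordinates, and route it to the supplied 'parse.failure' relationship.
        void routeToParseFailure(final ProcessSession session, final ConsumerRecord<byte[], byte[]> consumerRecord,
                                 final TopicPartition topicPartition, final Relationship parseFailure) {
            final Map<String, String> attributes = new HashMap<>();
            // The real code uses the KafkaProcessorUtils constants for these keys.
            attributes.put("kafka.offset", String.valueOf(consumerRecord.offset()));
            attributes.put("kafka.partition", String.valueOf(topicPartition.partition()));
            attributes.put("kafka.topic", topicPartition.topic());

            FlowFile failureFlowFile = session.create();
            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
            failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);

            session.transfer(failureFlowFile, parseFailure);
            session.adjustCounter("Parse Failures", 1, false);
        }
    }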

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 66641df..4238956 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -112,6 +112,7 @@ public class PublisherLease implements Closeable {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
+                writer.write(record);
                 writer.flush();
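For context on the one-line PublisherLease fix above: the surrounding loop serializes each record into a per-message buffer, flushes the writer, and hands the bytes to the Kafka producer, so without the added write() call the flushed buffer would be empty. A simplified sketch of that loop follows; the class and method are hypothetical, the producer hand-off is omitted, and only the RecordSet/RecordSetWriter calls visible in the hunk above are relied on.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSet;

    class PublishLoopSketch {
        // Serialize each record individually so every Kafka message carries exactly one record.
        int publish(final RecordSet recordSet, final RecordSetWriter writer, final ByteArrayOutputStream baos) throws IOException {
            int recordCount = 0;
            Record record;
            while ((record = recordSet.next()) != null) {
                recordCount++;
                baos.reset();

                writer.write(record);   // the line added by this commit: without it the buffer stays empty
                writer.flush();

                final byte[] messageContent = baos.toByteArray();
                // ... hand messageContent to the KafkaProducer here (omitted) ...
            }
            return recordCount;
        }
    }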

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index 25d494e..8464e45 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -50,7 +50,6 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
         this.buffered = new BufferedOutputStream(out);

         datumWriter = new GenericDatumWriter<>(avroSchema);
-        schemaAccessWriter.writeHeader(recordSchema, buffered);
         encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
     }

@@ -67,6 +66,13 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {

     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            flush();
+            schemaAccessWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
         datumWriter.write(rec, encoder);
         return schemaAccessWriter.getAttributes(recordSchema);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 34a51ba..00270ed 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -95,6 +95,12 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter

     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            schemaWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {
             fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 8acaa04..cccac12 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -104,6 +104,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter

     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            generator.flush();
+            schemaAccess.writeHeader(recordSchema, getOutputStream());
+        }
+
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
         return schemaAccess.getAttributes(recordSchema);
     }
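The Avro, CSV and JSON writer changes above all apply the same pattern: writeRecord() may now be invoked without beginRecordSet() ever having been called (as the PublisherLease loop does when it writes one record per Kafka message), so the writer emits its schema header lazily before the first stand-alone record instead of unconditionally in the constructor. A minimal sketch of that pattern as an AbstractRecordSetWriter subclass follows; the single-OutputStream constructor, the SchemaAccessWriter collaborator, and the flush/close/getMimeType members are assumptions standing in for the concrete writers, and the actual record serialization is omitted.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Map;

    import org.apache.nifi.schema.access.SchemaAccessWriter;
    import org.apache.nifi.serialization.AbstractRecordSetWriter;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSchema;

    // Illustrative only: shows where the lazy header write sits, not a real NiFi writer.
    class LazyHeaderRecordSetWriter extends AbstractRecordSetWriter {
        private final SchemaAccessWriter schemaAccessWriter;
        private final RecordSchema recordSchema;

        LazyHeaderRecordSetWriter(final OutputStream out, final SchemaAccessWriter schemaAccessWriter, final RecordSchema recordSchema) {
            super(out);
            this.schemaAccessWriter = schemaAccessWriter;
            this.recordSchema = recordSchema;
        }

        @Override
        public Map<String, String> writeRecord(final Record record) throws IOException {
            // No record set has been begun, so this record is being written stand-alone:
            // emit the schema header here instead of relying on beginRecordSet().
            if (!isActiveRecordSet()) {
                schemaAccessWriter.writeHeader(recordSchema, getOutputStream());
            }

            // ... format-specific serialization of the record would go here ...
            return schemaAccessWriter.getAttributes(recordSchema);
        }

        @Override
        public String getMimeType() {
            return "application/octet-stream"; // placeholder; real writers report their own type
        }

        public void flush() throws IOException {
            getOutputStream().flush();
        }

        public void close() throws IOException {
            getOutputStream().close();
        }
    }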