NIFI-3863: Initial implementation of Lookup Services. Implemented the LookupRecord processor. This required some refactoring of the RecordSetWriter interface, so that interface and all of its implementations and references were refactored accordingly
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9bd0246a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9bd0246a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9bd0246a Branch: refs/heads/master Commit: 9bd0246a96cfcda01b972834823250a9cddf77a8 Parents: 3b98abb Author: Mark Payne <marka...@hotmail.com> Authored: Wed May 10 13:09:45 2017 -0400 Committer: joewitt <joew...@apache.org> Committed: Fri May 19 01:02:41 2017 -0400 ---------------------------------------------------------------------- .../controller/AbstractControllerService.java | 28 ++ nifi-assembly/pom.xml | 5 + .../nifi/record/path/StandardFieldValue.java | 7 +- .../serialization/AbstractRecordSetWriter.java | 105 ++++++ .../nifi/serialization/RecordSetWriter.java | 21 +- .../apache/nifi/serialization/RecordWriter.java | 7 +- .../hadoop/AbstractFetchHDFSRecord.java | 11 +- .../serialization/record/MockRecordParser.java | 5 +- .../serialization/record/MockRecordWriter.java | 66 ++-- .../repository/StandardProcessSession.java | 12 +- .../processors/kafka/pubsub/ConsumerLease.java | 32 +- .../kafka/pubsub/PublishKafkaRecord_0_10.java | 14 +- .../processors/kafka/pubsub/PublisherLease.java | 14 +- .../pubsub/TestPublishKafkaRecord_0_10.java | 19 +- .../kafka/pubsub/util/MockRecordWriter.java | 19 +- .../processors/parquet/FetchParquetTest.java | 4 +- .../services/AvroSchemaValidator.java | 5 +- .../record/script/ScriptedRecordSetWriter.java | 9 +- .../script/ScriptedRecordSetWriterTest.groovy | 7 +- .../groovy/test_record_writer_inline.groovy | 35 +- .../nifi-standard-processors/pom.xml | 4 + .../standard/AbstractRecordProcessor.java | 33 +- .../standard/AbstractRouteRecord.java | 223 +++++++++++++ .../nifi/processors/standard/LookupRecord.java | 208 ++++++++++++ .../nifi/processors/standard/QueryRecord.java | 29 +- .../nifi/processors/standard/SplitRecord.java | 31 +- .../org.apache.nifi.processor.Processor | 1 + 
.../processors/standard/TestLookupRecord.java | 232 +++++++++++++ .../processors/standard/TestQueryRecord.java | 20 +- .../nifi-lookup-service-api/pom.xml | 36 ++ .../nifi/lookup/LookupFailureException.java | 37 +++ .../org/apache/nifi/lookup/LookupService.java | 41 +++ .../apache/nifi/lookup/RecordLookupService.java | 41 +++ .../apache/nifi/lookup/StringLookupService.java | 39 +++ .../nifi-lookup-services-nar/pom.xml | 35 ++ .../src/main/resources/META-INF/NOTICE | 73 ++++ .../nifi-lookup-services/pom.xml | 51 +++ .../lookup/SimpleKeyValueLookupService.java | 58 ++++ .../nifi/lookup/maxmind/AnonymousIpSchema.java | 35 ++ .../apache/nifi/lookup/maxmind/CitySchema.java | 55 +++ .../nifi/lookup/maxmind/ContainerSchema.java | 37 +++ .../nifi/lookup/maxmind/DatabaseReader.java | 252 ++++++++++++++ .../nifi/lookup/maxmind/IPLookupService.java | 332 +++++++++++++++++++ .../apache/nifi/lookup/maxmind/IspSchema.java | 34 ++ ...org.apache.nifi.controller.ControllerService | 17 + .../additionalDetails.html | 102 ++++++ .../nifi-lookup-services-bundle/pom.xml | 28 ++ .../serialization/RecordSetWriterFactory.java | 5 +- .../apache/nifi/avro/AvroRecordSetWriter.java | 8 +- .../org/apache/nifi/avro/WriteAvroResult.java | 14 +- .../avro/WriteAvroResultWithExternalSchema.java | 77 ++--- .../nifi/avro/WriteAvroResultWithSchema.java | 63 ++-- .../org/apache/nifi/csv/CSVRecordSetWriter.java | 6 +- .../org/apache/nifi/csv/WriteCSVResult.java | 89 ++--- .../apache/nifi/json/JsonRecordSetWriter.java | 6 +- .../org/apache/nifi/json/WriteJsonResult.java | 75 ++--- .../nifi/text/FreeFormTextRecordSetWriter.java | 5 +- .../apache/nifi/text/FreeFormTextWriter.java | 36 +- .../apache/nifi/avro/TestWriteAvroResult.java | 27 +- .../avro/TestWriteAvroResultWithSchema.java | 6 +- .../avro/TestWriteAvroResultWithoutSchema.java | 6 +- .../org/apache/nifi/csv/TestWriteCSVResult.java | 57 ++-- .../apache/nifi/json/TestWriteJsonResult.java | 37 +-- .../nifi-standard-services-api-nar/pom.xml | 5 + 
nifi-nar-bundles/nifi-standard-services/pom.xml | 2 + pom.xml | 16 + 66 files changed, 2619 insertions(+), 430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java index 6a27761..9762f3e 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java @@ -16,7 +16,11 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -28,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp private ControllerServiceLookup serviceLookup; private ComponentLog logger; private StateManager stateManager; + private volatile ConfigurationContext configurationContext; @Override public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException { @@ -75,4 +80,27 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp protected StateManager getStateManager() { return stateManager; } + + @OnEnabled + public final void abstractStoreConfigContext(final ConfigurationContext configContext) { + this.configurationContext = 
configContext; + } + + @OnDisabled + public final void abstractClearConfigContext() { + this.configurationContext = null; + } + + protected ConfigurationContext getConfigurationContext() { + final ConfigurationContext context = this.configurationContext; + if (context == null) { + throw new IllegalStateException("No Configuration Context exists"); + } + + return configurationContext; + } + + protected PropertyValue getProperty(final PropertyDescriptor descriptor) { + return getConfigurationContext().getProperty(descriptor); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 18cc689..5c7bff7 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -198,6 +198,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-lookup-services-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-poi-nar</artifactId> <type>nar</type> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java index 4447fed..b02deb4 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java @@ -103,7 +103,12 @@ public class StandardFieldValue implements FieldValue { public void updateValue(final Object newValue) { final Optional<Record> parentRecord = getParentRecord(); if 
(!parentRecord.isPresent()) { - throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record"); + if (value instanceof Record) { + ((Record) value).setValue(getField().getFieldName(), newValue); + return; + } else { + throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record"); + } } parentRecord.get().setValue(getField().getFieldName(), newValue); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java new file mode 100644 index 0000000..5feb264 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; + +public abstract class AbstractRecordSetWriter implements RecordSetWriter { + private final OutputStream out; + private int recordCount = 0; + private boolean activeRecordSet = false; + + public AbstractRecordSetWriter(final OutputStream out) { + this.out = out; + } + + @Override + public void close() throws IOException { + this.out.close(); + } + + @Override + public WriteResult write(final RecordSet recordSet) throws IOException { + beginRecordSet(); + Record record; + while ((record = recordSet.next()) != null) { + write(record); + recordCount++; + } + return finishRecordSet(); + } + + protected OutputStream getOutputStream() { + return out; + } + + protected final int getRecordCount() { + return recordCount; + } + + protected final boolean isRecordSetActive() { + return activeRecordSet; + } + + @Override + public final void beginRecordSet() throws IOException { + if (activeRecordSet) { + throw new IllegalStateException("Cannot begin a RecordSet because a RecordSet has already begun"); + } + + activeRecordSet = true; + onBeginRecordSet(); + } + + @Override + public final WriteResult finishRecordSet() throws IOException { + if (!isRecordSetActive()) { + throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun"); + } + + final Map<String, String> attributes = onFinishRecordSet(); + return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes); + } + + /** + * Method that is called as a result of {@link #beginRecordSet()} being called. This gives subclasses + * the chance to react to a new RecordSet beginning but prevents the subclass from changing how this + * implementation maintains its internal state. 
By default, this method does nothing. + * + * @throws IOException if unable to write the necessary data for a new RecordSet + */ + protected void onBeginRecordSet() throws IOException { + } + + /** + * Method that is called by {@link #finishRecordSet()} when a RecordSet is finished. This gives subclasses + * the chance to react to a RecordSet being completed but prevents the subclass from changing how this + * implementation maintains its internal state. + * + * @return a Map of key/value pairs that should be added to the FlowFile as attributes + */ + protected Map<String, String> onFinishRecordSet() throws IOException { + return Collections.emptyMap(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java index 7d6fa1c..7c29cfe 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java @@ -37,9 +37,26 @@ public interface RecordSetWriter extends RecordWriter { * Writes the given result set to the given output stream * * @param recordSet the record set to serialize - * @param out the OutputStream to write to + * + * @return the results of writing the data + * @throws IOException if unable to write to the given OutputStream + */ + WriteResult write(RecordSet recordSet) throws IOException; + + /** + * Begins a new RecordSet + * + * @throws IOException if unable to write to the underlying OutputStream + * @throws IllegalStateException if a RecordSet has already been started + */ + void beginRecordSet() throws IOException; + + /** + * Finishes the currently active 
RecordSet and returns a WriteResult that includes information about what was written + * * @return the results of writing the data * @throws IOException if unable to write to the given OutputStream + * @throws IllegalStateException if a RecordSet has not been started via {@link #beginRecordSet()} */ - WriteResult write(RecordSet recordSet, OutputStream out) throws IOException; + WriteResult finishRecordSet() throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java index aa298d9..720953c 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java @@ -17,21 +17,20 @@ package org.apache.nifi.serialization; +import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; import org.apache.nifi.serialization.record.Record; -public interface RecordWriter { +public interface RecordWriter extends Closeable { /** * Writes the given result set to the given output stream * * @param record the record set to serialize - * @param out the OutputStream to write to * @return the results of writing the data * @throws IOException if unable to write to the given OutputStream */ - WriteResult write(Record record, OutputStream out) throws IOException; + WriteResult write(Record record) throws IOException; /** * @return the MIME Type that the Result Set Writer produces. 
This will be added to FlowFiles using http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java index 8883965..7cc6bb5 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java @@ -188,12 +188,14 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0)); - final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema); final StopWatch stopWatch = new StopWatch(true); // use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure child = session.create(originalFlowFile); + + final FlowFile writableFlowFile = child; + final AtomicReference<String> mimeTypeRef = new AtomicReference<>(); child = session.write(child, (final OutputStream rawOut) -> { try (final BufferedOutputStream out = new BufferedOutputStream(rawOut); final HDFSRecordReader recordReader = createHDFSRecordReader(context, 
originalFlowFile, configuration, path)) { @@ -212,7 +214,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { } }; - writeResult.set(recordSetWriter.write(recordSet, out)); + try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) { + writeResult.set(recordSetWriter.write(recordSet)); + mimeTypeRef.set(recordSetWriter.getMimeType()); + } } catch (Exception e) { exceptionHolder.set(e); } @@ -230,7 +235,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes()); attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType()); + attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get()); successFlowFile = session.putAllAttributes(successFlowFile, attributes); final URI uri = path.toUri(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java index e3ed23e..251eb46 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java @@ -42,7 +42,7 @@ import 
org.apache.nifi.serialization.record.RecordSchema; public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory { private final List<Object[]> records = new ArrayList<>(); private final List<RecordField> fields = new ArrayList<>(); - private final int failAfterN; + private int failAfterN; public MockRecordParser() { this(-1); @@ -52,6 +52,9 @@ public class MockRecordParser extends AbstractControllerService implements Recor this.failAfterN = failAfterN; } + public void failAfter(final int failAfterN) { + this.failAfterN = failAfterN; + } public void addSchemaField(final String fieldName, final RecordFieldType type) { fields.add(new RecordField(fieldName, type.getDataType())); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index b4253ee..525a51f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -36,6 +36,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor private final int failAfterN; private final boolean quoteValues; + public MockRecordWriter() { + this(null); + } + public MockRecordWriter(final String header) { this(header, true, -1); } @@ -56,12 +60,16 @@ public class 
MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { + private int recordCount = 0; + @Override - public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { - out.write(header.getBytes()); - out.write("\n".getBytes()); + public WriteResult write(final RecordSet rs) throws IOException { + if (header != null) { + out.write(header.getBytes()); + out.write("\n".getBytes()); + } int recordCount = 0; Record record = null; @@ -75,14 +83,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor int i = 0; for (final String fieldName : record.getSchema().getFieldNames()) { final String val = record.getAsString(fieldName); - if (quoteValues) { - out.write("\"".getBytes()); - if (val != null) { + if (val != null) { + if (quoteValues) { + out.write("\"".getBytes()); + out.write(val.getBytes()); + out.write("\"".getBytes()); + } else { out.write(val.getBytes()); } - out.write("\"".getBytes()); - } else if (val != null) { - out.write(val.getBytes()); } if (i++ < numCols - 1) { @@ -101,20 +109,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public WriteResult write(Record record, OutputStream out) throws IOException { - out.write(header.getBytes()); - out.write("\n".getBytes()); + public WriteResult write(Record record) throws IOException { + if (header != null) { + out.write(header.getBytes()); + out.write("\n".getBytes()); + } final int numCols = record.getSchema().getFieldCount(); int i = 0; for (final String fieldName : record.getSchema().getFieldNames()) { final String val = record.getAsString(fieldName); - if (quoteValues) { - out.write("\"".getBytes()); - 
out.write(val.getBytes()); - out.write("\"".getBytes()); - } else { - out.write(val.getBytes()); + if (val != null) { + if (quoteValues) { + out.write("\"".getBytes()); + out.write(val.getBytes()); + out.write("\"".getBytes()); + } else { + out.write(val.getBytes()); + } } if (i++ < numCols - 1) { @@ -123,8 +135,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } out.write("\n".getBytes()); + recordCount++; + return WriteResult.of(1, Collections.emptyMap()); } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void beginRecordSet() throws IOException { + } + + @Override + public WriteResult finishRecordSet() throws IOException { + return WriteResult.of(recordCount, Collections.emptyMap()); + } }; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index b307930..62326a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2460,19 +2460,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void close() throws IOException { + if (closed) { + return; + } + + closed = true; 
writeRecursionSet.remove(sourceFlowFile); final long bytesWritten = countingOut.getBytesWritten(); - if (!closed) { - StandardProcessSession.this.bytesWritten += bytesWritten; - closed = true; - } + StandardProcessSession.this.bytesWritten += bytesWritten; openOutputStreams.remove(sourceFlowFile); + flush(); removeTemporaryClaim(record); - flush(); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() .fromFlowFile(record.getCurrent()) .contentClaim(updatedClaim) http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 1ccce07..563ece6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -413,16 +413,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); try { - final RecordSetWriter writer; + final RecordSchema schema; + try { - final RecordSchema schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value())); - writer = writerFactory.createWriter(logger, schema); + schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value())); } catch (final Exception e) { - logger.error( - "Failed to obtain a Record Writer for serializing Kafka messages. 
This generally happens because the " - + "Record Writer cannot obtain the appropriate Schema, due to failure to connect to a remote Schema Registry " - + "or due to the Schema Access Strategy being dependent upon FlowFile Attributes that are not available. " - + "Will roll back the Kafka message offsets.", e); + logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e); try { rollback(topicPartition); @@ -436,6 +432,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final FlowFile ff = flowFile; final AtomicReference<WriteResult> writeResult = new AtomicReference<>(); + final AtomicReference<String> mimeTypeRef = new AtomicReference<>(); flowFile = session.write(flowFile, rawOut -> { final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator(); @@ -479,15 +476,28 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } }; - try (final OutputStream out = new BufferedOutputStream(rawOut)) { - writeResult.set(writer.write(recordSet, out)); + try (final OutputStream out = new BufferedOutputStream(rawOut); + final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) { + writeResult.set(writer.write(recordSet)); + mimeTypeRef.set(writer.getMimeType()); + } catch (final Exception e) { + logger.error("Failed to write records to FlowFile. 
Will roll back the Kafka message offsets.", e); + + try { + rollback(topicPartition); + } catch (final Exception rollbackException) { + logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException); + } + + yield(); + throw new ProcessException(e); } }); final WriteResult result = writeResult.get(); if (result.getRecordCount() > 0) { final Map<String, String> attributes = new HashMap<>(result.getAttributes()); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get()); attributes.put("record.count", String.valueOf(result.getRecordCount())); attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition())); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index 442ccc5..e48568b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -58,7 +58,6 @@ import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.RecordWriter; import 
org.apache.nifi.serialization.record.RecordSchema; @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"}) @@ -324,14 +323,13 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); - final RecordWriter writer; - try (final InputStream in = new BufferedInputStream(session.read(flowFile))) { - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema = writerFactory.getSchema(flowFile, in); + final RecordSchema schema; + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - writer = writerFactory.createWriter(getLogger(), schema); + try (final InputStream in = new BufferedInputStream(session.read(flowFile))) { + schema = writerFactory.getSchema(flowFile, in); } catch (final Exception e) { - getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e}); + getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e}); session.transfer(flowFile, REL_FAILURE); continue; } @@ -342,7 +340,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger()); - lease.publish(flowFile, reader, writer, messageKeyField, topic); + lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic); } catch 
(final SchemaNotFoundException | MalformedRecordException e) { throw new ProcessException(e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index be2697b..4b3a3ae 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -32,9 +32,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordWriter; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.stream.io.exception.TokenTooLargeException; import org.apache.nifi.stream.io.util.StreamDemarcator; @@ -93,7 +96,8 @@ public class PublisherLease implements Closeable { } } - void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException { + void 
publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema, + final String messageKeyField, final String topic) throws IOException { if (tracker == null) { tracker = new InFlightMessageTracker(); } @@ -104,11 +108,11 @@ public class PublisherLease implements Closeable { final RecordSet recordSet = reader.createRecordSet(); int recordCount = 0; - try { + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) { while ((record = recordSet.next()) != null) { recordCount++; baos.reset(); - writer.write(record, baos); + writer.write(record); final byte[] messageContent = baos.toByteArray(); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); @@ -127,6 +131,8 @@ public class PublisherLease implements Closeable { } } catch (final TokenTooLargeException ttle) { tracker.fail(flowFile, ttle); + } catch (final SchemaNotFoundException snfe) { + throw new IOException(snfe); } catch (final Exception e) { tracker.fail(flowFile, e); poison(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java index 8c6efb7..7cff2a7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java @@ -44,8 +44,8 @@ import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.RecordWriter; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -65,7 +65,8 @@ public class TestPublishKafkaRecord_0_10 { public void setup() throws InitializationException, IOException { mockPool = mock(PublisherPool.class); mockLease = mock(PublisherLease.class); - Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class)); + Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), + any(RecordSchema.class), any(String.class), any(String.class)); when(mockPool.obtainPublisher()).thenReturn(mockLease); @@ -103,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); - verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); @@ -122,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); 
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3); - verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); @@ -137,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1); - verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).close(); } @@ -154,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3); - verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).close(); } @@ -176,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2); - verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), 
any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); @@ -206,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); - verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); @@ -240,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 { runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2); runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2); - verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index a1abda4..27df57b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -58,10 +58,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { @Override - public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { + public WriteResult write(final RecordSet rs) throws IOException { out.write(header.getBytes()); out.write("\n".getBytes()); @@ -102,7 +102,20 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public WriteResult write(Record record, OutputStream out) throws IOException { + public WriteResult write(Record record) throws IOException { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void beginRecordSet() throws IOException { + } + + @Override + public WriteResult finishRecordSet() throws IOException { return null; } }; http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java index 5b4819a..6ecfa59 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -217,11 +217,11 @@ public class FetchParquetTest { configure(proc); final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class); - when(recordSetWriter.write(any(RecordSet.class), any(OutputStream.class))).thenThrow(new IOException("IOException")); + when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException")); final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class); when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory"); - when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class))).thenReturn(recordSetWriter); + when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(FlowFile.class), any(OutputStream.class))).thenReturn(recordSetWriter); testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory); testRunner.enableControllerService(recordSetWriterFactory); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java index 32b700f..904aab6 
100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java @@ -18,9 +18,11 @@ package org.apache.nifi.schemaregistry.services; import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.serialization.record.SchemaIdentifier; public class AvroSchemaValidator implements Validator { @@ -36,7 +38,8 @@ public class AvroSchemaValidator implements Validator { } try { - new Schema.Parser().parse(input); + final Schema avroSchema = new Schema.Parser().parse(input); + AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY); return new ValidationResult.Builder() .input(input) http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java index 1dde047..b18e9de 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -34,6 +34,7 @@ import javax.script.Invocable; import javax.script.ScriptException; import 
java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.UndeclaredThrowableException; import java.util.Collection; import java.util.HashSet; @@ -52,15 +53,12 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor super.onEnabled(context); } - public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException { - return createWriter(logger, getSchema(flowFile, in)); - } @Override - public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException { if (recordFactory.get() != null) { try { - return recordFactory.get().createWriter(logger, schema); + return recordFactory.get().createWriter(logger, schema, flowFile, out); } catch (UndeclaredThrowableException ute) { throw new IOException(ute.getCause()); } @@ -131,6 +129,7 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor } } catch (final Exception ex) { + ex.printStackTrace(); final ComponentLog logger = getLogger(); final String message = "Unable to load script: " + ex.getLocalizedMessage(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy index 2e1c03d..96fda19 100644 --- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy @@ -104,7 +104,9 @@ class ScriptedRecordSetWriterTest { InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream) - RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema) + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream() + RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream) assertNotNull(recordSetWriter) def recordSchema = new SimpleRecordSchema( @@ -119,8 +121,7 @@ class ScriptedRecordSetWriterTest { new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300]) ] as MapRecord[] - ByteArrayOutputStream outputStream = new ByteArrayOutputStream() - recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream) + recordSetWriter.write(RecordSet.of(recordSchema, records)) def xml = new XmlSlurper().parseText(outputStream.toString()) assertEquals('1', xml.record[0].id.toString()) http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy index 4fae4fe..b0daaca 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -34,9 +34,15 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream class GroovyRecordSetWriter implements RecordSetWriter { - + private int recordCount = 0; + private final OutputStream out; + + public GroovyRecordSetWriter(final OutputStream out) { + this.out = out; + } + @Override - WriteResult write(Record r, OutputStream out) throws IOException { + WriteResult write(Record r) throws IOException { new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> new MarkupBuilder(osw).record { r.schema.fieldNames.each {fieldName -> @@ -44,7 +50,9 @@ class GroovyRecordSetWriter implements RecordSetWriter { } } } - WriteResult.of(0, [:]) + + recordCount++; + WriteResult.of(1, [:]) } @Override @@ -53,10 +61,10 @@ class GroovyRecordSetWriter implements RecordSetWriter { } @Override - WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException { + WriteResult write(final RecordSet rs) throws IOException { int count = 0 - new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw -> + new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> new MarkupBuilder(osw).recordSet { Record r @@ -73,6 +81,18 @@ class GroovyRecordSetWriter implements RecordSetWriter { } WriteResult.of(count, [:]) } + + public void beginRecordSet() throws IOException { + } + + @Override + public WriteResult finishRecordSet() throws IOException { + return WriteResult.of(recordCount, [:]); + } + + @Override + public void close() throws IOException { + } } class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { @@ -83,9 +103,10 @@ class GroovyRecordSetWriterFactory extends AbstractControllerService implements } @Override - RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException { - return new 
GroovyRecordSetWriter() + RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException { + return new GroovyRecordSetWriter(out) } + } writer = new GroovyRecordSetWriterFactory() http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index a86c836..a0932e0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -42,6 +42,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-lookup-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-flowfile-packager</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java index b6cc83b..52fcbb8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -27,7 +27,7 @@ 
import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -101,26 +101,27 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSetWriter writer; final RecordSchema writeSchema; try (final InputStream rawIn = session.read(flowFile); final InputStream in = new BufferedInputStream(rawIn)) { writeSchema = writerFactory.getSchema(flowFile, in); - writer = writerFactory.createWriter(getLogger(), writeSchema); } catch (final Exception e) { - getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e}); + getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e}); session.transfer(flowFile, REL_FAILURE); return; } - final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>(); + final Map<String, String> attributes = new HashMap<>(); + final AtomicInteger recordCount = new AtomicInteger(); final FlowFile original = flowFile; try { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { + try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out); final RecordSet recordSet = new RecordSet() { @Override @@ -151,8 +152,11 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { } }; - final 
WriteResult writeResult = writer.write(recordSet, out); - writeResultRef.set(writeResult); + final WriteResult writeResult = writer.write(recordSet); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + recordCount.set(writeResult.getRecordCount()); } catch (final SchemaNotFoundException | MalformedRecordException e) { throw new ProcessException("Could not parse incoming data", e); @@ -160,22 +164,17 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { } }); } catch (final Exception e) { - getLogger().error("Failed to convert {}", new Object[] {flowFile, e}); + getLogger().error("Failed to process {}", new Object[] {flowFile, e}); session.transfer(flowFile, REL_FAILURE); return; } - final WriteResult writeResult = writeResultRef.get(); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); - flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); - session.adjustCounter("Records Processed", writeResult.getRecordCount(), false); - getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile}); + + final int count = recordCount.get(); + session.adjustCounter("Records Processed", count, false); + getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile}); } protected abstract Record process(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java new file mode 100644 index 0000000..955023f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.Tuple; + +public abstract class AbstractRouteRecord<T> extends AbstractProcessor { + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to 
use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + if (isRouteOriginal()) { + relationships.add(REL_ORIGINAL); + } + + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final T flowFileContext = getFlowFileContext(flowFile, context); + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema writeSchema; + try (final InputStream rawIn = session.read(flowFile); + final InputStream in = new BufferedInputStream(rawIn)) { + writeSchema = writerFactory.getSchema(flowFile, in); + } catch (final Exception e) { + getLogger().error("Failed to process records for 
{}; will route to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final AtomicInteger numRecords = new AtomicInteger(0); + final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>(); + final FlowFile original = flowFile; + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + + Record record; + while ((record = reader.nextRecord()) != null) { + final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext); + numRecords.incrementAndGet(); + + for (final Relationship relationship : relationships) { + final RecordSetWriter recordSetWriter; + Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship); + if (tuple == null) { + FlowFile outFlowFile = session.create(original); + final OutputStream out = session.write(outFlowFile); + recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, original, out); + recordSetWriter.beginRecordSet(); + + tuple = new Tuple<>(outFlowFile, recordSetWriter); + writers.put(relationship, tuple); + } else { + recordSetWriter = tuple.getValue(); + } + + recordSetWriter.write(record); + } + } + } catch (final SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Could not parse incoming data", e); + } + } + }); + + for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) { + final Relationship relationship = entry.getKey(); + final Tuple<FlowFile, RecordSetWriter> tuple = entry.getValue(); + final RecordSetWriter writer = tuple.getValue(); + FlowFile childFlowFile = tuple.getKey(); + + final WriteResult writeResult = writer.finishRecordSet(); + + try { + writer.close(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close Writer for 
{}", new Object[] {childFlowFile}); + } + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + + childFlowFile = session.putAllAttributes(childFlowFile, attributes); + session.transfer(childFlowFile, relationship); + session.adjustCounter("Records Processed", writeResult.getRecordCount(), false); + session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false); + + session.getProvenanceReporter().route(childFlowFile, relationship); + } + } catch (final Exception e) { + getLogger().error("Failed to process {}", new Object[] {flowFile, e}); + + for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) { + try { + tuple.getValue().close(); + } catch (final Exception e1) { + getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[] {tuple.getKey()}); + } + + session.remove(tuple.getKey()); + } + + session.transfer(flowFile, REL_FAILURE); + return; + } finally { + for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) { + final RecordSetWriter writer = tuple.getValue(); + try { + writer.close(); + } catch (final Exception e) { + getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[] {tuple.getKey(), e}); + } + } + } + + if (isRouteOriginal()) { + flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords)); + session.transfer(flowFile, REL_ORIGINAL); + } else { + session.remove(flowFile); + } + + getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords}); + } + + protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile 
flowFile, ProcessContext context, T flowFileContext); + + protected abstract boolean isRouteOriginal(); + + protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context); +}