This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new dbf92b9 NIFI-5829 Create Lookup Controller Services for RecordSetWriter and RecordReader dbf92b9 is described below commit dbf92b9a728c9b739d79d98bbe56ec6ce3681999 Author: Peter Wicks <patric...@gmail.com> AuthorDate: Fri Jan 24 16:49:10 2020 +0000 NIFI-5829 Create Lookup Controller Services for RecordSetWriter and RecordReader Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3188 --- .../java/org/apache/nifi/lookup/ReaderLookup.java | 158 ++++++++++++++ .../apache/nifi/lookup/RecordSetWriterLookup.java | 163 ++++++++++++++ .../org.apache.nifi.controller.ControllerService | 3 + .../org/apache/nifi/lookup/TestReaderLookup.java | 180 ++++++++++++++++ .../nifi/lookup/TestRecordSetWriterLookup.java | 234 +++++++++++++++++++++ .../serialization/TestRecordReaderProcessor.java | 46 ++++ .../TestRecordSetWriterProcessor.java | 46 ++++ 7 files changed, 830 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java new file mode 100644 index 0000000..fa43f82 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.lookup; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Tags({"lookup", "parse", "record", "row", "reader"}) +@SeeAlso({RecordSetWriterLookup.class}) 
+@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. This service " + + "requires an variable named 'recordreader.name' to be passed in when asking for a record record, and will throw an exception " + + "if the variable is missing. The value of 'recordreader.name' will be used to select the RecordReaderFactory that has been " + + "registered with that name. This will allow multiple RecordReaderFactory's to be defined and registered, and then selected " + + "dynamically at runtime by tagging flow files with the appropriate 'recordreader.name' variable.") +@DynamicProperty(name = "Name of the RecordReader", value = "A RecordReaderFactory controller service", expressionLanguageScope = ExpressionLanguageScope.NONE, + description = "") +public class ReaderLookup extends AbstractControllerService implements RecordReaderFactory { + + public static final String RECORDREADER_NAME_VARIABLE = "recordreader.name"; + + private volatile Map<String, RecordReaderFactory> recordReaderFactoryMap; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("The RecordReaderFactory to return when recordreader.name = '" + propertyDescriptorName + "'") + .identifiesControllerService(RecordReaderFactory.class) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + final List<ValidationResult> results = new ArrayList<>(); + + int numDefinedServices = 0; + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + numDefinedServices++; + } + + final String referencedId = context.getProperty(descriptor).getValue(); + if (this.getIdentifier().equals(referencedId)) { + results.add(new 
ValidationResult.Builder() + .subject(descriptor.getDisplayName()) + .explanation("the current service cannot be registered as a RecordReaderFactory to lookup") + .valid(false) + .build()); + } + } + + if (numDefinedServices == 0) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .explanation("at least one RecordReaderFactory must be defined via dynamic properties") + .valid(false) + .build()); + } + + return results; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final Map<String,RecordReaderFactory> serviceMap = new HashMap<>(); + + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + final RecordReaderFactory recordReaderFactory = context.getProperty(descriptor).asControllerService(RecordReaderFactory.class); + serviceMap.put(descriptor.getName(), recordReaderFactory); + } + } + + recordReaderFactoryMap = Collections.unmodifiableMap(serviceMap); + } + + @OnDisabled + public void onDisabled() { + recordReaderFactoryMap = null; + } + + + @Override + public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + if(flowFile == null) { + throw new UnsupportedOperationException("Cannot lookup a RecordReaderFactory without variables."); + } + + return createRecordReader(flowFile.getAttributes(), in, flowFile.getSize(), logger); + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + if(variables == null) { + throw new UnsupportedOperationException("Cannot lookup a RecordReaderFactory without variables."); + } + + if (!variables.containsKey(RECORDREADER_NAME_VARIABLE)) { + throw new ProcessException("Variables must contain a variables name '" + 
RECORDREADER_NAME_VARIABLE + "'"); + } + + final String recordReaderName = variables.get(RECORDREADER_NAME_VARIABLE); + if (StringUtils.isBlank(recordReaderName)) { + throw new ProcessException(RECORDREADER_NAME_VARIABLE + " cannot be null or blank"); + } + + final RecordReaderFactory recordReaderFactory = recordReaderFactoryMap.get(recordReaderName); + if (recordReaderFactory == null) { + throw new ProcessException("No RecordReaderFactory was found for " + RECORDREADER_NAME_VARIABLE + + "'" + recordReaderName + "'"); + } + + return recordReaderFactory.createRecordReader(variables, in, inputLength, logger); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/RecordSetWriterLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/RecordSetWriterLookup.java new file mode 100644 index 0000000..d69d887 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/RecordSetWriterLookup.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Tags({"lookup", "result", "set", "writer", "serializer", "record", "recordset", "row"}) +@SeeAlso({ReaderLookup.class}) +@CapabilityDescription("Provides a RecordSetWriterFactory that can be used to dynamically select another RecordSetWriterFactory. This service " + + "requires a variable named 'recordsetwriter.name' to be passed in when asking for a schema or record set writer, and will throw an exception " + + "if the variable is missing. 
The value of 'recordsetwriter.name' will be used to select the RecordSetWriterFactory that has been " + + "registered with that name. This will allow multiple RecordSetWriterFactory's to be defined and registered, and then selected " + + "dynamically at runtime by tagging flow files with the appropriate 'recordsetwriter.name' variable.") +@DynamicProperty(name = "Name of the RecordSetWriter", value = "A RecordSetWriterFactory controller service", expressionLanguageScope = ExpressionLanguageScope.NONE, + description = "") +public class RecordSetWriterLookup extends AbstractControllerService implements RecordSetWriterFactory { + + public static final String RECORDWRITER_NAME_VARIABLE = "recordsetwriter.name"; + + private volatile Map<String,RecordSetWriterFactory> recordSetWriterFactoryMap; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("The RecordSetWriterFactory to return when recordwriter.name = '" + propertyDescriptorName + "'") + .identifiesControllerService(RecordSetWriterFactory.class) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + final List<ValidationResult> results = new ArrayList<>(); + + int numDefinedServices = 0; + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + numDefinedServices++; + } + + final String referencedId = context.getProperty(descriptor).getValue(); + if (this.getIdentifier().equals(referencedId)) { + results.add(new ValidationResult.Builder() + .subject(descriptor.getDisplayName()) + .explanation("the current service cannot be registered as a RecordSetWriterFactory to lookup") + .valid(false) + .build()); + } + } + + if (numDefinedServices == 0) { + results.add(new ValidationResult.Builder() 
+ .subject(this.getClass().getSimpleName()) + .explanation("at least one RecordSetWriterFactory must be defined via dynamic properties") + .valid(false) + .build()); + } + + return results; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final Map<String,RecordSetWriterFactory> serviceMap = new HashMap<>(); + + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(descriptor).asControllerService(RecordSetWriterFactory.class); + serviceMap.put(descriptor.getName(), recordSetWriterFactory); + } + } + + recordSetWriterFactoryMap = Collections.unmodifiableMap(serviceMap); + } + + @OnDisabled + public void onDisabled() { + recordSetWriterFactoryMap = null; + } + + + @Override + public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + return getRecordSetWriterFactory(variables).getSchema(variables, readSchema); + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) { + throw new UnsupportedOperationException("Cannot lookup RecordSetWriterFactory without variables"); + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException { + return getRecordSetWriterFactory(variables).createWriter(logger, schema, out, variables); + } + + private RecordSetWriterFactory getRecordSetWriterFactory(Map<String, String> variables){ + if (variables == null) { + throw new UnsupportedOperationException("Cannot lookup RecordSetWriterFactory without variables"); + } + + if (!variables.containsKey(RECORDWRITER_NAME_VARIABLE)) { + throw new ProcessException("Attributes must contain an variables name '" + RECORDWRITER_NAME_VARIABLE + "'"); + } + + final String 
recordSetWriterName = variables.get(RECORDWRITER_NAME_VARIABLE); + if (StringUtils.isBlank(recordSetWriterName)) { + throw new ProcessException(RECORDWRITER_NAME_VARIABLE + " cannot be null or blank"); + } + + final RecordSetWriterFactory recordSetWriterFactory = recordSetWriterFactoryMap.get(recordSetWriterName); + if (recordSetWriterFactory == null) { + throw new ProcessException("No RecordSetWriterFactory was found for " + RECORDWRITER_NAME_VARIABLE + + "'" + recordSetWriterName + "'"); + } + + return recordSetWriterFactory; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 39a59aa..18bab05 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -20,6 +20,9 @@ org.apache.nifi.json.JsonTreeReader org.apache.nifi.json.JsonPathReader org.apache.nifi.json.JsonRecordSetWriter +org.apache.nifi.lookup.RecordSetWriterLookup +org.apache.nifi.lookup.ReaderLookup + org.apache.nifi.csv.CSVReader org.apache.nifi.csv.CSVRecordSetWriter diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestReaderLookup.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestReaderLookup.java new file mode 100644 index 0000000..bc6366d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestReaderLookup.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.lookup; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.TestRecordReaderProcessor; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestReaderLookup { + + private MockRecordReaderFactory recordReaderA; + private MockRecordReaderFactory recordReaderB; + + private ReaderLookup readerLookup; + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + recordReaderA = new MockRecordReaderFactory("A"); + recordReaderB = new MockRecordReaderFactory("B"); + + readerLookup = new ReaderLookup(); + runner = TestRunners.newTestRunner(TestRecordReaderProcessor.class); + + final String rrServiceAIdentifier = "rr-A"; + runner.addControllerService(rrServiceAIdentifier, recordReaderA); + + final String rrServiceBIdentifier = "rr-B"; + runner.addControllerService(rrServiceBIdentifier, recordReaderB); + + runner.addControllerService("rr-lookup", readerLookup); + runner.setProperty(readerLookup, "A", rrServiceAIdentifier); + runner.setProperty(readerLookup, "B", rrServiceBIdentifier); + + 
runner.enableControllerService(recordReaderA); + runner.enableControllerService(recordReaderB); + runner.enableControllerService(readerLookup); + } + + @Test + public void testLookupServiceByName() throws SchemaNotFoundException, MalformedRecordException, IOException { + final Map<String,String> attributes = new HashMap<>(); + attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "A"); + + MockRecordReader recordReader = (MockRecordReader) readerLookup.createRecordReader(attributes, null, -1, null); + assertNotNull(recordReader); + assertEquals(recordReaderA.name, recordReader.name); + + attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "B"); + + recordReader = (MockRecordReader) readerLookup.createRecordReader(attributes, null, -1, null); + assertNotNull(recordReader); + assertEquals(recordReaderB.name, recordReader.name); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLookupWithoutAttributes() throws SchemaNotFoundException, MalformedRecordException, IOException { + Map<String,String> attributes = null; + readerLookup.createRecordReader(attributes, null, -1, null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLookupWithoutFlowFile() throws SchemaNotFoundException, MalformedRecordException, IOException { + FlowFile flowFile = null; + readerLookup.createRecordReader(flowFile, null, null); + } + + @Test(expected = ProcessException.class) + public void testLookupMissingNameAttribute() throws SchemaNotFoundException, MalformedRecordException, IOException { + final Map<String,String> attributes = new HashMap<>(); + readerLookup.createRecordReader(attributes, null, -1, null); + } + + @Test(expected = ProcessException.class) + public void testLookupWithNameThatDoesNotExist() throws SchemaNotFoundException, MalformedRecordException, IOException { + final Map<String,String> attributes = new HashMap<>(); + attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "DOES-NOT-EXIST"); + 
readerLookup.createRecordReader(attributes, null, -1, null); + } + + @Test + public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException { + // enable lookup service with no services registered, verify not valid + runner = TestRunners.newTestRunner(TestRecordReaderProcessor.class); + runner.addControllerService("rr-lookup", readerLookup); + runner.assertNotValid(readerLookup); + + final String rrServiceAIdentifier = "rr-A"; + runner.addControllerService(rrServiceAIdentifier, recordReaderA); + + // register a service and now verify valid + runner.setProperty(readerLookup, "A", rrServiceAIdentifier); + runner.enableControllerService(readerLookup); + runner.assertValid(readerLookup); + } + + @Test + public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException { + runner = TestRunners.newTestRunner(TestRecordReaderProcessor.class); + runner.addControllerService("rr-lookup", readerLookup); + runner.setProperty(readerLookup, "lookup", "lookup"); + runner.assertNotValid(readerLookup); + } + + /** + * A mock RecordReaderFactory that has a name for tracking purposes. 
+ */ + private static class MockRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory { + + private String name; + + public MockRecordReaderFactory(String name) { + this.name = name; + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) + throws MalformedRecordException, IOException, SchemaNotFoundException { + return new MockRecordReader(this.name); + } + } + + private static class MockRecordReader implements RecordReader { + public String name; + + public MockRecordReader(String name) { + this.name = name; + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { + return null; + } + + @Override + public RecordSchema getSchema() throws MalformedRecordException { + return null; + } + + @Override + public void close() throws IOException { + + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestRecordSetWriterLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestRecordSetWriterLookup.java new file mode 100644 index 0000000..4154800 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestRecordSetWriterLookup.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.TestRecordSetWriterProcessor;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for {@link RecordSetWriterLookup}.
+ *
+ * <p>Two named mock writer factories ("A" and "B") are registered behind the lookup service. The
+ * tests check that schema and writer requests are answered by the factory selected through the
+ * {@link RecordSetWriterLookup#RECORDWRITER_NAME_VARIABLE} attribute, that missing or unknown
+ * names are rejected, and that the service's validation rejects unusable configurations.
+ */
+public class TestRecordSetWriterLookup {
+
+    /** Identifier under which the lookup service itself is registered with the test runner. */
+    private static final String LOOKUP_SERVICE_ID = "rr-lookup";
+
+    private MockRecordSetWriterFactory recordSetWriterA;
+    private MockRecordSetWriterFactory recordSetWriterB;
+
+    private RecordSetWriterLookup recordSetWriterLookup;
+    private TestRunner runner;
+
+    /**
+     * Registers both mock factories and the lookup service, maps the dynamic properties "A" and
+     * "B" to them, and enables all three services.
+     */
+    @Before
+    public void setup() throws InitializationException {
+        recordSetWriterA = new MockRecordSetWriterFactory("A");
+        recordSetWriterB = new MockRecordSetWriterFactory("B");
+
+        recordSetWriterLookup = new RecordSetWriterLookup();
+        runner = TestRunners.newTestRunner(TestRecordSetWriterProcessor.class);
+
+        final String writerAIdentifier = "rr-A";
+        runner.addControllerService(writerAIdentifier, recordSetWriterA);
+
+        final String writerBIdentifier = "rr-B";
+        runner.addControllerService(writerBIdentifier, recordSetWriterB);
+
+        runner.addControllerService(LOOKUP_SERVICE_ID, recordSetWriterLookup);
+        runner.setProperty(recordSetWriterLookup, "A", writerAIdentifier);
+        runner.setProperty(recordSetWriterLookup, "B", writerBIdentifier);
+
+        runner.enableControllerService(recordSetWriterA);
+        runner.enableControllerService(recordSetWriterB);
+        runner.enableControllerService(recordSetWriterLookup);
+    }
+
+    /** The name attribute selects which registered factory supplies the schema and the writer. */
+    @Test
+    public void testLookupServiceByName() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "A");
+
+        RecordSchema recordSchema = recordSetWriterLookup.getSchema(attributes, null);
+        assertNotNull(recordSchema);
+        assertEquals(recordSetWriterA.name, recordSchema.getIdentifier().getName().get());
+
+        MockRecordSetWriter writer = (MockRecordSetWriter) recordSetWriterLookup.createWriter(null, null, null, attributes);
+        assertNotNull(writer);
+        assertEquals(recordSetWriterA.name, writer.name);
+
+        // Same attribute map, different name: the lookup must now resolve to factory B.
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "B");
+
+        recordSchema = recordSetWriterLookup.getSchema(attributes, null);
+        assertNotNull(recordSchema);
+        assertEquals(recordSetWriterB.name, recordSchema.getIdentifier().getName().get());
+
+        writer = (MockRecordSetWriter) recordSetWriterLookup.createWriter(null, null, null, attributes);
+        assertNotNull(writer);
+        assertEquals(recordSetWriterB.name, writer.name);
+    }
+
+    /** Creating a writer with a {@code null} attribute map is expected to be unsupported. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupWithoutAttributes() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = null;
+        recordSetWriterLookup.createWriter(null, null, null, attributes);
+    }
+
+    /** Requesting a schema with a {@code null} attribute map is expected to be unsupported. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupSchemaWithoutAttributes() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = null;
+        recordSetWriterLookup.getSchema(attributes, null);
+    }
+
+    /** An attribute map without the name attribute must fail writer creation. */
+    @Test(expected = ProcessException.class)
+    public void testLookupMissingNameAttribute() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        recordSetWriterLookup.createWriter(null, null, null, attributes);
+    }
+
+    /** An attribute map without the name attribute must fail schema lookup. */
+    @Test(expected = ProcessException.class)
+    public void testLookupSchemaMissingNameAttribute() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        recordSetWriterLookup.getSchema(attributes, null);
+    }
+
+    /** A name that maps to no registered factory must fail writer creation. */
+    @Test(expected = ProcessException.class)
+    public void testLookupWithNameThatDoesNotExist() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "DOES-NOT-EXIST");
+        recordSetWriterLookup.createWriter(null, null, null, attributes);
+    }
+
+    /** A name that maps to no registered factory must fail schema lookup. */
+    @Test(expected = ProcessException.class)
+    public void testLookupSchemaWithNameThatDoesNotExist() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "DOES-NOT-EXIST");
+        recordSetWriterLookup.getSchema(attributes, null);
+    }
+
+    /** The lookup service is invalid until at least one writer factory is mapped to it. */
+    @Test
+    public void testCustomValidateAtLeastOneServiceDefined() throws InitializationException {
+        // Fresh runner: the lookup service is registered with no dynamic properties, verify not valid
+        runner = TestRunners.newTestRunner(TestRecordSetWriterProcessor.class);
+        runner.addControllerService(LOOKUP_SERVICE_ID, recordSetWriterLookup);
+        runner.assertNotValid(recordSetWriterLookup);
+
+        final String writerAIdentifier = "rr-A";
+        runner.addControllerService(writerAIdentifier, recordSetWriterA);
+
+        // register a service and now verify valid
+        runner.setProperty(recordSetWriterLookup, "A", writerAIdentifier);
+        runner.enableControllerService(recordSetWriterLookup);
+        runner.assertValid(recordSetWriterLookup);
+    }
+
+    /** The lookup service must not be allowed to list itself as one of its own targets. */
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
+        runner = TestRunners.newTestRunner(TestRecordSetWriterProcessor.class);
+        runner.addControllerService(LOOKUP_SERVICE_ID, recordSetWriterLookup);
+        // The property value has to be the identifier the lookup was registered under; any other
+        // value is only a dangling reference and would not exercise a self-reference check.
+        runner.setProperty(recordSetWriterLookup, "lookup", LOOKUP_SERVICE_ID);
+        runner.assertNotValid(recordSetWriterLookup);
+    }
+
+    /**
+     * A mock RecordSetWriterFactory that has a name for tracking purposes. The name is stamped on
+     * the schema identifier and on every writer it creates so tests can tell factories apart.
+     */
+    private static class MockRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
+
+        private final String name;
+
+        public MockRecordSetWriterFactory(String name) {
+            this.name = name;
+        }
+
+        /** Returns a field-less schema whose identifier carries this factory's name. */
+        @Override
+        public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+            return new SimpleRecordSchema(SchemaIdentifier.builder().name(name).build());
+        }
+
+        @Override
+        public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
+            return new MockRecordSetWriter(name);
+        }
+
+        @Override
+        public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
+            return new MockRecordSetWriter(name);
+        }
+    }
+
+    /**
+     * A no-op RecordSetWriter that only remembers the name of the factory that created it. All
+     * write methods and {@link #getMimeType()} return {@code null}.
+     */
+    private static class MockRecordSetWriter implements RecordSetWriter {
+
+        private final String name;
+
+        public MockRecordSetWriter(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public WriteResult write(RecordSet recordSet) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void beginRecordSet() throws IOException {
+            // nothing to do
+        }
+
+        @Override
+        public WriteResult finishRecordSet() throws IOException {
+            return null;
+        }
+
+        @Override
+        public WriteResult write(Record record) throws IOException {
+            return null;
+        }
+
+        @Override
+        public String getMimeType() {
+            return null;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            // nothing to do
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to do
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordReaderProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordReaderProcessor.java
new file mode 100644
index 0000000..48b0144
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordReaderProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Processor stub whose {@link #onTrigger} does nothing. It exposes a single required property,
+ * "record-reader", that identifies a {@link RecordReaderFactory} controller service.
+ */
+public class TestRecordReaderProcessor extends AbstractProcessor {
+
+    /**
+     * Builds the one supported descriptor and returns it in a new mutable list on every call.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor recordReader = new PropertyDescriptor.Builder()
+                .name("record-reader")
+                .displayName("Record Reader")
+                .description("Specifies the Controller Service to use for reading incoming data")
+                .identifiesControllerService(RecordReaderFactory.class)
+                .required(true)
+                .build();
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(recordReader);
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        // intentionally empty: this processor performs no work
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordSetWriterProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordSetWriterProcessor.java
new file mode 100644
index 0000000..e7ac605
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordSetWriterProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Processor stub whose {@link #onTrigger} does nothing. It exposes a single required property,
+ * "record-writer", that identifies a {@link RecordSetWriterFactory} controller service.
+ */
+public class TestRecordSetWriterProcessor extends AbstractProcessor {
+
+    /**
+     * Builds the one supported descriptor and returns it in a new mutable list on every call.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor recordWriter = new PropertyDescriptor.Builder()
+                .name("record-writer")
+                .displayName("Record Writer")
+                .description("Specifies the Controller Service to use for writing out the records")
+                .identifiesControllerService(RecordSetWriterFactory.class)
+                .required(true)
+                .build();
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(recordWriter);
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        // intentionally empty: this processor performs no work
+    }
+}
\ No newline at end of file