This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dbf92b9  NIFI-5829 Create Lookup Controller Services for RecordSetWriter and RecordReader
dbf92b9 is described below

commit dbf92b9a728c9b739d79d98bbe56ec6ce3681999
Author: Peter Wicks <patric...@gmail.com>
AuthorDate: Fri Jan 24 16:49:10 2020 +0000

    NIFI-5829 Create Lookup Controller Services for RecordSetWriter and RecordReader
    
    Signed-off-by: Matthew Burgess <mattyb...@apache.org>
    
    This closes #3188
---
 .../java/org/apache/nifi/lookup/ReaderLookup.java  | 158 ++++++++++++++
 .../apache/nifi/lookup/RecordSetWriterLookup.java  | 163 ++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   3 +
 .../org/apache/nifi/lookup/TestReaderLookup.java   | 180 ++++++++++++++++
 .../nifi/lookup/TestRecordSetWriterLookup.java     | 234 +++++++++++++++++++++
 .../serialization/TestRecordReaderProcessor.java   |  46 ++++
 .../TestRecordSetWriterProcessor.java              |  46 ++++
 7 files changed, 830 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java
new file mode 100644
index 0000000..fa43f82
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"lookup", "parse", "record", "row", "reader"})
+@SeeAlso({RecordSetWriterLookup.class})
+@CapabilityDescription("Provides a RecordReaderFactory that can be used to 
dynamically select another RecordReaderFactory. This service " +
+        "requires an variable named 'recordreader.name' to be passed in when 
asking for a record record, and will throw an exception " +
+        "if the variable is missing. The value of 'recordreader.name' will be 
used to select the RecordReaderFactory that has been " +
+        "registered with that name. This will allow multiple 
RecordReaderFactory's to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 
'recordreader.name' variable.")
+@DynamicProperty(name = "Name of the RecordReader", value = "A 
RecordReaderFactory controller service", expressionLanguageScope = 
ExpressionLanguageScope.NONE,
+        description = "")
+public class ReaderLookup extends AbstractControllerService implements 
RecordReaderFactory {
+
+    public static final String RECORDREADER_NAME_VARIABLE = 
"recordreader.name";
+
+    private volatile Map<String, RecordReaderFactory> recordReaderFactoryMap;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("The RecordReaderFactory to return when 
recordreader.name = '" + propertyDescriptorName + "'")
+                .identifiesControllerService(RecordReaderFactory.class)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        int numDefinedServices = 0;
+        for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                numDefinedServices++;
+            }
+
+            final String referencedId = 
context.getProperty(descriptor).getValue();
+            if (this.getIdentifier().equals(referencedId)) {
+                results.add(new ValidationResult.Builder()
+                        .subject(descriptor.getDisplayName())
+                        .explanation("the current service cannot be registered 
as a RecordReaderFactory to lookup")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        if (numDefinedServices == 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("at least one RecordReaderFactory must be 
defined via dynamic properties")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final Map<String,RecordReaderFactory> serviceMap = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                final RecordReaderFactory recordReaderFactory = 
context.getProperty(descriptor).asControllerService(RecordReaderFactory.class);
+                serviceMap.put(descriptor.getName(), recordReaderFactory);
+            }
+        }
+
+        recordReaderFactoryMap = Collections.unmodifiableMap(serviceMap);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        recordReaderFactoryMap = null;
+    }
+
+
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, 
ComponentLog logger) throws MalformedRecordException, IOException, 
SchemaNotFoundException {
+        if(flowFile == null) {
+            throw new UnsupportedOperationException("Cannot lookup a 
RecordReaderFactory without variables.");
+        }
+
+        return createRecordReader(flowFile.getAttributes(), in, 
flowFile.getSize(), logger);
+    }
+
+    @Override
+    public RecordReader createRecordReader(Map<String, String> variables, 
InputStream in, long inputLength, ComponentLog logger) throws 
MalformedRecordException, IOException, SchemaNotFoundException {
+        if(variables == null) {
+            throw new UnsupportedOperationException("Cannot lookup a 
RecordReaderFactory without variables.");
+        }
+
+        if (!variables.containsKey(RECORDREADER_NAME_VARIABLE)) {
+            throw new ProcessException("Variables must contain a variables 
name '" + RECORDREADER_NAME_VARIABLE + "'");
+        }
+
+        final String recordReaderName = 
variables.get(RECORDREADER_NAME_VARIABLE);
+        if (StringUtils.isBlank(recordReaderName)) {
+            throw new ProcessException(RECORDREADER_NAME_VARIABLE + " cannot 
be null or blank");
+        }
+
+        final RecordReaderFactory recordReaderFactory = 
recordReaderFactoryMap.get(recordReaderName);
+        if (recordReaderFactory == null) {
+            throw new ProcessException("No RecordReaderFactory was found for " 
+ RECORDREADER_NAME_VARIABLE
+                    + "'" + recordReaderName + "'");
+        }
+
+        return recordReaderFactory.createRecordReader(variables, in, 
inputLength, logger);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/RecordSetWriterLookup.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/RecordSetWriterLookup.java
new file mode 100644
index 0000000..d69d887
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/RecordSetWriterLookup.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"lookup", "result", "set", "writer", "serializer", "record", 
"recordset", "row"})
+@SeeAlso({ReaderLookup.class})
+@CapabilityDescription("Provides a RecordSetWriterFactory that can be used to 
dynamically select another RecordSetWriterFactory. This service " +
+        "requires a variable named 'recordsetwriter.name' to be passed in when 
asking for a schema or record set writer, and will throw an exception " +
+        "if the variable is missing. The value of 'recordsetwriter.name' will 
be used to select the RecordSetWriterFactory that has been " +
+        "registered with that name. This will allow multiple 
RecordSetWriterFactory's to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 
'recordsetwriter.name' variable.")
+@DynamicProperty(name = "Name of the RecordSetWriter", value = "A 
RecordSetWriterFactory controller service", expressionLanguageScope = 
ExpressionLanguageScope.NONE,
+        description = "")
+public class RecordSetWriterLookup extends AbstractControllerService 
implements RecordSetWriterFactory {
+
+    public static final String RECORDWRITER_NAME_VARIABLE = 
"recordsetwriter.name";
+
+    private volatile Map<String,RecordSetWriterFactory> 
recordSetWriterFactoryMap;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("The RecordSetWriterFactory to return when 
recordwriter.name = '" + propertyDescriptorName + "'")
+                .identifiesControllerService(RecordSetWriterFactory.class)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        int numDefinedServices = 0;
+        for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                numDefinedServices++;
+            }
+
+            final String referencedId = 
context.getProperty(descriptor).getValue();
+            if (this.getIdentifier().equals(referencedId)) {
+                results.add(new ValidationResult.Builder()
+                        .subject(descriptor.getDisplayName())
+                        .explanation("the current service cannot be registered 
as a RecordSetWriterFactory to lookup")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        if (numDefinedServices == 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("at least one RecordSetWriterFactory must be 
defined via dynamic properties")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final Map<String,RecordSetWriterFactory> serviceMap = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(descriptor).asControllerService(RecordSetWriterFactory.class);
+                serviceMap.put(descriptor.getName(), recordSetWriterFactory);
+            }
+        }
+
+        recordSetWriterFactoryMap = Collections.unmodifiableMap(serviceMap);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        recordSetWriterFactoryMap = null;
+    }
+
+
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, RecordSchema 
readSchema) throws SchemaNotFoundException, IOException {
+        return getRecordSetWriterFactory(variables).getSchema(variables, 
readSchema);
+    }
+
+    @Override
+    public RecordSetWriter createWriter(ComponentLog logger, RecordSchema 
schema, OutputStream out) {
+        throw new UnsupportedOperationException("Cannot lookup 
RecordSetWriterFactory without variables");
+    }
+
+    @Override
+    public RecordSetWriter createWriter(ComponentLog logger, RecordSchema 
schema, OutputStream out, Map<String, String> variables) throws 
SchemaNotFoundException, IOException {
+        return getRecordSetWriterFactory(variables).createWriter(logger, 
schema, out, variables);
+    }
+
+    private RecordSetWriterFactory getRecordSetWriterFactory(Map<String, 
String> variables){
+        if (variables == null) {
+            throw new UnsupportedOperationException("Cannot lookup 
RecordSetWriterFactory without variables");
+        }
+
+        if (!variables.containsKey(RECORDWRITER_NAME_VARIABLE)) {
+            throw new ProcessException("Attributes must contain an variables 
name '" + RECORDWRITER_NAME_VARIABLE + "'");
+        }
+
+        final String recordSetWriterName = 
variables.get(RECORDWRITER_NAME_VARIABLE);
+        if (StringUtils.isBlank(recordSetWriterName)) {
+            throw new ProcessException(RECORDWRITER_NAME_VARIABLE + " cannot 
be null or blank");
+        }
+
+        final RecordSetWriterFactory recordSetWriterFactory = 
recordSetWriterFactoryMap.get(recordSetWriterName);
+        if (recordSetWriterFactory == null) {
+            throw new ProcessException("No RecordSetWriterFactory was found 
for " + RECORDWRITER_NAME_VARIABLE
+                    + "'" + recordSetWriterName + "'");
+        }
+
+        return recordSetWriterFactory;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 39a59aa..18bab05 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -20,6 +20,9 @@ org.apache.nifi.json.JsonTreeReader
 org.apache.nifi.json.JsonPathReader
 org.apache.nifi.json.JsonRecordSetWriter
 
+org.apache.nifi.lookup.RecordSetWriterLookup
+org.apache.nifi.lookup.ReaderLookup
+
 org.apache.nifi.csv.CSVReader
 org.apache.nifi.csv.CSVRecordSetWriter
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestReaderLookup.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestReaderLookup.java
new file mode 100644
index 0000000..bc6366d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestReaderLookup.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.TestRecordReaderProcessor;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit tests for {@link ReaderLookup}: selection of a registered factory by name, the failure
+ * modes of the lookup, and the custom validation rules.
+ */
+public class TestReaderLookup {
+
+    /** Identifier under which the lookup service itself is registered with the test runner. */
+    private static final String LOOKUP_SERVICE_ID = "rr-lookup";
+
+    private MockRecordReaderFactory recordReaderA;
+    private MockRecordReaderFactory recordReaderB;
+
+    private ReaderLookup readerLookup;
+    private TestRunner runner;
+
+    /** Registers two mock factories under the keys "A" and "B" and enables all three services. */
+    @Before
+    public void setup() throws InitializationException {
+        recordReaderA = new MockRecordReaderFactory("A");
+        recordReaderB = new MockRecordReaderFactory("B");
+
+        readerLookup = new ReaderLookup();
+        runner = TestRunners.newTestRunner(TestRecordReaderProcessor.class);
+
+        final String rrServiceAIdentifier = "rr-A";
+        runner.addControllerService(rrServiceAIdentifier, recordReaderA);
+
+        final String rrServiceBIdentifier = "rr-B";
+        runner.addControllerService(rrServiceBIdentifier, recordReaderB);
+
+        runner.addControllerService(LOOKUP_SERVICE_ID, readerLookup);
+        runner.setProperty(readerLookup, "A", rrServiceAIdentifier);
+        runner.setProperty(readerLookup, "B", rrServiceBIdentifier);
+
+        runner.enableControllerService(recordReaderA);
+        runner.enableControllerService(recordReaderB);
+        runner.enableControllerService(readerLookup);
+    }
+
+    /** The reader returned must come from the factory registered under the requested name. */
+    @Test
+    public void testLookupServiceByName() throws SchemaNotFoundException, MalformedRecordException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "A");
+
+        MockRecordReader recordReader = (MockRecordReader) readerLookup.createRecordReader(attributes, null, -1, null);
+        assertNotNull(recordReader);
+        assertEquals(recordReaderA.name, recordReader.name);
+
+        attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "B");
+
+        recordReader = (MockRecordReader) readerLookup.createRecordReader(attributes, null, -1, null);
+        assertNotNull(recordReader);
+        assertEquals(recordReaderB.name, recordReader.name);
+    }
+
+    /** A null variables map is rejected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupWithoutAttributes() throws SchemaNotFoundException, MalformedRecordException, IOException {
+        // Typed local so that the Map overload of createRecordReader is selected.
+        final Map<String, String> attributes = null;
+        readerLookup.createRecordReader(attributes, null, -1, null);
+    }
+
+    /** A null flow file is rejected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupWithoutFlowFile() throws SchemaNotFoundException, MalformedRecordException, IOException {
+        // Typed local so that the FlowFile overload of createRecordReader is selected.
+        final FlowFile flowFile = null;
+        readerLookup.createRecordReader(flowFile, null, null);
+    }
+
+    /** Variables without the name variable are rejected. */
+    @Test(expected = ProcessException.class)
+    public void testLookupMissingNameAttribute() throws SchemaNotFoundException, MalformedRecordException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        readerLookup.createRecordReader(attributes, null, -1, null);
+    }
+
+    /** A name with no registered factory is rejected. */
+    @Test(expected = ProcessException.class)
+    public void testLookupWithNameThatDoesNotExist() throws SchemaNotFoundException, MalformedRecordException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "DOES-NOT-EXIST");
+        readerLookup.createRecordReader(attributes, null, -1, null);
+    }
+
+    /** The lookup is invalid until at least one factory is registered through a dynamic property. */
+    @Test
+    public void testCustomValidateAtLeastOneServiceDefined() throws InitializationException {
+        // enable lookup service with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(TestRecordReaderProcessor.class);
+        runner.addControllerService(LOOKUP_SERVICE_ID, readerLookup);
+        runner.assertNotValid(readerLookup);
+
+        final String rrServiceAIdentifier = "rr-A";
+        runner.addControllerService(rrServiceAIdentifier, recordReaderA);
+
+        // register a service and now verify valid
+        runner.setProperty(readerLookup, "A", rrServiceAIdentifier);
+        runner.enableControllerService(readerLookup);
+        runner.assertValid(readerLookup);
+    }
+
+    /** Registering the lookup service as one of its own delegates must be reported as invalid. */
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
+        runner = TestRunners.newTestRunner(TestRecordReaderProcessor.class);
+        runner.addControllerService(LOOKUP_SERVICE_ID, readerLookup);
+        // The property value must be the lookup service's own identifier to form a self-reference.
+        runner.setProperty(readerLookup, "lookup", LOOKUP_SERVICE_ID);
+        runner.assertNotValid(readerLookup);
+    }
+
+    /**
+     * A mock RecordReaderFactory that has a name for tracking purposes.
+     */
+    private static class MockRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
+
+        private final String name;
+
+        public MockRecordReaderFactory(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
+                throws MalformedRecordException, IOException, SchemaNotFoundException {
+            return new MockRecordReader(this.name);
+        }
+    }
+
+    /**
+     * A mock RecordReader that only carries the name of the factory that created it.
+     */
+    private static class MockRecordReader implements RecordReader {
+
+        private final String name;
+
+        public MockRecordReader(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+            return null;
+        }
+
+        @Override
+        public RecordSchema getSchema() throws MalformedRecordException {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to release
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestRecordSetWriterLookup.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestRecordSetWriterLookup.java
new file mode 100644
index 0000000..4154800
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/lookup/TestRecordSetWriterLookup.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.TestRecordSetWriterProcessor;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit tests for {@link RecordSetWriterLookup}.
+ *
+ * <p>Two named mock writer factories ("A" and "B") are registered with the lookup
+ * service. The tests check that the lookup delegates to the factory selected by the
+ * {@link RecordSetWriterLookup#RECORDWRITER_NAME_VARIABLE} attribute, that it fails
+ * for missing or unknown names, and that its custom validation rejects an empty or
+ * self-referencing configuration.
+ */
+public class TestRecordSetWriterLookup {
+
+    /** Controller-service identifiers used when registering services with the runner. */
+    private static final String WRITER_A_ID = "rr-A";
+    private static final String WRITER_B_ID = "rr-B";
+    private static final String LOOKUP_ID = "rr-lookup";
+
+    private MockRecordSetWriterFactory recordSetWriterA;
+    private MockRecordSetWriterFactory recordSetWriterB;
+
+    private RecordSetWriterLookup recordSetWriterLookup;
+    private TestRunner runner;
+
+    /**
+     * Registers both mock factories and the lookup service, maps the dynamic
+     * properties "A" and "B" to the two factories, and enables all three services.
+     */
+    @Before
+    public void setup() throws InitializationException {
+        recordSetWriterA = new MockRecordSetWriterFactory("A");
+        recordSetWriterB = new MockRecordSetWriterFactory("B");
+
+        recordSetWriterLookup = new RecordSetWriterLookup();
+        runner = TestRunners.newTestRunner(TestRecordSetWriterProcessor.class);
+
+        runner.addControllerService(WRITER_A_ID, recordSetWriterA);
+        runner.addControllerService(WRITER_B_ID, recordSetWriterB);
+
+        runner.addControllerService(LOOKUP_ID, recordSetWriterLookup);
+        runner.setProperty(recordSetWriterLookup, "A", WRITER_A_ID);
+        runner.setProperty(recordSetWriterLookup, "B", WRITER_B_ID);
+
+        runner.enableControllerService(recordSetWriterA);
+        runner.enableControllerService(recordSetWriterB);
+        runner.enableControllerService(recordSetWriterLookup);
+    }
+
+    /** Both the schema and the writer must come from the factory named in the attributes. */
+    @Test
+    public void testLookupServiceByName() throws SchemaNotFoundException, IOException {
+        assertLookupResolvesTo("A", recordSetWriterA);
+        assertLookupResolvesTo("B", recordSetWriterB);
+    }
+
+    /** A {@code null} attribute map is expected to raise UnsupportedOperationException from createWriter. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupWithoutAttributes() throws SchemaNotFoundException, IOException {
+        // Typed local keeps the call unambiguous should another four-argument overload exist.
+        final Map<String, String> attributes = null;
+        recordSetWriterLookup.createWriter(null, null, null, attributes);
+    }
+
+    /** A {@code null} attribute map is expected to raise UnsupportedOperationException from getSchema. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupSchemaWithoutAttributes() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = null;
+        recordSetWriterLookup.getSchema(attributes, null);
+    }
+
+    /** An attribute map without the writer-name variable is expected to raise ProcessException from createWriter. */
+    @Test(expected = ProcessException.class)
+    public void testLookupMissingNameAttribute() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        recordSetWriterLookup.createWriter(null, null, null, attributes);
+    }
+
+    /** An attribute map without the writer-name variable is expected to raise ProcessException from getSchema. */
+    @Test(expected = ProcessException.class)
+    public void testLookupSchemaMissingNameAttribute() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        recordSetWriterLookup.getSchema(attributes, null);
+    }
+
+    /** A writer name with no registered factory is expected to raise ProcessException from createWriter. */
+    @Test(expected = ProcessException.class)
+    public void testLookupWithNameThatDoesNotExist() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "DOES-NOT-EXIST");
+        recordSetWriterLookup.createWriter(null, null, null, attributes);
+    }
+
+    /** A writer name with no registered factory is expected to raise ProcessException from getSchema. */
+    @Test(expected = ProcessException.class)
+    public void testLookupSchemaWithNameThatDoesNotExist() throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "DOES-NOT-EXIST");
+        recordSetWriterLookup.getSchema(attributes, null);
+    }
+
+    /** The lookup is invalid until at least one writer factory is registered on it. */
+    @Test
+    public void testCustomValidateAtLeastOneServiceDefined() throws InitializationException {
+        // enable lookup service with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(TestRecordSetWriterProcessor.class);
+        runner.addControllerService(LOOKUP_ID, recordSetWriterLookup);
+        runner.assertNotValid(recordSetWriterLookup);
+
+        runner.addControllerService(WRITER_A_ID, recordSetWriterA);
+
+        // register a service and now verify valid
+        runner.setProperty(recordSetWriterLookup, "A", WRITER_A_ID);
+        runner.enableControllerService(recordSetWriterLookup);
+        runner.assertValid(recordSetWriterLookup);
+    }
+
+    /** The lookup must not be allowed to register itself as one of its own targets. */
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
+        runner = TestRunners.newTestRunner(TestRecordSetWriterProcessor.class);
+        runner.addControllerService(LOOKUP_ID, recordSetWriterLookup);
+        // The property value must be the lookup's own identifier; pointing it at an
+        // identifier that was never registered would not exercise the self-reference case.
+        runner.setProperty(recordSetWriterLookup, "lookup", LOOKUP_ID);
+        runner.assertNotValid(recordSetWriterLookup);
+    }
+
+    /**
+     * Asserts that looking up {@code writerName} yields a schema and a writer that both
+     * carry the name of {@code expected}.
+     */
+    private void assertLookupResolvesTo(final String writerName, final MockRecordSetWriterFactory expected)
+            throws SchemaNotFoundException, IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, writerName);
+
+        final RecordSchema recordSchema = recordSetWriterLookup.getSchema(attributes, null);
+        assertNotNull(recordSchema);
+        assertEquals(expected.name, recordSchema.getIdentifier().getName().get());
+
+        final MockRecordSetWriter writer = (MockRecordSetWriter) recordSetWriterLookup.createWriter(null, null, null, attributes);
+        assertNotNull(writer);
+        assertEquals(expected.name, writer.name);
+    }
+
+    /**
+     * A mock RecordSetWriterFactory that has a name for tracking purposes. The name is
+     * used as the schema identifier name and is passed to every writer it creates.
+     */
+    private static class MockRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
+
+        private final String name;
+
+        public MockRecordSetWriterFactory(String name) {
+            this.name = name;
+        }
+
+        /** Returns a schema whose identifier name is this factory's name; the arguments are ignored. */
+        @Override
+        public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+            return new SimpleRecordSchema(SchemaIdentifier.builder().name(name).build());
+        }
+
+        /** Returns a new {@link MockRecordSetWriter} carrying this factory's name; the arguments are ignored. */
+        @Override
+        public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
+            return new MockRecordSetWriter(name);
+        }
+
+        /** Returns a new {@link MockRecordSetWriter} carrying this factory's name; the arguments are ignored. */
+        @Override
+        public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables)
+                throws SchemaNotFoundException, IOException {
+            return new MockRecordSetWriter(name);
+        }
+    }
+
+    /**
+     * Stub writer that only stores the name of the factory that created it. Every
+     * write operation returns {@code null} or does nothing.
+     */
+    private static class MockRecordSetWriter implements RecordSetWriter {
+        private final String name;
+
+        public MockRecordSetWriter(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public WriteResult write(RecordSet recordSet) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void beginRecordSet() throws IOException {
+            // no-op
+        }
+
+        @Override
+        public WriteResult finishRecordSet() throws IOException {
+            return null;
+        }
+
+        @Override
+        public WriteResult write(Record record) throws IOException {
+            return null;
+        }
+
+        @Override
+        public String getMimeType() {
+            return null;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            // no-op
+        }
+
+        @Override
+        public void close() throws IOException {
+            // no-op
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordReaderProcessor.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordReaderProcessor.java
new file mode 100644
index 0000000..48b0144
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordReaderProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Minimal processor for tests. It does nothing when triggered and exposes a single
+ * required property, "record-reader", that identifies a {@link RecordReaderFactory}
+ * controller service.
+ */
+public class TestRecordReaderProcessor extends AbstractProcessor {
+
+    /** Intentionally empty: this processor performs no work. */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    }
+
+    /**
+     * @return a new list holding only the "record-reader" property descriptor
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor recordReader = new PropertyDescriptor.Builder()
+                .name("record-reader")
+                .displayName("Record Reader")
+                .description("Specifies the Controller Service to use for reading incoming data")
+                .identifiesControllerService(RecordReaderFactory.class)
+                .required(true)
+                .build();
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(recordReader);
+        return descriptors;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordSetWriterProcessor.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordSetWriterProcessor.java
new file mode 100644
index 0000000..e7ac605
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/serialization/TestRecordSetWriterProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Minimal processor for tests. It does nothing when triggered and exposes a single
+ * required property, "record-writer", that identifies a {@link RecordSetWriterFactory}
+ * controller service.
+ */
+public class TestRecordSetWriterProcessor extends AbstractProcessor {
+
+    /** Intentionally empty: this processor performs no work. */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    }
+
+    /**
+     * @return a new list holding only the "record-writer" property descriptor
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor recordWriter = new PropertyDescriptor.Builder()
+                .name("record-writer")
+                .displayName("Record Writer")
+                .description("Specifies the Controller Service to use for writing out the records")
+                .identifiesControllerService(RecordSetWriterFactory.class)
+                .required(true)
+                .build();
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(recordWriter);
+        return descriptors;
+    }
+}
\ No newline at end of file

Reply via email to