NIFI-1280 added support for RecordSchema in SchemaRegistry

Signed-off-by: Mark Payne <marka...@hotmail.com>
Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-1280: Updated SimpleKeyValueSchemaRegistry to make use of the new CHOICE 
RecordFieldType and updated the Record Readers to use the SchemaRegistry 
controller service. Moved the SchemaRegistry API into its own Maven module and 
added it to standard-services-api so that we can properly declare dependencies 
on it. Code cleanup and bug fixes
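
For context only (not part of this commit): the sketch below illustrates the 
lookup pattern described above, in which a component exposes a property that 
identifies a SchemaRegistry controller service and resolves it when scheduled. 
The class name and property name here are made up for illustration; only the 
identifiesControllerService(...) / asControllerService(...) wiring mirrors the 
code visible in the diff below.

    import java.util.Collections;
    import java.util.List;

    import org.apache.nifi.annotation.lifecycle.OnScheduled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.schemaregistry.services.SchemaRegistry;

    // Hypothetical component, used only to show the controller-service wiring.
    public abstract class SchemaAwareExampleProcessor extends AbstractProcessor {

        // Property that lets the user select a SchemaRegistry implementation,
        // such as the AvroSchemaRegistry service added in this commit.
        static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
                .name("Schema Registry")
                .description("Controller Service used to look up the schema for incoming records")
                .identifiesControllerService(SchemaRegistry.class)
                .required(true)
                .build();

        private volatile SchemaRegistry schemaRegistry;

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return Collections.singletonList(SCHEMA_REGISTRY);
        }

        // Resolve the configured registry once at schedule time, mirroring the
        // asControllerService(...) lookup seen elsewhere in this diff.
        @OnScheduled
        public void onScheduled(final ProcessContext context) {
            this.schemaRegistry = context.getProperty(SCHEMA_REGISTRY)
                    .asControllerService(SchemaRegistry.class);
        }
    }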

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-1280: Fixed checkstyle violations and license exclusions for RAT plugin

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-1280: Addressed feedback from PR Review

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-1280: Additional changes and documentation to support QueryFlowFile and 
Record Readers/Writers

This closes #1652


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/68c592ea
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/68c592ea
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/68c592ea

Branch: refs/heads/master
Commit: 68c592ea43d30754ec07c42cf10563fe9db185ae
Parents: a88d3bf
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Thu Mar 30 09:12:07 2017 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Tue Apr 11 19:29:35 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/util/MockProcessSession.java    |   4 +-
 .../nifi/util/TestMockProcessSession.java       |  16 +-
 nifi-nar-bundles/nifi-hive-bundle/pom.xml       |  10 +
 .../nifi-registry-nar/pom.xml                   |   7 +-
 .../nifi-registry-processors/pom.xml            |  74 ---
 .../processors/AbstractCSVTransformer.java      |  57 --
 .../processors/AbstractContentTransformer.java  | 101 ---
 .../processors/AbstractTransformer.java         |  93 ---
 .../schemaregistry/processors/AvroUtils.java    |  67 --
 .../processors/BaseContentTransformer.java      |  51 --
 .../processors/BaseTransformer.java             | 189 ------
 .../schemaregistry/processors/CSVUtils.java     | 302 ---------
 .../processors/ExtractAvroFields.java           | 100 ---
 .../schemaregistry/processors/JsonUtils.java    |  74 ---
 .../processors/RegistryCommon.java              |  84 ---
 .../processors/TransformAvroToCSV.java          |  57 --
 .../processors/TransformAvroToJson.java         |  46 --
 .../processors/TransformCSVToAvro.java          |  80 ---
 .../processors/TransformCSVToJson.java          |  80 ---
 .../processors/TransformJsonToAvro.java         |  45 --
 .../processors/TransformJsonToCSV.java          |  45 --
 .../org.apache.nifi.processor.Processor         |  21 -
 .../processors/TransformersTest.java            | 220 -------
 .../expected_ouput_csv/decimal_logicalType.txt  |   1 -
 .../decimal_logicalType_invalid_scale.txt       |   1 -
 ...mal_logicalType_valid_scale_with_default.txt |   1 -
 .../decimal_logicalType_with_default.txt        |   1 -
 .../expected_ouput_csv/primitive_types.txt      |   1 -
 .../primitive_types_with_matching_default.txt   |   1 -
 .../union_null_last_field_with_default.txt      |   1 -
 .../union_null_middle_field_with_default.txt    |   1 -
 .../expected_ouput_csv/union_with_default.txt   |   1 -
 ...l_logicalType_invalid_scale_with_default.txt |  16 -
 ...mal_logicalType_valid_scale_with_default.txt |  16 -
 ..._logicalType_valid_scale_with_no_default.txt |  15 -
 .../input_avro/primitive_types_no_defaults.txt  |  11 -
 .../primitive_types_union_with_defaults.txt     |  11 -
 .../primitive_types_with_matching_default.txt   |  11 -
 .../primitive_types_with_mismatch_default.txt   |  11 -
 .../input_avro/union_and_matching_defaults.txt  |  18 -
 .../input_avro/union_and_mismatch_defaults.txt  |  18 -
 .../resources/input_csv/decimal_logicalType.txt |   1 -
 .../decimal_logicalType_missing_value.txt       |   1 -
 .../resources/input_csv/primitive_types.txt     |   1 -
 .../primitive_types_with_matching_default.txt   |   1 -
 .../union_null_last_field_with_default.txt      |   1 -
 .../union_null_middle_field_with_default.txt    |   1 -
 .../resources/input_csv/union_with_default.txt  |   1 -
 .../input_csv/union_with_missing_value.txt      |   1 -
 .../nifi-registry-service/pom.xml               |  12 +
 .../services/AvroSchemaRegistry.java            | 217 +++++++
 .../services/AvroSchemaValidator.java           |  57 ++
 .../schemaregistry/services/SchemaRegistry.java |  46 --
 .../services/SimpleKeyValueSchemaRegistry.java  |  96 ---
 ...org.apache.nifi.controller.ControllerService |   2 +-
 .../SimpleKeyValueSchemaRegistryTest.java       |  70 ---
 .../services/TestAvroSchemaRegistry.java        | 111 ++++
 nifi-nar-bundles/nifi-registry-bundle/pom.xml   |   5 -
 .../nifi-standard-processors/pom.xml            |   4 +-
 .../nifi/processors/standard/QueryFlowFile.java |  23 +-
 .../nifi/queryflowfile/FlowFileEnumerator.java  |   6 +-
 .../nifi/queryflowfile/FlowFileTable.java       |   4 +-
 .../additionalDetails.html                      |   3 +-
 .../processors/standard/TestQueryFlowFile.java  | 130 +---
 .../standard/util/record/MockRecordParser.java  | 107 ++++
 .../standard/util/record/MockRecordWriter.java  |  80 +++
 .../pom.xml                                     |   2 +-
 .../apache/nifi/serialization/RecordReader.java |   1 -
 .../apache/nifi/serialization/RecordWriter.java |   2 +-
 .../serialization/RowRecordReaderFactory.java   |   7 +-
 .../nifi/serialization/record/DataType.java     |  31 +-
 .../nifi/serialization/record/MapRecord.java    | 201 +-----
 .../nifi/serialization/record/Record.java       |   4 +-
 .../serialization/record/RecordFieldType.java   | 208 ++++++-
 .../record/ResultSetRecordSet.java              | 168 ++++-
 .../record/type/ArrayDataType.java              |  67 ++
 .../record/type/ChoiceDataType.java             |  68 +++
 .../record/type/RecordDataType.java             |  63 ++
 .../record/util/DataTypeUtils.java              | 608 +++++++++++++++++++
 .../util/IllegalTypeConversionException.java    |  29 +
 .../src/main/resources/META-INF/NOTICE          |  11 +-
 .../.gitignore                                  |   1 -
 .../nifi-record-serialization-services/pom.xml  |  19 +-
 .../java/org/apache/nifi/avro/AvroReader.java   |  13 +-
 .../org/apache/nifi/avro/AvroRecordReader.java  | 177 +++---
 .../apache/nifi/avro/AvroRecordSetWriter.java   |  15 +-
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 159 +++++
 .../org/apache/nifi/avro/WriteAvroResult.java   | 308 ++++------
 .../java/org/apache/nifi/csv/CSVReader.java     |  58 +-
 .../org/apache/nifi/csv/CSVRecordReader.java    | 197 ++----
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |  40 +-
 .../main/java/org/apache/nifi/csv/CSVUtils.java | 204 +++++++
 .../nifi/csv/SingleCharacterValidator.java      |  62 ++
 .../org/apache/nifi/csv/WriteCSVResult.java     |  33 +-
 .../java/org/apache/nifi/grok/GrokReader.java   |  26 +-
 .../org/apache/nifi/grok/GrokRecordReader.java  |  54 +-
 .../nifi/json/AbstractJsonRowRecordReader.java  | 127 +---
 .../org/apache/nifi/json/JsonPathReader.java    |  47 +-
 .../nifi/json/JsonPathRowRecordReader.java      | 241 ++------
 .../org/apache/nifi/json/JsonPathValidator.java |  12 -
 .../apache/nifi/json/JsonRecordSetWriter.java   |   8 +-
 .../org/apache/nifi/json/JsonTreeReader.java    |  54 +-
 .../nifi/json/JsonTreeRowRecordReader.java      | 121 ++--
 .../org/apache/nifi/json/PropertyNameUtil.java  |  88 ---
 .../org/apache/nifi/json/WriteJsonResult.java   | 162 ++---
 .../serialization/AbstractRecordSetWriter.java  |  84 ---
 .../nifi/serialization/DataTypeUtils.java       | 165 -----
 .../DateTimeTextRecordSetWriter.java            |  57 ++
 .../nifi/serialization/DateTimeUtils.java       |  50 ++
 .../SchemaRegistryRecordReader.java             | 110 ++++
 .../UserTypeOverrideRowReader.java              |  78 ---
 .../nifi/text/FreeFormTextRecordSetWriter.java  |   6 +-
 .../additionalDetails.html                      | 150 +++--
 .../additionalDetails.html                      |  97 +--
 .../additionalDetails.html                      | 318 +++++-----
 .../additionalDetails.html                      | 281 ++++++---
 .../apache/nifi/avro/TestAvroRecordReader.java  | 121 +++-
 .../apache/nifi/avro/TestWriteAvroResult.java   | 202 ++++++
 .../apache/nifi/csv/TestCSVRecordReader.java    |  94 +--
 .../org/apache/nifi/csv/TestWriteCSVResult.java |  23 +-
 .../apache/nifi/grok/TestGrokRecordReader.java  |  11 +-
 .../nifi/json/TestJsonPathRowRecordReader.java  | 109 +++-
 .../nifi/json/TestJsonTreeRowRecordReader.java  | 104 +++-
 .../apache/nifi/json/TestWriteJsonResult.java   |   9 +-
 .../src/test/resources/avro/datatypes.avsc      |  47 ++
 .../src/test/resources/avro/logical-types.avsc  |  34 ++
 .../nifi-schema-registry-service-api/pom.xml    |  32 +
 .../schemaregistry/services/SchemaRegistry.java |  53 ++
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/nifi-standard-services/pom.xml |   1 +
 pom.xml                                         |  12 +-
 131 files changed, 4423 insertions(+), 4466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 7dd9714..faf6e42 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -218,8 +218,8 @@ public class MockProcessSession implements ProcessSession {
                 }
             }
 
-            //            throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
-            //                + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
+            throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
+                + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
         }
 
         committed = true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index e16afb3..d1c2bea 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.util;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -28,14 +36,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class TestMockProcessSession {
 
     @Test(expected = AssertionError.class)

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-hive-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/pom.xml
index 5c3f2a5..342e7ed 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/pom.xml
@@ -38,6 +38,16 @@
         <module>nifi-hive-nar</module>
     </modules>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro</artifactId>
+                <version>1.7.7</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <plugins>
             <plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
index dfdf214..0780c85 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
@@ -25,7 +25,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-registry-processors</artifactId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-registry-service</artifactId>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml
deleted file mode 100644
index 0ea83ee..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml
+++ /dev/null
@@ -1,74 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Copyright 2016 Hortoworks, Inc. All rights reserved. Hortonworks, Inc. 
-       licenses this file to you under the Apache License, Version 2.0 (the 
"License"); 
-       you may not use this file except in compliance with the License. You 
may 
-       obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 
-       Unless required by applicable law or agreed to in writing, software 
distributed 
-       under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
-       OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for 
-       the specific language governing permissions and limitations under the 
License. 
-       See the associated NOTICE file for additional information regarding 
copyright 
-       ownership. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.nifi</groupId>
-               <artifactId>nifi-registry-bundle</artifactId>
-               <version>1.2.0-SNAPSHOT</version>
-       </parent>
-
-       <artifactId>nifi-registry-processors</artifactId>
-       <packaging>jar</packaging>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.rat</groupId>
-                               <artifactId>apache-rat-plugin</artifactId>
-                               <configuration>
-                                       <excludes combine.children="append">
-                                               
<exclude>src/test/resources/expected_ouput_csv/*</exclude>
-                                               
<exclude>src/test/resources/input_avro/*</exclude>
-                                               
<exclude>src/test/resources/input_csv/*</exclude>                               
        
-                                       </excludes>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </build>
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.nifi</groupId>
-                       <artifactId>nifi-api</artifactId>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.nifi</groupId>
-                       <artifactId>nifi-utils</artifactId>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-               </dependency>
-               <dependency>
-                       <groupId>org.codehaus.jackson</groupId>
-                       <artifactId>jackson-mapper-asl</artifactId>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.nifi</groupId>
-                       <artifactId>nifi-registry-service</artifactId>
-                       <version>1.2.0-SNAPSHOT</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.nifi</groupId>
-                       <artifactId>nifi-mock</artifactId>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>pl.pragmatists</groupId>
-                       <artifactId>JUnitParams</artifactId>
-                       <version>1.0.5</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
deleted file mode 100644
index 54497dc..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-
-/**
- * Base processor for implementing transform-like processors for CSV
- * transformations that integrate with Schema Registry (see
- * {@link SchemaRegistry})
- */
-abstract class AbstractCSVTransformer extends AbstractContentTransformer {
-
-    static final List<PropertyDescriptor> BASE_CSV_DESCRIPTORS;
-
-    static {
-        List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
-        descriptors.addAll(BASE_DESCRIPTORS);
-        descriptors.add(DELIMITER);
-        BASE_CSV_DESCRIPTORS = Collections.unmodifiableList(descriptors);
-    }
-
-    protected volatile char delimiter;
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return BASE_CSV_DESCRIPTORS;
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        super.onScheduled(context);
-        this.delimiter = context.getProperty(DELIMITER).getValue().charAt(0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
deleted file mode 100644
index 403b52a..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-
-/**
- * Base processor for implementing transform-like processors that integrate 
with
- * Schema Registry (see {@link SchemaRegistry})
- */
-abstract class AbstractContentTransformer extends BaseContentTransformer 
implements RegistryCommon {
-
-    static final List<PropertyDescriptor> BASE_DESCRIPTORS;
-
-    static {
-        List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
-        descriptors.add(REGISTRY_SERVICE);
-        descriptors.add(SCHEMA_NAME);
-        descriptors.add(SCHEMA_TYPE);
-        BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors);
-    }
-
-    volatile SchemaRegistry schemaRegistryDelegate;
-
-    /**
-     *
-     */
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        this.schemaRegistryDelegate = 
context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class);
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, 
InvocationContextProperties contextProperties) {
-        Schema schema = 
RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties);
-        return this.transform(in, out, contextProperties, schema);
-    }
-
-    /**
-     * This operation is designed to allow sub-classes to provide
-     * implementations that read content of the provided {@link InputStream} 
and
-     * write content (same or different) into the provided {@link 
OutputStream}.
-     * Both {@link InputStream} and {@link OutputStream} represent the content
-     * of the in/out {@link FlowFile} and are both required to NOT be null;
-     * <p>
-     * The returned {@link Map} represents attributes that will be added to the
-     * outgoing FlowFile. It can be null, in which case no attributes will be
-     * added to the resulting {@link FlowFile}.
-     *
-     *
-     * @param in
-     *            {@link InputStream} representing data to be transformed
-     * @param out
-     *            {@link OutputStream} representing target stream to wrote
-     *            transformed data. Can be null if no output needs to be
-     *            written.
-     * @param contextProperties
-     *            instance of {@link InvocationContextProperties}
-     * @param schema
-     *            instance of {@link Schema}
-     */
-    protected abstract Map<String, String> transform(InputStream in, 
OutputStream out, InvocationContextProperties contextProperties, Schema schema);
-
-    /**
-     *
-     */
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return BASE_DESCRIPTORS;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
deleted file mode 100644
index 13dd4a5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-
-/**
- * Base processor for implementing transform-like processors that integrate 
with
- * Schema Registry (see {@link SchemaRegistry})
- */
-abstract class AbstractTransformer extends BaseTransformer implements 
RegistryCommon {
-
-    static final List<PropertyDescriptor> BASE_DESCRIPTORS;
-
-    static {
-        List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
-        descriptors.add(REGISTRY_SERVICE);
-        descriptors.add(SCHEMA_NAME);
-        BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors);
-    }
-
-    volatile SchemaRegistry schemaRegistryDelegate;
-
-    /**
-     *
-     */
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        this.schemaRegistryDelegate = 
context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class);
-    }
-
-    /**
-     * This operation is designed to allow sub-classes to provide
-     * implementations that read content of the provided {@link InputStream}
-     * that represent the content of the incoming {@link FlowFile}.
-     * <p>
-     * The returned {@link Map} represents attributes that will be added to the
-     * outgoing FlowFile.
-     *
-     *
-     * @param in
-     *            {@link InputStream} representing data to be transformer
-     * @param contextProperties
-     *            instance of {@link InvocationContextProperties}
-     * @param schema
-     *            instance of avro {@link Schema}
-     */
-    protected abstract Map<String, String> transform(InputStream in, 
InvocationContextProperties contextProperties, Schema schema);
-
-    /**
-     *
-     */
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return BASE_DESCRIPTORS;
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, 
InvocationContextProperties contextProperties) {
-        Schema schema = 
RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties);
-        return this.transform(in, contextProperties, schema);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
deleted file mode 100644
index b967af9..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.nifi.flowfile.FlowFile;
-
-/**
- * Various Avro related utility operations relevant to transforming contents of
- * the {@link FlowFile} between Avro formats.
- */
-class AvroUtils {
-
-    /**
-     * Reads provided {@link InputStream} into Avro {@link GenericRecord}
-     * applying provided {@link Schema} returning the resulting GenericRecord.
-     */
-    public static GenericRecord read(InputStream in, Schema schema) {
-        GenericDatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>(schema);
-        GenericRecord avroRecord = null;
-        try {
-            avroRecord = datumReader.read(null, 
DecoderFactory.get().binaryDecoder(in, null));
-            return avroRecord;
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to read AVRO record", e);
-        }
-    }
-
-    /**
-     * Writes provided {@link GenericRecord} into the provided
-     * {@link OutputStream}.
-     */
-    public static void write(GenericRecord record, OutputStream out) {
-        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-        DatumWriter<GenericRecord> writer = new 
GenericDatumWriter<>(record.getSchema());
-        try {
-            writer.write(record, encoder);
-            encoder.flush();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to write AVRO record", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
deleted file mode 100644
index 12586ac..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.StreamCallback;
-
-/**
- * Base processor which contains common functionality for processors that
- * receive {@link FlowFile} and output {@link FlowFile} while also modifying 
the
- * content of the {@link FlowFile}
- */
-public abstract class BaseContentTransformer extends BaseTransformer {
-
-    @Override
-    protected FlowFile doTransform(ProcessContext context, ProcessSession 
session, FlowFile flowFile, InvocationContextProperties contextProperties) {
-        AtomicReference<Map<String, String>> attributeRef = new 
AtomicReference<Map<String, String>>();
-        flowFile = session.write(flowFile, new StreamCallback() {
-            @Override
-            public void process(InputStream in, OutputStream out) throws 
IOException {
-                attributeRef.set(transform(in, out, contextProperties));
-            }
-        });
-        if (attributeRef.get() != null) {
-            flowFile = session.putAllAttributes(flowFile, attributeRef.get());
-        }
-        return flowFile;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
deleted file mode 100644
index e1cc98c..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-
-/**
- * Base processor which contains common functionality for processors that
- * receive {@link FlowFile} and output {@link FlowFile} and contain only two
- * {@link Relationship}s (i.e., success and failure). Every successful 
execution
- * of
- * {@link #doTransform(ProcessContext, ProcessSession, FlowFile, 
InvocationContextProperties)}
- * operation will result in transferring {@link FlowFile} to 'success'
- * relationship while any exception will result in such file going to 
'failure'.
- */
-public abstract class BaseTransformer extends AbstractProcessor {
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Successfully retrieved schema from Schema Registry")
-            .build();
-
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("FlowFiles that failed to find a schema are sent to 
this relationship")
-            .build();
-
-    private static final Set<Relationship> BASE_RELATIONSHIPS;
-
-    static {
-        Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        BASE_RELATIONSHIPS = Collections.unmodifiableSet(relationships);
-    }
-
-    private final Map<PropertyDescriptor, String> propertyInstanceValues = new 
HashMap<>();
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            try {
-                InvocationContextProperties contextProperties = new 
InvocationContextProperties(context, flowFile);
-                flowFile = this.doTransform(context, session, flowFile, 
contextProperties);
-                session.transfer(flowFile, REL_SUCCESS);
-            } catch (Exception e) {
-                this.getLogger().error("Failed FlowFile processing, routing to 
failure. Issue: " + e.getMessage(), e);
-                session.transfer(flowFile, REL_FAILURE);
-            }
-        } else {
-            context.yield();
-        }
-    }
-
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        List<PropertyDescriptor> propertyDescriptors = 
this.getSupportedPropertyDescriptors();
-        for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
-            if (!propertyDescriptor.isExpressionLanguageSupported()){
-                this.propertyInstanceValues.put(propertyDescriptor, 
context.getProperty(propertyDescriptor).getValue());
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    protected FlowFile doTransform(ProcessContext context, ProcessSession 
session, FlowFile flowFile,  InvocationContextProperties contextProperties) {
-        AtomicReference<Map<String, String>> attributeRef = new 
AtomicReference<Map<String, String>>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream in) throws IOException {
-                attributeRef.set(transform(in, null, contextProperties));
-            }
-        });
-        if (attributeRef.get() != null) {
-            flowFile = session.putAllAttributes(flowFile, attributeRef.get());
-        }
-        return flowFile;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return BASE_RELATIONSHIPS;
-    }
-
-    /**
-     * This operation is designed to allow sub-classes to provide
-     * implementations that read content of the provided {@link InputStream} 
and
-     * write content (same or different) it into the provided
-     * {@link OutputStream}. Both {@link InputStream} and {@link OutputStream}
-     * represent the content of the in/out {@link FlowFile}. The
-     * {@link OutputStream} can be null if no output needs to be written.
-     * <p>
-     * The returned {@link Map} represents attributes that will be added to the
-     * outgoing FlowFile.
-     *
-     *
-     * @param in
-     *            {@link InputStream} representing data to be transformed
-     * @param out
-     *            {@link OutputStream} representing target stream to wrote
-     *            transformed data. Can be null if no output needs to be
-     *            written.
-     * @param contextProperties
-     *            instance of {@link InvocationContextProperties}
-     */
-    protected abstract Map<String, String> transform(InputStream in, 
OutputStream out, InvocationContextProperties contextProperties);
-
-    /**
-     * Properties object that gathers the value of the
-     * {@link PropertyDescriptor} within the context of
-     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)}
-     * invocation. It maintains the knowledge of instance properties vs.
-     * invocation properties that the values of which are set by evaluating
-     * expression against the incoming {@link FlowFile}.
-     */
-    public class InvocationContextProperties {
-        private final Map<PropertyDescriptor, String> propertyInvocationValues 
= new HashMap<>();
-
-        InvocationContextProperties(ProcessContext context, FlowFile flowFile) 
{
-            List<PropertyDescriptor> propertyDescriptors = 
BaseTransformer.this.getSupportedPropertyDescriptors();
-            for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
-                if (propertyDescriptor.isExpressionLanguageSupported()) {
-                    PropertyValue value = 
context.getProperty(propertyDescriptor)
-                            .evaluateAttributeExpressions(flowFile);
-                    this.propertyInvocationValues.put(propertyDescriptor, 
value.getValue());
-                }
-            }
-        }
-
-        /**
-         * Returns the value of the property within the context of
-         * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)}
-         * invocation.
-         */
-        public String getPropertyValue(PropertyDescriptor propertyDescriptor, 
boolean notNull) {
-            String propertyValue = 
propertyInstanceValues.containsKey(propertyDescriptor)
-                    ? propertyInstanceValues.get(propertyDescriptor)
-                            : propertyInvocationValues.get(propertyDescriptor);
-            if (notNull && propertyValue == null) {
-                throw new IllegalArgumentException("Property '" + 
propertyDescriptor + "' evaluatd to null");
-            }
-            return propertyValue;
-        }
-
-        @Override
-        public String toString() {
-            return "Instance: " + propertyInstanceValues + "; Invocation: " + 
propertyInvocationValues;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
deleted file mode 100644
index 58d5d6b..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringWriter;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.flowfile.FlowFile;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.node.BooleanNode;
-import org.codehaus.jackson.node.DoubleNode;
-import org.codehaus.jackson.node.IntNode;
-import org.codehaus.jackson.node.LongNode;
-import org.codehaus.jackson.node.TextNode;
-
-/**
- * Various CSV related utility operations relevant to transforming contents of
- * the {@link FlowFile} between CSV and AVRO formats.
- */
-class CSVUtils {
-    /**
-     * Provides a {@link Validator} to ensure that provided value is a valid
-     * character.
-     */
-    public static final Validator CHAR_VALIDATOR = new Validator() {
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            // Allows special, escaped characters as input, which is then 
un-escaped and converted to a single character.
-            // Examples for special characters: \t (or \u0009), \f.
-            if (input.length() > 1) {
-                input = StringEscapeUtils.unescapeJava(input);
-            }
-            return new ValidationResult.Builder().subject(subject).input(input)
-                    .explanation("Only non-null single characters are 
supported")
-                    .valid(input.length() == 1 && input.charAt(0) != 
0).build();
-        }
-    };
-
-    public static GenericRecord read(InputStream record, char delimiter, 
Schema schema, char quoteChar) {
-        Record avroRecord = new GenericData.Record(schema);
-        String[] parsedRecord = 
parseFields(convertInputStreamToString(record), delimiter, quoteChar);
-        List<Field> fields = schema.getFields();
-        if (parsedRecord.length != fields.size()) {
-            throw new IllegalStateException("Incompatible schema. Parsed 
fields count does not match the count of fields from schema. "
-                    + "Schema: " + schema.toString(true) + "\n Record: " + 
record);
-        }
-
-        for (int i = 0; i < fields.size(); i++) {
-            Field field = fields.get(i);
-            Type type = field.schema().getType();
-            updateRecord(field, type, parsedRecord[i], avroRecord);
-        }
-        return avroRecord;
-    }
-
-    /**
-     * Parses provided record into fields using provided delimiter. The
-     * 'quoteChar' is used to ensure that if a delimiter char is in quotes it
-     * will not be parsed into a separate filed.
-     */
-    public static String[] parseFields(String record, char delimiter, char 
quoteChar) {
-        List<String> result = new ArrayList<String>();
-        int start = 0;
-        boolean inQuotes = false;
-        for (int i = 0; i < record.length(); i++) {
-            if (record.charAt(i) == quoteChar) {
-                inQuotes = !inQuotes;
-            }
-            boolean atLastChar = (i == record.length() - 1);
-            if (atLastChar) {
-                if (record.charAt(i) == delimiter) {
-                    //missing last column value, add NULL
-                    result.add(record.substring(start,i));
-                    result.add(null);
-                } else {
-                    result.add(record.substring(start));
-                }
-            } else if (record.charAt(i) == delimiter && !inQuotes) {
-                if (start == i) {
-                    //There is no value, so add NULL to indicated the absence 
of a value for this field.
-                    result.add(null);
-                } else {
-                    result.add(record.substring(start, i));
-                }
-                start = i + 1;
-            }
-        }
-        return result.toArray(new String[]{});
-    }
-
-    /**
-     * Writes {@link GenericRecord} as CSV (delimited) record to the
-     * {@link OutputStream} using provided delimiter.
-     */
-    public static void write(GenericRecord record, char delimiter, 
OutputStream out) {
-        List<Field> fields = record.getSchema().getFields();
-
-        String delimiterToUse = "";
-        try {
-            for (Field field : fields) {
-                out.write(delimiterToUse.getBytes(StandardCharsets.UTF_8));
-                Object fieldValue = record.get(field.name());
-                if (null == fieldValue) {
-                    out.write(new byte[0]);
-                } else {
-                    if (Type.BYTES == field.schema().getType()) {
-                        // need to create it from the ByteBuffer it is 
serialized as.
-                        // need to ensure the type is one of the logical ones 
we support and if so convert it.
-                        
if(!"decimal".contentEquals(field.getProp("logicalType"))){
-                            throw new IllegalArgumentException("The field '" + 
field.name() + "' has a logical type of '" +
-                                    field.getProp("logicalType") + "' that is 
currently not supported.");
-                        }
-
-                        JsonNode rawPrecision = field.getJsonProp("precision");
-                        if(null == rawPrecision){
-                            throw new IllegalArgumentException("The field '" + 
field.name() + "' is missing the required precision property");
-                        }
-                        int precision = rawPrecision.asInt();
-                        JsonNode rawScale = field.getJsonProp("scale");
-                        int scale = null == rawScale ? 0 : rawScale.asInt();
-
-                        // write out the decimal with the precision and scale.
-                        NumberFormat numberFormat = 
DecimalFormat.getInstance();
-                        numberFormat.setGroupingUsed(false);
-                        normalizeNumberFormat(numberFormat, scale, precision);
-                        String rawValue = new 
String(((ByteBuffer)fieldValue).array());
-                        // raw value needs to be parsed to ensure that 
BigDecimal will not throw an exception for specific locale
-                        rawValue = numberFormat.parse(rawValue).toString();
-                        out.write(numberFormat.format(new 
BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8));
-                    } else {
-                        
out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8));
-                    }
-                }
-                if (delimiterToUse.length() == 0) {
-                    delimiterToUse = String.valueOf(delimiter);
-                }
-            }
-        } catch (IOException | ParseException e) {
-            throw new IllegalStateException("Failed to parse AVRO Record", e);
-        }
-    }
-
-    /**
-     * According to the 1.7.7 spec If a logical type is invalid, for example a
-     * decimal with scale greater than its precision,then implementations 
should
-     * ignore the logical type and use the underlying Avro type.
-     */
-    private static void normalizeNumberFormat(NumberFormat numberFormat, int 
scale, int precision) {
-        if (scale < precision) {
-            // write out with the specified precision and scale.
-            numberFormat.setMaximumIntegerDigits(precision);
-            numberFormat.setMaximumFractionDigits(scale);
-            numberFormat.setMinimumFractionDigits(scale);
-        }
-    }
-
-    /**
-     *
-     */
-    private static String convertInputStreamToString(InputStream record) {
-        StringWriter writer = new StringWriter();
-        try {
-            IOUtils.copy(record, writer, StandardCharsets.UTF_8);
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to read InputStream into 
String", e);
-        }
-        return writer.toString();
-    }
-
-    /**
-     *
-     */
-    private static ByteBuffer encodeLogicalType(final Field field, final 
String fieldValue) {
-        String logicalType = field.getProp("logicalType");
-        if (!"decimal".contentEquals(logicalType)) {
-            throw new IllegalArgumentException("The field '" + field.name() + 
"' has a logical type of '" + logicalType
-                    + "' that is currently not supported.");
-        }
-
-        JsonNode rawPrecision = field.getJsonProp("precision");
-        if (null == rawPrecision) {
-            throw new IllegalArgumentException("The field '" + field.name() + 
"' is missing the required precision property");
-        }
-        int precision = rawPrecision.asInt();
-        JsonNode rawScale = field.getJsonProp("scale");
-        int scale = null == rawScale ? 0 : rawScale.asInt();
-
-        NumberFormat numberFormat = DecimalFormat.getInstance();
-        numberFormat.setGroupingUsed(false);
-        normalizeNumberFormat(numberFormat, scale, precision);
-        BigDecimal decimal = null == fieldValue ? new BigDecimal(retrieveDefaultFieldValue(field).asText()) : new BigDecimal(fieldValue);
-        return ByteBuffer.wrap(numberFormat.format(decimal).getBytes(StandardCharsets.UTF_8));
-    }
-
-    /**
-     *
-     */
-    private static JsonNode retrieveDefaultFieldValue(Field field) {
-        JsonNode jsonNode = field.defaultValue();
-        if (null == jsonNode) {
-            throw new IllegalArgumentException("The field '" + field.name() + "' is NULL and there is no default value supplied in the Avro Schema");
-        }
-        return jsonNode;
-    }
-
-    /**
-     *
-     */
-    private static void updateRecord(Field field, Type type, String providedValue, Record avroRecord) {
-        if (Type.NULL != type) {
-            Object value;
-            if (Type.INT == type) {
-                value = null == providedValue ? possiblyGetDefaultValue(field, IntNode.class).getIntValue()
-                        : Integer.parseInt(providedValue);
-                avroRecord.put(field.name(), value);
-            } else if (Type.BOOLEAN == type) {
-                value = null == providedValue
-                        ? possiblyGetDefaultValue(field, BooleanNode.class).getBooleanValue()
-                        : Boolean.parseBoolean(providedValue);
-                avroRecord.put(field.name(), value);
-            } else if (Type.DOUBLE == type) {
-                value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue()
-                        : Double.parseDouble(providedValue);
-                avroRecord.put(field.name(), value);
-            } else if (Type.FLOAT == type) {
-                value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue()
-                        : Float.parseFloat(providedValue);
-                avroRecord.put(field.name(), value);
-            } else if (Type.LONG == type) {
-                value = null == providedValue ? possiblyGetDefaultValue(field, LongNode.class).getLongValue()
-                        : Long.parseLong(providedValue);
-                avroRecord.put(field.name(), value);
-            } else if (Type.STRING == type) {
-                value = null == providedValue ? possiblyGetDefaultValue(field, TextNode.class).getTextValue()
-                        : providedValue;
-                avroRecord.put(field.name(), value);
-            } else if (Type.BYTES == type) {
-                value = encodeLogicalType(field, providedValue);
-                avroRecord.put(field.name(), value);
-            } else if (Type.UNION == type) {
-                field.schema().getTypes()
-                        .forEach(schema -> updateRecord(field, schema.getType(), providedValue, avroRecord));
-            } else if (Type.ARRAY == type || Type.ENUM == type || Type.FIXED == type || Type.MAP == type
-                    || Type.NULL == type || Type.RECORD == type) {
-                throw new IllegalArgumentException("The field type '" + type + "' is not supported at the moment");
-            } else {
-                avroRecord.put(field.name(), providedValue);
-            }
-        }
-    }
-
-    /**
-     * Check to see if there is a default value to use, if not will throw
-     * {@link IllegalArgumentException}
-     */
-    private static <T extends JsonNode> JsonNode possiblyGetDefaultValue(Field field, Class<T> expectedDefaultType) {
-        JsonNode jsonNode = retrieveDefaultFieldValue(field);
-        if (field.schema().getType() != Type.UNION && !expectedDefaultType.isAssignableFrom(jsonNode.getClass())) {
-            // since we do not support schema evolution here we need to throw an
-            // exception here as the data is in error.
-            throw new IllegalArgumentException("The field '" + field.name() + "' has a default value that "
-                    + "does not match the field type. Field Type is: '" + expectedDefaultType.getName() + "' and the "
-                    + "default value type is: '" + field.defaultValue().toString());
-        }
-        return jsonNode;
-    }
-}
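
For reference, the decimal handling removed above amounts to constraining a java.text.NumberFormat to the schema's precision and scale before formatting the value. A minimal, self-contained sketch of that behavior; the precision, scale, and sample value below are illustrative, not taken from the NiFi code:

import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.NumberFormat;

public class DecimalLogicalTypeSketch {

    public static void main(String[] args) {
        // Hypothetical values read from the Avro field's "precision" and "scale" properties.
        int precision = 10;
        int scale = 2;

        NumberFormat numberFormat = DecimalFormat.getInstance();
        numberFormat.setGroupingUsed(false);

        // Mirrors normalizeNumberFormat(...) above: per the Avro 1.7.7 spec, an invalid
        // logical type (here, scale not smaller than precision) is ignored and the value
        // falls through with the underlying type's default formatting.
        if (scale < precision) {
            numberFormat.setMaximumIntegerDigits(precision);
            numberFormat.setMaximumFractionDigits(scale);
            numberFormat.setMinimumFractionDigits(scale);
        }

        // Prints "123.46" in an English locale (the decimal separator is locale-dependent).
        System.out.println(numberFormat.format(new BigDecimal("123.4567")));
    }
}
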

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
deleted file mode 100644
index 2ab83c5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-
-@Tags({ "registry", "schema", "avro", "extract", "evaluate" })
-@CapabilityDescription("Extracts Avro field and assigns it to the FlowFile 
attribute")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@DynamicProperty(name = "Avro field name", value = "FlowFile attribute name to 
set the extracted field",
-                 description = "The value of the Avro field specified by 'Avro 
field name' will be extracted and set as "
-                         + "FlowFile attribute under name specified by the 
value of this property.")
-public final class ExtractAvroFields extends AbstractTransformer {
-
-    private static final List<PropertyDescriptor> DESCRIPTORS;
-
-    static {
-        List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
-        descriptors.addAll(BASE_DESCRIPTORS);
-        descriptors.add(SCHEMA_TYPE);
-        DESCRIPTORS = Collections.unmodifiableList(descriptors);
-    }
-
-    private volatile Map<String, String> dynamicProperties;
-
-    /**
-     *
-     */
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    /**
-     *
-     */
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        super.onScheduled(context);
-        this.dynamicProperties = context.getProperties().entrySet().stream()
-                .filter(p -> p.getKey().isDynamic())
-                .collect(Collectors.toMap(p -> p.getKey().getName(), p -> p.getValue()));
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .expressionLanguageSupported(false)
-                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-                .required(false)
-                .dynamic(true)
-                .build();
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema) {
-        GenericRecord avroRecord = AvroUtils.read(in, schema);
-        Map<String, String> attributes = this.dynamicProperties.entrySet().stream().collect(
-                Collectors.toMap(dProp -> dProp.getValue(), dProp -> String.valueOf(avroRecord.get(dProp.getKey()))));
-        return Collections.unmodifiableMap(attributes);
-    }
-}
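
The removed transform above is essentially a stream over the processor's dynamic properties, looking each Avro field up on the record and exposing its string form under the configured attribute name. A rough stand-in using plain maps; the field and attribute names are invented for the example:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ExtractFieldsSketch {

    public static void main(String[] args) {
        // Dynamic properties: key = Avro field name, value = FlowFile attribute to set (invented names).
        Map<String, String> dynamicProperties = new LinkedHashMap<>();
        dynamicProperties.put("name", "user.name");
        dynamicProperties.put("id", "user.id");

        // Stand-in for GenericRecord.get(fieldName) on the parsed Avro record.
        Map<String, Object> avroRecord = new LinkedHashMap<>();
        avroRecord.put("name", "alice");
        avroRecord.put("id", 42);

        Map<String, String> attributes = dynamicProperties.entrySet().stream().collect(
                Collectors.toMap(Map.Entry::getValue, e -> String.valueOf(avroRecord.get(e.getKey()))));

        // e.g. {user.name=alice, user.id=42}
        System.out.println(Collections.unmodifiableMap(attributes));
    }
}
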

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
deleted file mode 100644
index 81c98b3..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.DataInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
-import org.apache.nifi.flowfile.FlowFile;
-
-/**
- * Various Json related utility operations relevant to transforming contents of
- * the {@link FlowFile} between JSON and AVRO formats.
- */
-class JsonUtils {
-
-    /**
-     * Writes provided {@link GenericRecord} into the provided
-     * {@link OutputStream} as JSON.
-     */
-    public static void write(GenericRecord record, OutputStream out) {
-        try {
-            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
-            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
-            writer.write(record, encoder);
-            encoder.flush();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to write GenericRecord", e);
-        }
-    }
-
-    /**
-     * Reads provided {@link InputStream} as JSON into Avro
-     * {@link GenericRecord} applying provided {@link Schema} returning the
-     * resulting GenericRecord.
-     */
-    public static GenericRecord read(InputStream jsonIs, Schema schema) {
-        DataInputStream din = new DataInputStream(jsonIs);
-        try {
-            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
-            DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
-            return reader.read(null, decoder);
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to parse incoming Json input stream into Avro GenericRecord. "
-                    + "Possible reason: the value may not be a valid JSON or incompatible schema is provided. Schema was '"
-                    + schema.toString(true) + "'.", e);
-        }
-    }
-}
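
JsonUtils was a thin wrapper over the standard Avro JSON encoder/decoder APIs. A rough sketch of the same round trip outside NiFi; the schema and record values below are made up for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

public class AvroJsonRoundTripSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical schema; in NiFi this text would come from the Schema Registry service.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "alice");

        // GenericRecord -> JSON, the same calls JsonUtils.write(...) relied on.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
        writer.write(record, encoder);
        encoder.flush();

        // JSON -> GenericRecord, the same calls JsonUtils.read(...) relied on.
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, new ByteArrayInputStream(out.toByteArray()));
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        GenericRecord parsed = reader.read(null, decoder);

        System.out.println(parsed); // {"name": "alice"}
    }
}
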

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
deleted file mode 100644
index 3fc1530..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schemaregistry.processors.BaseTransformer.InvocationContextProperties;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-
-/**
- * Strategy that encapsulates common properties and functionalities used by all
- * processors that integrate with Schema Registry.
- */
-interface RegistryCommon {
-
-    static final String SCHEMA_ATTRIBUTE_NAME = "schema.text";
-
-    static final PropertyDescriptor REGISTRY_SERVICE = new PropertyDescriptor.Builder()
-            .name("schema-registry-service")
-            .displayName("Schema Registry Service")
-            .description("The Schema Registry Service for serializing/deserializing messages as well as schema retrieval.")
-            .required(true)
-            .identifiesControllerService(SchemaRegistry.class)
-            .build();
-
-    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
-            .name("schema-name")
-            .displayName("Schema Name")
-            .description("The name of schema.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-
-    static final PropertyDescriptor SCHEMA_TYPE = new PropertyDescriptor.Builder()
-            .name("schema-type")
-            .displayName("Schema Type")
-            .description("The type of schema (avro is the only currently supported schema).")
-            .required(true)
-            .allowableValues("avro")
-            .defaultValue("avro")
-            .expressionLanguageSupported(true)
-            .build();
-
-    static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
-            .name("csv-delimiter")
-            .displayName("CSV delimiter")
-            .description("Delimiter character for CSV records")
-            .addValidator(CSVUtils.CHAR_VALIDATOR)
-            .defaultValue(",")
-            .build();
-
-    static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
-            .name("csv-quote-character")
-            .displayName("CSV quote character")
-            .description("Quote character for CSV values")
-            .addValidator(CSVUtils.CHAR_VALIDATOR)
-            .defaultValue("\"")
-            .build();
-    /**
-     * Utility operation to retrieve and parse {@link Schema} from Schema
-     * Registry using provided {@link SchemaRegistry};
-     */
-    static Schema retrieveSchema(SchemaRegistry schemaRegistry, InvocationContextProperties contextProperties) {
-        String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, true);
-        String schemaText = schemaRegistry.retrieveSchemaText(schemaName);
-        return new Schema.Parser().parse(schemaText);
-    }
-}
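
RegistryCommon.retrieveSchema simply resolved the schema text by name through the SchemaRegistry controller service and parsed it with Avro's Schema.Parser. A minimal sketch of that flow; the SchemaTextSource interface and the sample schema are hypothetical stand-ins, not part of the NiFi API:

import org.apache.avro.Schema;

public class RetrieveSchemaSketch {

    // Hypothetical stand-in for the SchemaRegistry controller service; only the
    // lookup-by-name step matters for this sketch.
    interface SchemaTextSource {
        String retrieveSchemaText(String schemaName);
    }

    static Schema retrieveSchema(SchemaTextSource registry, String schemaName) {
        // Same two steps as the removed RegistryCommon.retrieveSchema(...): fetch text, then parse.
        String schemaText = registry.retrieveSchemaText(schemaName);
        return new Schema.Parser().parse(schemaText);
    }

    public static void main(String[] args) {
        SchemaTextSource registry = name ->
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
        System.out.println(retrieveSchema(registry, "user").getFullName()); // User
    }
}
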

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
deleted file mode 100644
index aa0d418..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-@Tags({ "registry", "schema", "avro", "csv", "transform" })
-@CapabilityDescription("Transforms AVRO content of the Flow File to CSV using 
the schema provided by the Schema Registry Service.")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-public final class TransformAvroToCSV extends AbstractCSVTransformer {
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
-        byte[] buff = null;
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            IOUtils.copy(in, bos);
-            buff = bos.toByteArray();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        ByteArrayInputStream is = new ByteArrayInputStream(buff);
-        GenericRecord avroRecord = AvroUtils.read(is, schema);
-        CSVUtils.write(avroRecord, this.delimiter, out);
-        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java
deleted file mode 100644
index ba45563..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-
-@Tags({ "registry", "schema", "avro", "json", "transform" })
-@CapabilityDescription("Transforms AVRO content of the Flow File to JSON using 
the schema provided by the Schema Registry Service.")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-public final class TransformAvroToJson extends AbstractContentTransformer {
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
-        GenericRecord avroRecord = AvroUtils.read(in, schema);
-        JsonUtils.write(avroRecord, out);
-        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java
deleted file mode 100644
index f44e440..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-
-@Tags({ "csv", "avro", "transform", "registry", "schema" })
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Transforms CSV content of the Flow File to Avro using 
the schema provided by the Schema Registry Service.")
-public final class TransformCSVToAvro extends AbstractCSVTransformer {
-
-    private static final List<PropertyDescriptor> DESCRIPTORS;
-
-    static {
-        List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
-        descriptors.addAll(BASE_CSV_DESCRIPTORS);
-        descriptors.add(QUOTE);
-        DESCRIPTORS = Collections.unmodifiableList(descriptors);
-    }
-
-    private volatile char quoteChar;
-
-    /**
-     *
-     */
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        super.onScheduled(context);
-        this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0);
-    }
-
-    /**
-     *
-     */
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
-        GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar);
-        AvroUtils.write(avroRecord, out);
-        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro");
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java
deleted file mode 100644
index 2ce9fbe..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-
-@Tags({ "csv", "json", "transform", "registry", "schema" })
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Transforms CSV content of the Flow File to JSON using 
the schema provided by the Schema Registry Service.")
-public final class TransformCSVToJson extends AbstractCSVTransformer {
-
-    private static final List<PropertyDescriptor> DESCRIPTORS;
-
-    static {
-        List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
-        descriptors.addAll(BASE_CSV_DESCRIPTORS);
-        descriptors.add(QUOTE);
-        DESCRIPTORS = Collections.unmodifiableList(descriptors);
-    }
-
-    private volatile char quoteChar;
-
-    /**
-     *
-     */
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        super.onScheduled(context);
-        this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0);
-    }
-
-    /**
-     *
-     */
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
-        GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar);
-        JsonUtils.write(avroRecord, out);
-        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json");
-    }
-}
