NIFI-3354 Added support for simple AVRO/CSV/JSON transformers that utilize 
external Schema
Added support for simple Key/Value Schema Registry as Controller Service
Added support for registering multiple schemas as dynamic properties of Schema 
Registry Controller Service
Added the following 8 processors
- ExtractAvroFieldsViaSchemaRegistry
- TransformAvroToCSVViaSchemaRegistry
- TransformAvroToJsonViaSchemaRegistry
- TransformCSVToAvroViaSchemaRegistry
- TransformCSVToJsonViaSchemaRegistry
- TransformJsonToAvroViaSchemaRegistry
- TransformJsonToCSVViaSchemaRegistry
- UpdateAttributeWithSchemaViaSchemaRegistry

polishing

NIFI-3354 Adding support for HDFS Schema Registry, unions and default values in 
the Avro Schema and NULL columns in the source CSV

NIFI-3354 Adding support for logicalTypes per the Avro 1.7.7 spec

NIFI-3354 polishing and restructuring CSVUtils

NIFI-3354 renamed processors to address PR comment

NIFI-3354 addressed latest PR comments
- removed HDFS-based ControllerService. It will be migrated into a separate 
bundle as a true extension.
- removed UpdateAttribute. . . processor
- added mime.type attribute to all Transform* processors

NIFI-3354 added missing L&N entries

This closes pr/1436


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6a1854c9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6a1854c9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6a1854c9

Branch: refs/heads/master
Commit: 6a1854c9758005a67d5315f31533fdb88ec55b81
Parents: ded18b9
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Fri Jan 20 10:04:48 2017 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Fri Feb 17 14:32:06 2017 -0500

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-registry-nar/pom.xml                   |  31 ++
 .../src/main/resources/META-INF/LICENSE         | 240 +++++++++++++++
 .../src/main/resources/META-INF/NOTICE          |  66 ++++
 .../nifi-registry-processors/pom.xml            |  74 +++++
 .../processors/AbstractCSVTransformer.java      |  57 ++++
 .../processors/AbstractContentTransformer.java  | 101 +++++++
 .../processors/AbstractTransformer.java         |  93 ++++++
 .../schemaregistry/processors/AvroUtils.java    |  67 +++++
 .../processors/BaseContentTransformer.java      |  51 ++++
 .../processors/BaseTransformer.java             | 189 ++++++++++++
 .../schemaregistry/processors/CSVUtils.java     | 299 +++++++++++++++++++
 .../processors/ExtractAvroFields.java           | 100 +++++++
 .../schemaregistry/processors/JsonUtils.java    |  74 +++++
 .../processors/RegistryCommon.java              |  84 ++++++
 .../processors/TransformAvroToCSV.java          |  57 ++++
 .../processors/TransformAvroToJson.java         |  46 +++
 .../processors/TransformCSVToAvro.java          |  80 +++++
 .../processors/TransformCSVToJson.java          |  80 +++++
 .../processors/TransformJsonToAvro.java         |  45 +++
 .../processors/TransformJsonToCSV.java          |  45 +++
 .../org.apache.nifi.processor.Processor         |  21 ++
 .../processors/TransformersTest.java            | 188 ++++++++++++
 .../expected_ouput_csv/decimal_logicalType.txt  |   1 +
 .../decimal_logicalType_invalid_scale.txt       |   1 +
 ...mal_logicalType_valid_scale_with_default.txt |   1 +
 .../decimal_logicalType_with_default.txt        |   1 +
 .../expected_ouput_csv/primitive_types.txt      |   1 +
 .../primitive_types_with_matching_default.txt   |   1 +
 .../union_null_last_field_with_default.txt      |   1 +
 .../union_null_middle_field_with_default.txt    |   1 +
 .../expected_ouput_csv/union_with_default.txt   |   1 +
 ...l_logicalType_invalid_scale_with_default.txt |  16 +
 ...mal_logicalType_valid_scale_with_default.txt |  16 +
 ..._logicalType_valid_scale_with_no_default.txt |  15 +
 .../input_avro/primitive_types_no_defaults.txt  |  11 +
 .../primitive_types_union_with_defaults.txt     |  11 +
 .../primitive_types_with_matching_default.txt   |  11 +
 .../primitive_types_with_mismatch_default.txt   |  11 +
 .../input_avro/union_and_matching_defaults.txt  |  18 ++
 .../input_avro/union_and_mismatch_defaults.txt  |  18 ++
 .../resources/input_csv/decimal_logicalType.txt |   1 +
 .../decimal_logicalType_missing_value.txt       |   1 +
 .../resources/input_csv/primitive_types.txt     |   1 +
 .../primitive_types_with_matching_default.txt   |   1 +
 .../union_null_last_field_with_default.txt      |   1 +
 .../union_null_middle_field_with_default.txt    |   1 +
 .../resources/input_csv/union_with_default.txt  |   1 +
 .../input_csv/union_with_missing_value.txt      |   1 +
 .../nifi-registry-service/pom.xml               |  48 +++
 .../schemaregistry/services/SchemaRegistry.java |  46 +++
 .../services/SimpleKeyValueSchemaRegistry.java  |  96 ++++++
 ...org.apache.nifi.controller.ControllerService |  15 +
 .../SimpleKeyValueSchemaRegistryTest.java       |  70 +++++
 nifi-nar-bundles/nifi-registry-bundle/pom.xml   |  42 +++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |   6 +
 57 files changed, 2561 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 2f798fe..77722bb 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -370,6 +370,11 @@ language governing permissions and limitations under the 
License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-registry-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-hive-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
new file mode 100644
index 0000000..dfdf214
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+    Copyright 2016 Hortoworks, Inc. All rights reserved. 
+    
+    Hortonworks, Inc. licenses this file to you under the Apache License, 
+       Version 2.0 (the "License"); you may not use this file except in 
compliance 
+       with the License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 
+       Unless required by applicable law or agreed to in writing, software 
distributed 
+       under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
+       OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for 
+       the specific language governing permissions and limitations under the 
License. 
+       See the associated NOTICE file for additional information regarding 
copyright 
+       ownership. 
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-registry-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-registry-nar</artifactId>
+    <packaging>nar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-registry-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..70db055
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,240 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+   
+   APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses. 
+
+  The binary distribution of this product bundles 'Paranamer Core' which is 
available
+  under a BSD style license.
+
+    Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
+     All rights reserved.
+    
+     Redistribution and use in source and binary forms, with or without
+     modification, are permitted provided that the following conditions
+     are met:
+     1. Redistributions of source code must retain the above copyright
+        notice, this list of conditions and the following disclaimer.
+     2. Redistributions in binary form must reproduce the above copyright
+        notice, this list of conditions and the following disclaimer in the
+        documentation and/or other materials provided with the distribution.
+     3. Neither the name of the copyright holders nor the names of its
+        contributors may be used to endorse or promote products derived from
+        this software without specific prior written permission.
+    
+     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
IS"
+     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+     ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+     LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+     CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+     SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+     INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+     CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+     ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+     THE POSSIBILITY OF SUCH DAMAGE.
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..73a4e4e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,66 @@
+nifi-registry-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+  
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2013 The Apache Software Foundation
+      
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2014 The Apache Software Foundation
+      
+  (ASLv2) Snappy Java
+    The following NOTICE information applies:
+      This product includes software developed by Google
+       Snappy: http://code.google.com/p/snappy/ (New BSD License)
+      
+      This product includes software developed by Apache
+       PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+       (Apache 2.0 license)
+
+      This library containd statically linked libstdc++. This inclusion is 
allowed by 
+      "GCC RUntime Library Exception" 
+      http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml
new file mode 100644
index 0000000..0ea83ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Copyright 2016 Hortoworks, Inc. All rights reserved. Hortonworks, Inc. 
+       licenses this file to you under the Apache License, Version 2.0 (the 
"License"); 
+       you may not use this file except in compliance with the License. You 
may 
+       obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 
+       Unless required by applicable law or agreed to in writing, software 
distributed 
+       under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
+       OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for 
+       the specific language governing permissions and limitations under the 
License. 
+       See the associated NOTICE file for additional information regarding 
copyright 
+       ownership. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-registry-bundle</artifactId>
+               <version>1.2.0-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>nifi-registry-processors</artifactId>
+       <packaging>jar</packaging>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.rat</groupId>
+                               <artifactId>apache-rat-plugin</artifactId>
+                               <configuration>
+                                       <excludes combine.children="append">
+                                               
<exclude>src/test/resources/expected_ouput_csv/*</exclude>
+                                               
<exclude>src/test/resources/input_avro/*</exclude>
+                                               
<exclude>src/test/resources/input_csv/*</exclude>                               
        
+                                       </excludes>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-utils</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.jackson</groupId>
+                       <artifactId>jackson-mapper-asl</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-registry-service</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>pl.pragmatists</groupId>
+                       <artifactId>JUnitParams</artifactId>
+                       <version>1.0.5</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
new file mode 100644
index 0000000..54497dc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+/**
+ * Common parent for the CSV-oriented transform processors that work against a
+ * Schema Registry (see {@link SchemaRegistry}). On top of the descriptors
+ * exposed through {@code BASE_DESCRIPTORS} it contributes the
+ * {@code DELIMITER} property and caches the configured delimiter character
+ * when the processor is scheduled.
+ */
+abstract class AbstractCSVTransformer extends AbstractContentTransformer {
+
+    /** {@code BASE_DESCRIPTORS} followed by {@code DELIMITER}; unmodifiable. */
+    static final List<PropertyDescriptor> BASE_CSV_DESCRIPTORS;
+
+    static {
+        List<PropertyDescriptor> csvDescriptors = new ArrayList<>(BASE_DESCRIPTORS);
+        csvDescriptors.add(DELIMITER);
+        BASE_CSV_DESCRIPTORS = Collections.unmodifiableList(csvDescriptors);
+    }
+
+    /**
+     * First character of the {@code DELIMITER} property value, captured in
+     * {@link #onScheduled(ProcessContext)}.
+     */
+    protected volatile char delimiter;
+
+    /**
+     * @return the unmodifiable {@link #BASE_CSV_DESCRIPTORS} list
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return BASE_CSV_DESCRIPTORS;
+    }
+
+    /**
+     * Runs the parent scheduling hook and then records the first character of
+     * the configured {@code DELIMITER} value in {@link #delimiter}.
+     *
+     * @param context
+     *            the {@link ProcessContext} the delimiter is read from
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+        String configuredDelimiter = context.getProperty(DELIMITER).getValue();
+        this.delimiter = configuredDelimiter.charAt(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
new file mode 100644
index 0000000..403b52a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+/**
+ * Base processor for implementing transform-like processors that integrate with
+ * Schema Registry (see {@link SchemaRegistry}) and rewrite the content of the
+ * {@link FlowFile}. The schema is looked up through the registry on every
+ * invocation and handed to
+ * {@link #transform(InputStream, OutputStream, InvocationContextProperties, Schema)}.
+ */
+abstract class AbstractContentTransformer extends BaseContentTransformer implements RegistryCommon {
+
+    /** Unmodifiable list holding REGISTRY_SERVICE, SCHEMA_NAME and SCHEMA_TYPE. */
+    static final List<PropertyDescriptor> BASE_DESCRIPTORS;
+
+    static {
+        List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(REGISTRY_SERVICE);
+        descriptors.add(SCHEMA_NAME);
+        descriptors.add(SCHEMA_TYPE);
+        BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors);
+    }
+
+    /** Registry service resolved from REGISTRY_SERVICE in {@link #onScheduled(ProcessContext)}. */
+    volatile SchemaRegistry schemaRegistryDelegate;
+
+    /**
+     * Caches the instance-level property values (via the inherited hook) and
+     * resolves the {@link SchemaRegistry} controller service configured
+     * through REGISTRY_SERVICE.
+     *
+     * @param context
+     *            the {@link ProcessContext} the properties are read from
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        // The inherited hook records the value of every property that does not
+        // support expression language. Skipping it leaves that cache empty, so
+        // InvocationContextProperties#getPropertyValue would report null for
+        // all such properties.
+        super.onScheduled(context);
+        this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class);
+    }
+
+    /**
+     * Retrieves the {@link Schema} through
+     * {@link RegistryCommon#retrieveSchema} and delegates to the schema-aware
+     * {@link #transform(InputStream, OutputStream, InvocationContextProperties, Schema)}.
+     *
+     * @return whatever the schema-aware transform returns
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) {
+        Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties);
+        return this.transform(in, out, contextProperties, schema);
+    }
+
+    /**
+     * This operation is designed to allow sub-classes to provide
+     * implementations that read content of the provided {@link InputStream} and
+     * write content (same or different) into the provided {@link OutputStream}.
+     * Both {@link InputStream} and {@link OutputStream} represent the content
+     * of the in/out {@link FlowFile}.
+     * <p>
+     * The returned {@link Map} represents attributes that will be added to the
+     * outgoing FlowFile. It can be null, in which case no attributes will be
+     * added to the resulting {@link FlowFile}.
+     *
+     * @param in
+     *            {@link InputStream} representing data to be transformed
+     * @param out
+     *            {@link OutputStream} representing target stream to write
+     *            transformed data to
+     * @param contextProperties
+     *            instance of {@link InvocationContextProperties}
+     * @param schema
+     *            instance of {@link Schema} retrieved for this invocation
+     * @return attributes to add to the outgoing FlowFile, or null for none
+     */
+    protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema);
+
+    /**
+     * @return the unmodifiable {@link #BASE_DESCRIPTORS} list
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return BASE_DESCRIPTORS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
new file mode 100644
index 0000000..13dd4a5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+/**
+ * Base processor for implementing transform-like processors that integrate with
+ * Schema Registry (see {@link SchemaRegistry}) and only read the content of the
+ * incoming {@link FlowFile}, producing attributes rather than new content.
+ */
+abstract class AbstractTransformer extends BaseTransformer implements RegistryCommon {
+
+    /** Unmodifiable list holding REGISTRY_SERVICE and SCHEMA_NAME. */
+    static final List<PropertyDescriptor> BASE_DESCRIPTORS;
+
+    static {
+        List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(REGISTRY_SERVICE);
+        descriptors.add(SCHEMA_NAME);
+        BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors);
+    }
+
+    /** Registry service resolved from REGISTRY_SERVICE in {@link #onScheduled(ProcessContext)}. */
+    volatile SchemaRegistry schemaRegistryDelegate;
+
+    /**
+     * Caches the instance-level property values (via the inherited hook) and
+     * resolves the {@link SchemaRegistry} controller service configured
+     * through REGISTRY_SERVICE.
+     *
+     * @param context
+     *            the {@link ProcessContext} the properties are read from
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        // The inherited hook records the value of every property that does not
+        // support expression language. Skipping it leaves that cache empty, so
+        // InvocationContextProperties#getPropertyValue would report null for
+        // all such properties.
+        super.onScheduled(context);
+        this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class);
+    }
+
+    /**
+     * This operation is designed to allow sub-classes to provide
+     * implementations that read content of the provided {@link InputStream}
+     * that represent the content of the incoming {@link FlowFile}.
+     * <p>
+     * The returned {@link Map} represents attributes that will be added to the
+     * outgoing FlowFile.
+     *
+     * @param in
+     *            {@link InputStream} representing data to be transformed
+     * @param contextProperties
+     *            instance of {@link InvocationContextProperties}
+     * @param schema
+     *            instance of avro {@link Schema}
+     * @return attributes to add to the outgoing FlowFile
+     */
+    protected abstract Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema);
+
+    /**
+     * @return the unmodifiable {@link #BASE_DESCRIPTORS} list
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return BASE_DESCRIPTORS;
+    }
+
+    /**
+     * Retrieves the {@link Schema} through
+     * {@link RegistryCommon#retrieveSchema} and delegates to the schema-aware
+     * {@link #transform(InputStream, InvocationContextProperties, Schema)}.
+     * The {@code out} argument is not used by this implementation.
+     *
+     * @return whatever the schema-aware transform returns
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) {
+        Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties);
+        return this.transform(in, contextProperties, schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
new file mode 100644
index 0000000..b967af9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Helper routines for moving a single Avro {@link GenericRecord} between its
+ * binary encoding and the content streams of a {@link FlowFile}.
+ */
+class AvroUtils {
+
+    /**
+     * Decodes one binary-encoded record from the given stream using the
+     * supplied {@link Schema}.
+     *
+     * @param in
+     *            {@link InputStream} to decode from
+     * @param schema
+     *            {@link Schema} handed to the {@link GenericDatumReader}
+     * @return the decoded {@link GenericRecord}
+     * @throws IllegalStateException
+     *             wrapping any exception raised while decoding
+     */
+    public static GenericRecord read(InputStream in, Schema schema) {
+        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+        try {
+            return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to read AVRO record", e);
+        }
+    }
+
+    /**
+     * Binary-encodes the given record with its own schema, writes it to the
+     * given stream and flushes the encoder.
+     *
+     * @param record
+     *            {@link GenericRecord} to encode; its schema drives the writer
+     * @param out
+     *            {@link OutputStream} receiving the encoded bytes
+     * @throws IllegalStateException
+     *             wrapping any exception raised while writing or flushing
+     */
+    public static void write(GenericRecord record, OutputStream out) {
+        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
+        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
+        try {
+            datumWriter.write(record, binaryEncoder);
+            binaryEncoder.flush();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to write AVRO record", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
new file mode 100644
index 0000000..12586ac
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.StreamCallback;
+
+/**
+ * Shared base for processors that take a {@link FlowFile} in, emit a
+ * {@link FlowFile} out, and replace its content along the way.
+ */
+public abstract class BaseContentTransformer extends BaseTransformer {
+
+    /**
+     * Streams the FlowFile content through
+     * {@code transform(in, out, contextProperties)} using
+     * {@link ProcessSession#write(FlowFile, StreamCallback)} and, when the
+     * transform returned a non-null map, adds those entries as attributes.
+     *
+     * @return the FlowFile returned by the session after the write (and the
+     *         attribute update, if any)
+     */
+    @Override
+    protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) {
+        // Holder that carries the transform result out of the callback.
+        final AtomicReference<Map<String, String>> producedAttributes = new AtomicReference<>();
+        StreamCallback contentRewriter = new StreamCallback() {
+            @Override
+            public void process(InputStream in, OutputStream out) throws IOException {
+                producedAttributes.set(transform(in, out, contextProperties));
+            }
+        };
+        FlowFile rewritten = session.write(flowFile, contentRewriter);
+        Map<String, String> attributes = producedAttributes.get();
+        if (attributes == null) {
+            return rewritten;
+        }
+        return session.putAllAttributes(rewritten, attributes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
new file mode 100644
index 0000000..e1cc98c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+/**
+ * Base processor which contains common functionality for processors that
+ * receive {@link FlowFile} and output {@link FlowFile} and contain only two
+ * {@link Relationship}s (i.e., success and failure). Every successful execution
+ * of
+ * {@link #doTransform(ProcessContext, ProcessSession, FlowFile, InvocationContextProperties)}
+ * operation will result in transferring {@link FlowFile} to 'success'
+ * relationship while any exception will result in such file going to 'failure'.
+ */
+public abstract class BaseTransformer extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully retrieved schema from Schema Registry")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to find a schema are sent to this relationship")
+            .build();
+
+    private static final Set<Relationship> BASE_RELATIONSHIPS;
+
+    static {
+        Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        BASE_RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    /**
+     * Values of the properties that do not support expression language,
+     * captured in {@link #onScheduled(ProcessContext)}.
+     */
+    private final Map<PropertyDescriptor, String> propertyInstanceValues = new HashMap<>();
+
+    /**
+     * Pulls one {@link FlowFile} from the session, runs
+     * {@link #doTransform(ProcessContext, ProcessSession, FlowFile, InvocationContextProperties)}
+     * on it and routes the result to {@link #REL_SUCCESS}. Any exception is
+     * logged and the FlowFile is routed to {@link #REL_FAILURE}. When the
+     * session has no FlowFile the processor yields.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            context.yield();
+            return;
+        }
+        try {
+            InvocationContextProperties contextProperties = new InvocationContextProperties(context, flowFile);
+            flowFile = this.doTransform(context, session, flowFile, contextProperties);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            this.getLogger().error("Failed FlowFile processing, routing to failure. Issue: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Records the configured value of every supported property that does not
+     * support expression language so that
+     * {@link InvocationContextProperties#getPropertyValue(PropertyDescriptor, boolean)}
+     * can serve it without re-reading the context.
+     * <p>
+     * Sub-classes overriding this method must call
+     * {@code super.onScheduled(context)}; otherwise these values are never
+     * captured.
+     *
+     * @param context
+     *            the {@link ProcessContext} the property values are read from
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        List<PropertyDescriptor> propertyDescriptors = this.getSupportedPropertyDescriptors();
+        for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
+            if (!propertyDescriptor.isExpressionLanguageSupported()) {
+                this.propertyInstanceValues.put(propertyDescriptor, context.getProperty(propertyDescriptor).getValue());
+            }
+        }
+    }
+
+    /**
+     * Reads the content of the FlowFile through
+     * {@link #transform(InputStream, OutputStream, InvocationContextProperties)}
+     * (passing a null {@link OutputStream}, since the content is not
+     * rewritten here) and adds the returned map, when non-null, as attributes.
+     *
+     * @return the original FlowFile, or the one returned by the session after
+     *         the attributes were added
+     */
+    protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) {
+        AtomicReference<Map<String, String>> attributeRef = new AtomicReference<>();
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                attributeRef.set(transform(in, null, contextProperties));
+            }
+        });
+        if (attributeRef.get() != null) {
+            flowFile = session.putAllAttributes(flowFile, attributeRef.get());
+        }
+        return flowFile;
+    }
+
+    /**
+     * @return the unmodifiable set holding {@link #REL_SUCCESS} and
+     *         {@link #REL_FAILURE}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return BASE_RELATIONSHIPS;
+    }
+
+    /**
+     * This operation is designed to allow sub-classes to provide
+     * implementations that read content of the provided {@link InputStream} and
+     * write content (same or different) into the provided
+     * {@link OutputStream}. Both {@link InputStream} and {@link OutputStream}
+     * represent the content of the in/out {@link FlowFile}. The
+     * {@link OutputStream} can be null if no output needs to be written.
+     * <p>
+     * The returned {@link Map} represents attributes that will be added to the
+     * outgoing FlowFile.
+     *
+     * @param in
+     *            {@link InputStream} representing data to be transformed
+     * @param out
+     *            {@link OutputStream} representing target stream to write
+     *            transformed data to. Can be null if no output needs to be
+     *            written.
+     * @param contextProperties
+     *            instance of {@link InvocationContextProperties}
+     * @return attributes to add to the outgoing FlowFile; null adds none
+     */
+    protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties);
+
+    /**
+     * Properties object that gathers the value of the
+     * {@link PropertyDescriptor} within the context of
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)}
+     * invocation. It maintains the knowledge of instance properties vs.
+     * invocation properties, the values of which are set by evaluating
+     * expressions against the incoming {@link FlowFile}.
+     */
+    public class InvocationContextProperties {
+        /** Values of expression-language properties evaluated against the FlowFile. */
+        private final Map<PropertyDescriptor, String> propertyInvocationValues = new HashMap<>();
+
+        /**
+         * Evaluates every supported property that supports expression
+         * language against the given FlowFile and stores the result.
+         */
+        InvocationContextProperties(ProcessContext context, FlowFile flowFile) {
+            List<PropertyDescriptor> propertyDescriptors = BaseTransformer.this.getSupportedPropertyDescriptors();
+            for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
+                if (propertyDescriptor.isExpressionLanguageSupported()) {
+                    PropertyValue value = context.getProperty(propertyDescriptor)
+                            .evaluateAttributeExpressions(flowFile);
+                    this.propertyInvocationValues.put(propertyDescriptor, value.getValue());
+                }
+            }
+        }
+
+        /**
+         * Returns the value of the property within the context of
+         * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)}
+         * invocation. Instance values captured at scheduling time take
+         * precedence; otherwise the value evaluated for this invocation is
+         * returned.
+         *
+         * @param propertyDescriptor
+         *            the property to look up
+         * @param notNull
+         *            when true a null value is rejected with an exception
+         * @return the property value, possibly null when {@code notNull} is
+         *         false
+         * @throws IllegalArgumentException
+         *             if {@code notNull} is true and the value is null
+         */
+        public String getPropertyValue(PropertyDescriptor propertyDescriptor, boolean notNull) {
+            String propertyValue = propertyInstanceValues.containsKey(propertyDescriptor)
+                    ? propertyInstanceValues.get(propertyDescriptor)
+                    : propertyInvocationValues.get(propertyDescriptor);
+            if (notNull && propertyValue == null) {
+                throw new IllegalArgumentException("Property '" + propertyDescriptor + "' evaluated to null");
+            }
+            return propertyValue;
+        }
+
+        @Override
+        public String toString() {
+            return "Instance: " + propertyInstanceValues + "; Invocation: " + propertyInvocationValues;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
new file mode 100644
index 0000000..bded6fa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.BooleanNode;
+import org.codehaus.jackson.node.DoubleNode;
+import org.codehaus.jackson.node.IntNode;
+import org.codehaus.jackson.node.LongNode;
+import org.codehaus.jackson.node.TextNode;
+
+/**
+ * Various CSV related utility operations relevant to transforming contents of
+ * the {@link FlowFile} between CSV and AVRO formats.
+ */
+class CSVUtils {
+    /**
+     * Provides a {@link Validator} to ensure that provided value is a valid
+     * character.
+     */
+    public static final Validator CHAR_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+            // Allows special, escaped characters as input, which is then 
un-escaped and converted to a single character.
+            // Examples for special characters: \t (or \u0009), \f.
+            if (input.length() > 1) {
+                input = StringEscapeUtils.unescapeJava(input);
+            }
+            return new ValidationResult.Builder().subject(subject).input(input)
+                    .explanation("Only non-null single characters are 
supported")
+                    .valid(input.length() == 1 && input.charAt(0) != 
0).build();
+        }
+    };
+
+    public static GenericRecord read(InputStream record, char delimiter, 
Schema schema, char quoteChar) {
+        Record avroRecord = new GenericData.Record(schema);
+        String[] parsedRecord = 
parseFields(convertInputStreamToString(record), delimiter, quoteChar);
+        List<Field> fields = schema.getFields();
+        if (parsedRecord.length != fields.size()) {
+            throw new IllegalStateException("Incompatible schema. Parsed 
fields count does not match the count of fields from schema. "
+                    + "Schema: " + schema.toString(true) + "\n Record: " + 
record);
+        }
+
+        for (int i = 0; i < fields.size(); i++) {
+            Field field = fields.get(i);
+            Type type = field.schema().getType();
+            updateRecord(field, type, parsedRecord[i], avroRecord);
+        }
+        return avroRecord;
+    }
+
+    /**
+     * Parses provided record into fields using provided delimiter. The
+     * 'quoteChar' is used to ensure that if a delimiter char is in quotes it
+     * will not be parsed into a separate field.
+     */
+    public static String[] parseFields(String record, char delimiter, char 
quoteChar) {
+        List<String> result = new ArrayList<String>();
+        int start = 0;
+        boolean inQuotes = false;
+        for (int i = 0; i < record.length(); i++) {
+            if (record.charAt(i) == quoteChar) {
+                inQuotes = !inQuotes;
+            }
+            boolean atLastChar = (i == record.length() - 1);
+            if (atLastChar) {
+                if (record.charAt(i) == delimiter) {
+                    //missing last column value, add NULL
+                    result.add(record.substring(start,i));
+                    result.add(null);
+                } else {
+                    result.add(record.substring(start));
+                }
+            } else if (record.charAt(i) == delimiter && !inQuotes) {
+                if (start == i) {
+                    //There is no value, so add NULL to indicate the absence 
of a value for this field.
+                    result.add(null);
+                } else {
+                    result.add(record.substring(start, i));
+                }
+                start = i + 1;
+            }
+        }
+        return result.toArray(new String[]{});
+    }
+
+    /**
+     * Writes {@link GenericRecord} as CSV (delimited) record to the
+     * {@link OutputStream} using provided delimiter.
+     */
+    public static void write(GenericRecord record, char delimiter, 
OutputStream out) {
+        List<Field> fields = record.getSchema().getFields();
+
+        String delimiterToUse = "";
+        try {
+            for (Field field : fields) {
+                out.write(delimiterToUse.getBytes(StandardCharsets.UTF_8));
+                Object fieldValue = record.get(field.name());
+                if (null == fieldValue) {
+                    out.write(new byte[0]);
+                } else {
+                    if (Type.BYTES == field.schema().getType()) {
+                        // need to create it from the ByteBuffer it is 
serialized as.
+                        // need to ensure the type is one of the logical ones 
we support and if so convert it.
+                        
if(!"decimal".contentEquals(field.getProp("logicalType"))){
+                            throw new IllegalArgumentException("The field '" + 
field.name() + "' has a logical type of '" +
+                                    field.getProp("logicalType") + "' that is 
currently not supported.");
+                        }
+
+                        JsonNode rawPrecision = field.getJsonProp("precision");
+                        if(null == rawPrecision){
+                            throw new IllegalArgumentException("The field '" + 
field.name() + "' is missing the required precision property");
+                        }
+                        int precision = rawPrecision.asInt();
+                        JsonNode rawScale = field.getJsonProp("scale");
+                        int scale = null == rawScale ? 0 : rawScale.asInt();
+
+                        // write out the decimal with the precision and scale.
+                        NumberFormat numberFormat = 
DecimalFormat.getInstance();
+                        numberFormat.setGroupingUsed(false);
+                        normalizeNumberFormat(numberFormat, scale, precision);
+                        final String rawValue  = new 
String(((ByteBuffer)fieldValue).array());
+                        out.write(numberFormat.format(new 
BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8));
+                    } else {
+                        
out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8));
+                    }
+                }
+                if (delimiterToUse.length() == 0) {
+                    delimiterToUse = String.valueOf(delimiter);
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to parse AVRO Record", e);
+        }
+    }
+
+    /**
+     * According to the 1.7.7 spec, if a logical type is invalid, for example a
+     * decimal with scale greater than its precision, then implementations 
should
+     * ignore the logical type and use the underlying Avro type.
+     */
+    private static void normalizeNumberFormat(NumberFormat numberFormat, int 
scale, int precision) {
+        if (scale < precision) {
+            // write out with the specified precision and scale.
+            numberFormat.setMaximumIntegerDigits(precision);
+            numberFormat.setMaximumFractionDigits(scale);
+            numberFormat.setMinimumFractionDigits(scale);
+        }
+    }
+
+    /**
+     * Reads the entire {@link InputStream} into a String, decoding it as UTF-8.
+     */
+    private static String convertInputStreamToString(InputStream record) {
+        StringWriter writer = new StringWriter();
+        try {
+            IOUtils.copy(record, writer, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to read InputStream into 
String", e);
+        }
+        return writer.toString();
+    }
+
+    /**
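+     * Encodes a 'decimal' logical-type value (or the field default when the value is null) as a {@link ByteBuffer} of its formatted UTF-8 text.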
+     */
+    private static ByteBuffer encodeLogicalType(final Field field, final 
String fieldValue) {
+        String logicalType = field.getProp("logicalType");
+        if (!"decimal".contentEquals(logicalType)) {
+            throw new IllegalArgumentException("The field '" + field.name() + 
"' has a logical type of '" + logicalType
+                    + "' that is currently not supported.");
+        }
+
+        JsonNode rawPrecision = field.getJsonProp("precision");
+        if (null == rawPrecision) {
+            throw new IllegalArgumentException("The field '" + field.name() + 
"' is missing the required precision property");
+        }
+        int precision = rawPrecision.asInt();
+        JsonNode rawScale = field.getJsonProp("scale");
+        int scale = null == rawScale ? 0 : rawScale.asInt();
+
+        NumberFormat numberFormat = DecimalFormat.getInstance();
+        numberFormat.setGroupingUsed(false);
+        normalizeNumberFormat(numberFormat, scale, precision);
+        BigDecimal decimal = null == fieldValue ? new 
BigDecimal(retrieveDefaultFieldValue(field).asText()) : new 
BigDecimal(fieldValue);
+        return 
ByteBuffer.wrap(numberFormat.format(decimal).getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
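+     * Returns the default value declared for the field, throwing {@link IllegalArgumentException} if none is declared.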
+     */
+    private static JsonNode retrieveDefaultFieldValue(Field field) {
+        JsonNode jsonNode = field.defaultValue();
+        if (null == jsonNode) {
+            throw new IllegalArgumentException("The field '" + field.name() + 
"' is NULL and there is no default value supplied in the Avro Schema");
+        }
+        return jsonNode;
+    }
+
+    /**
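+     * Converts the provided value (or the field default when it is null) to the given Avro type and puts it into the record under the field's name.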
+     */
+    private static void updateRecord(Field field, Type type, String 
providedValue, Record avroRecord) {
+        if (Type.NULL != type) {
+            Object value;
+            if (Type.INT == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, 
IntNode.class).getIntValue()
+                        : Integer.parseInt(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.BOOLEAN == type) {
+                value = null == providedValue
+                        ? possiblyGetDefaultValue(field, 
BooleanNode.class).getBooleanValue()
+                        : Boolean.parseBoolean(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.DOUBLE == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, 
DoubleNode.class).getDoubleValue()
+                        : Double.parseDouble(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.FLOAT == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, 
DoubleNode.class).getDoubleValue()
+                        : Float.parseFloat(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.LONG == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, 
LongNode.class).getLongValue()
+                        : Long.parseLong(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.STRING == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, 
TextNode.class).getTextValue()
+                        : providedValue;
+                avroRecord.put(field.name(), value);
+            } else if (Type.BYTES == type) {
+                value = encodeLogicalType(field, providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.UNION == type) {
+                field.schema().getTypes()
+                        .forEach(schema -> updateRecord(field, 
schema.getType(), providedValue, avroRecord));
+            } else if (Type.ARRAY == type || Type.ENUM == type || Type.FIXED 
== type || Type.MAP == type
+                    || Type.NULL == type || Type.RECORD == type) {
+                throw new IllegalArgumentException("The field type '" + type + 
"' is not supported at the moment");
+            } else {
+                avroRecord.put(field.name(), providedValue);
+            }
+        }
+    }
+
+    /**
+     * Check to see if there is a default value to use, if not will throw
+     * {@link IllegalArgumentException}
+     */
+    private static <T extends JsonNode> JsonNode possiblyGetDefaultValue(Field 
field, Class<T> expectedDefaultType) {
+        JsonNode jsonNode = retrieveDefaultFieldValue(field);
+        if (field.schema().getType() != Type.UNION && 
!expectedDefaultType.isAssignableFrom(jsonNode.getClass())) {
+            // since we do not support schema evolution here we need to throw 
an
+            // exception here as the data is in error.
+            throw new IllegalArgumentException("The field '" + field.name() + 
"' has a default value that "
+                    + "does not match the field type. Field Type is: '" + 
expectedDefaultType.getName() + "' and the "
+                    + "default value type is: '" + 
field.defaultValue().toString());
+        }
+        return jsonNode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
new file mode 100644
index 0000000..2ab83c5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({ "registry", "schema", "avro", "extract", "evaluate" })
+@CapabilityDescription("Extracts Avro field and assigns it to the FlowFile 
attribute")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "Avro field name", value = "FlowFile attribute name to 
set the extracted field",
+                 description = "The value of the Avro field specified by 'Avro 
field name' will be extracted and set as "
+                         + "FlowFile attribute under name specified by the 
value of this property.")
+public final class ExtractAvroFields extends AbstractTransformer {
+
+    private static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static {
+        List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
+        descriptors.addAll(BASE_DESCRIPTORS);
+        descriptors.add(SCHEMA_TYPE);
+        DESCRIPTORS = Collections.unmodifiableList(descriptors);
+    }
+
+    private volatile Map<String, String> dynamicProperties;
+
+    /**
+     * Returns the descriptors supported by this processor: BASE_DESCRIPTORS plus SCHEMA_TYPE.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Delegates to the superclass, then captures the name/value pairs of all dynamic properties.
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+        this.dynamicProperties = context.getProperties().entrySet().stream()
+                .filter(p -> p.getKey().isDynamic())
+                .collect(Collectors.toMap(p -> p.getKey().getName(), p -> 
p.getValue()));
+    }
+
+    /**
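+     * Builds an optional, dynamic, non-blank descriptor (Expression Language not supported) for the given property name.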
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(false)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .required(false)
+                .dynamic(true)
+                .build();
+    }
+
+    /**
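+     * Reads the Avro record and returns attributes keyed by each dynamic property's value, holding the String value of the Avro field named by that property.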
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, 
InvocationContextProperties contextProperties, Schema schema) {
+        GenericRecord avroRecord = AvroUtils.read(in, schema);
+        Map<String, String> attributes = 
this.dynamicProperties.entrySet().stream().collect(
+                Collectors.toMap(dProp -> dProp.getValue(), dProp -> 
String.valueOf(avroRecord.get(dProp.getKey()))));
+        return Collections.unmodifiableMap(attributes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
new file mode 100644
index 0000000..81c98b3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Various Json related utility operations relevant to transforming contents of
+ * the {@link FlowFile} between JSON and AVRO formats.
+ */
+class JsonUtils {
+
+    /**
+     * Writes provided {@link GenericRecord} into the provided
+     * {@link OutputStream} as JSON.
+     */
+    public static void write(GenericRecord record, OutputStream out) {
+        try {
+            DatumWriter<GenericRecord> writer = new 
GenericDatumWriter<GenericRecord>(record.getSchema());
+            JsonEncoder encoder = 
EncoderFactory.get().jsonEncoder(record.getSchema(), out);
+            writer.write(record, encoder);
+            encoder.flush();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to read GenericRecord", e);
+        }
+    }
+
+    /**
+     * Reads provided {@link InputStream} as JSON into Avro
+     * {@link GenericRecord} applying provided {@link Schema} returning the
+     * resulting GenericRecord.
+     */
+    public static GenericRecord read(InputStream jsonIs, Schema schema) {
+        DataInputStream din = new DataInputStream(jsonIs);
+        try {
+            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
+            DatumReader<GenericData.Record> reader = new 
GenericDatumReader<>(schema);
+            return reader.read(null, decoder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to parse incoming Json 
input stream into Avro GenericRecord. "
+                    + "Possible reason: the value may not be a valid JSON or 
incompatible schema is provided. Schema was '"
+                    + schema.toString(true) + "'.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
new file mode 100644
index 0000000..3fc1530
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.schemaregistry.processors.BaseTransformer.InvocationContextProperties;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+/**
+ * Strategy that encapsulates common properties and functionalities used by all
+ * processors that integrate with Schema Registry.
+ */
+interface RegistryCommon {
+
+    static final String SCHEMA_ATTRIBUTE_NAME = "schema.text";
+
+    static final PropertyDescriptor REGISTRY_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("schema-registry-service")
+            .displayName("Schema Registry Service")
+            .description("The Schema Registry Service for 
serializing/deserializing messages as well as schema retrieval.")
+            .required(true)
+            .identifiesControllerService(SchemaRegistry.class)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_NAME = new 
PropertyDescriptor.Builder()
+            .name("schema-name")
+            .displayName("Schema Name")
+            .description("The name of schema.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_TYPE = new 
PropertyDescriptor.Builder()
+            .name("schema-type")
+            .displayName("Schema Type")
+            .description("The type of schema (avro is the the only current 
supported schema).")
+            .required(true)
+            .allowableValues("avro")
+            .defaultValue("avro")
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor DELIMITER = new 
PropertyDescriptor.Builder()
+            .name("csv-delimiter")
+            .displayName("CSV delimiter")
+            .description("Delimiter character for CSV records")
+            .addValidator(CSVUtils.CHAR_VALIDATOR)
+            .defaultValue(",")
+            .build();
+
+    static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
+            .name("csv-quote-character")
+            .displayName("CSV quote character")
+            .description("Quote character for CSV values")
+            .addValidator(CSVUtils.CHAR_VALIDATOR)
+            .defaultValue("\"")
+            .build();
+    /**
+     * Utility operation to retrieve and parse {@link Schema} from Schema
+     * Registry using provided {@link SchemaRegistry}.
+     */
+    static Schema retrieveSchema(SchemaRegistry schemaRegistry, 
InvocationContextProperties contextProperties) {
+        String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, 
true);
+        String schemaText = schemaRegistry.retrieveSchemaText(schemaName);
+        return new Schema.Parser().parse(schemaText);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
new file mode 100644
index 0000000..aa0d418
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+@Tags({ "registry", "schema", "avro", "csv", "transform" })
+@CapabilityDescription("Transforms AVRO content of the Flow File to CSV using 
the schema provided by the Schema Registry Service.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public final class TransformAvroToCSV extends AbstractCSVTransformer {
+
+    /**
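+     * Buffers the input, reads it as an Avro record, writes it out as CSV and returns the MIME type attribute set to 'text/csv'.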
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, OutputStream out, 
InvocationContextProperties contextProperties, Schema schema) {
+        byte[] buff = null;
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            IOUtils.copy(in, bos);
+            buff = bos.toByteArray();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        ByteArrayInputStream is = new ByteArrayInputStream(buff);
+        GenericRecord avroRecord = AvroUtils.read(is, schema);
+        CSVUtils.write(avroRecord, this.delimiter, out);
+        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), 
"text/csv");
+    }
+}

Reply via email to