NIFI-3354 Added support for simple AVRO/CSV/JSON transformers that utilize external Schema
Added support for simple Key/Value Schema Registry as Controller Service
Added support for registering multiple schemas as dynamic properties of Schema Registry Controller Service
Added the following 8 processors:
- ExtractAvroFieldsViaSchemaRegistry
- TransformAvroToCSVViaSchemaRegistry
- TransformAvroToJsonViaSchemaRegistry
- TransformCSVToAvroViaSchemaRegistry
- TransformCSVToJsonViaSchemaRegistry
- TransformJsonToAvroViaSchemaRegistry
- TransformJsonToCSVViaSchemaRegistry
- UpdateAttributeWithSchemaViaSchemaRegistry
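[Editorial note: the following is a minimal, self-contained sketch of the kind of conversion these Transform* processors perform — a delimited record is mapped onto an Avro schema (which the processors fetch by name from the Schema Registry controller service) and written out as Avro binary. It is illustrative only and merely mirrors the CSVUtils/AvroUtils helpers added in this patch; the schema, field names, and values here are made up.]

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.EncoderFactory;

    public class CsvToAvroSketch {

        public static void main(String[] args) throws IOException {
            // Hypothetical schema; in the NiFi flow the processor retrieves it by name
            // from the Schema Registry controller service.
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"user\",\"fields\":["
                    + "{\"name\":\"name\",\"type\":\"string\"},"
                    + "{\"name\":\"age\",\"type\":\"int\"}]}");

            // One delimited record, as TransformCSVToAvro would read it from the FlowFile content.
            String[] fields = "john,42".split(",");

            // Map the parsed fields onto the schema positionally.
            GenericRecord record = new GenericData.Record(schema);
            record.put("name", fields[0]);
            record.put("age", Integer.parseInt(fields[1]));

            // Serialize to Avro binary, which becomes the outgoing FlowFile content.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            writer.write(record, encoder);
            encoder.flush();
            System.out.println("Wrote " + out.size() + " bytes of Avro");
        }
    }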
polishing
NIFI-3354 Adding support for HDFS Schema Registry, unions and default values in the Avro Schema and NULL columns in the source CSV
NIFI-3354 Adding support for logicalTypes per the Avro 1.7.7 spec
NIFI-3354 polishing and restructuring CSVUtils
NIFI-3354 renamed processors to address PR comment
NIFI-3354 addressed latest PR comments
- removed HDFS-based ControllerService. It will be migrated into a separate bundle as a true extension.
- removed UpdateAttribute... processor
- added mime.type attribute to all Transform* processors
NIFI-3354 added missing L&N entries

This closes pr/1436

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6a1854c9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6a1854c9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6a1854c9

Branch: refs/heads/master
Commit: 6a1854c9758005a67d5315f31533fdb88ec55b81
Parents: ded18b9
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Fri Jan 20 10:04:48 2017 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Fri Feb 17 14:32:06 2017 -0500

----------------------------------------------------------------------
 nifi-assembly/pom.xml | 5 +
 .../nifi-registry-nar/pom.xml | 31 ++
 .../src/main/resources/META-INF/LICENSE | 240 +++++++++++++++
 .../src/main/resources/META-INF/NOTICE | 66 ++++
 .../nifi-registry-processors/pom.xml | 74 +++++
 .../processors/AbstractCSVTransformer.java | 57 ++++
 .../processors/AbstractContentTransformer.java | 101 +++++++
 .../processors/AbstractTransformer.java | 93 ++++++
 .../schemaregistry/processors/AvroUtils.java | 67 +++++
 .../processors/BaseContentTransformer.java | 51 ++++
 .../processors/BaseTransformer.java | 189 ++++++++++++
 .../schemaregistry/processors/CSVUtils.java | 299 +++++++++++++++++++
 .../processors/ExtractAvroFields.java | 100 +++++++
 .../schemaregistry/processors/JsonUtils.java | 74 +++++
 .../processors/RegistryCommon.java | 84 ++++++
 .../processors/TransformAvroToCSV.java | 57 ++++
 .../processors/TransformAvroToJson.java | 46 +++
 .../processors/TransformCSVToAvro.java | 80 +++++
 .../processors/TransformCSVToJson.java | 80 +++++
 .../processors/TransformJsonToAvro.java | 45 +++
 .../processors/TransformJsonToCSV.java | 45 +++
 .../org.apache.nifi.processor.Processor | 21 ++
 .../processors/TransformersTest.java | 188 ++++++++++++
 .../expected_ouput_csv/decimal_logicalType.txt | 1 +
 .../decimal_logicalType_invalid_scale.txt | 1 +
 ...mal_logicalType_valid_scale_with_default.txt | 1 +
 .../decimal_logicalType_with_default.txt | 1 +
 .../expected_ouput_csv/primitive_types.txt | 1 +
 .../primitive_types_with_matching_default.txt | 1 +
 .../union_null_last_field_with_default.txt | 1 +
 .../union_null_middle_field_with_default.txt | 1 +
 .../expected_ouput_csv/union_with_default.txt | 1 +
 ...l_logicalType_invalid_scale_with_default.txt | 16 +
 ...mal_logicalType_valid_scale_with_default.txt | 16 +
 ..._logicalType_valid_scale_with_no_default.txt | 15 +
 .../input_avro/primitive_types_no_defaults.txt | 11 +
 .../primitive_types_union_with_defaults.txt | 11 +
 .../primitive_types_with_matching_default.txt | 11 +
 .../primitive_types_with_mismatch_default.txt | 11 +
 .../input_avro/union_and_matching_defaults.txt | 18 ++
 .../input_avro/union_and_mismatch_defaults.txt | 18 ++
 .../resources/input_csv/decimal_logicalType.txt | 1 +
 .../decimal_logicalType_missing_value.txt | 1 +
 .../resources/input_csv/primitive_types.txt | 1 +
 .../primitive_types_with_matching_default.txt | 1 +
.../union_null_last_field_with_default.txt | 1 + .../union_null_middle_field_with_default.txt | 1 + .../resources/input_csv/union_with_default.txt | 1 + .../input_csv/union_with_missing_value.txt | 1 + .../nifi-registry-service/pom.xml | 48 +++ .../schemaregistry/services/SchemaRegistry.java | 46 +++ .../services/SimpleKeyValueSchemaRegistry.java | 96 ++++++ ...org.apache.nifi.controller.ControllerService | 15 + .../SimpleKeyValueSchemaRegistryTest.java | 70 +++++ nifi-nar-bundles/nifi-registry-bundle/pom.xml | 42 +++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 57 files changed, 2561 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 2f798fe..77722bb 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -370,6 +370,11 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-hive-nar</artifactId> <type>nar</type> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml new file mode 100644 index 0000000..dfdf214 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Copyright 2016 Hortoworks, Inc. All rights reserved. + + Hortonworks, Inc. licenses this file to you under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + See the associated NOTICE file for additional information regarding copyright + ownership. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-registry-nar</artifactId> + <packaging>nar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-processors</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..70db055 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,240 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + The binary distribution of this product bundles 'Paranamer Core' which is available + under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. 
Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..73a4e4e --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,66 @@ +nifi-registry-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2013 The Apache Software Foundation + + (ASLv2) Apache Commons Compress + The following NOTICE information applies: + Apache Commons Compress + Copyright 2002-2014 The Apache Software Foundation + + (ASLv2) Snappy Java + The following NOTICE information applies: + This product includes software developed by Google + Snappy: http://code.google.com/p/snappy/ (New BSD License) + + This product includes software developed by Apache + PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ + (Apache 2.0 license) + + This library containd statically linked libstdc++. This inclusion is allowed by + "GCC RUntime Library Exception" + http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. 
+ To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml new file mode 100644 index 0000000..0ea83ee --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Copyright 2016 Hortoworks, Inc. All rights reserved. Hortonworks, Inc. + licenses this file to you under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + See the associated NOTICE file for additional information regarding copyright + ownership. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-registry-processors</artifactId> + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/expected_ouput_csv/*</exclude> + <exclude>src/test/resources/input_avro/*</exclude> + <exclude>src/test/resources/input_csv/*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>pl.pragmatists</groupId> + <artifactId>JUnitParams</artifactId> + <version>1.0.5</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java new file mode 100644 index 0000000..54497dc --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Base processor for implementing transform-like processors for CSV + * transformations that integrate with Schema Registry (see + * {@link SchemaRegistry}) + */ +abstract class AbstractCSVTransformer extends AbstractContentTransformer { + + static final List<PropertyDescriptor> BASE_CSV_DESCRIPTORS; + + static { + List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.addAll(BASE_DESCRIPTORS); + descriptors.add(DELIMITER); + BASE_CSV_DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + protected volatile char delimiter; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return BASE_CSV_DESCRIPTORS; + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + this.delimiter = context.getProperty(DELIMITER).getValue().charAt(0); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java new file mode 100644 index 0000000..403b52a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Base processor for implementing transform-like processors that integrate with + * Schema Registry (see {@link SchemaRegistry}) + */ +abstract class AbstractContentTransformer extends BaseContentTransformer implements RegistryCommon { + + static final List<PropertyDescriptor> BASE_DESCRIPTORS; + + static { + List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(REGISTRY_SERVICE); + descriptors.add(SCHEMA_NAME); + descriptors.add(SCHEMA_TYPE); + BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + volatile SchemaRegistry schemaRegistryDelegate; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class); + } + + /** + * + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) { + Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties); + return this.transform(in, out, contextProperties, schema); + } + + /** + * This operation is designed to allow sub-classes to provide + * implementations that read content of the provided {@link InputStream} and + * write content (same or different) into the provided {@link OutputStream}. + * Both {@link InputStream} and {@link OutputStream} represent the content + * of the in/out {@link FlowFile} and are both required to NOT be null; + * <p> + * The returned {@link Map} represents attributes that will be added to the + * outgoing FlowFile. It can be null, in which case no attributes will be + * added to the resulting {@link FlowFile}. + * + * + * @param in + * {@link InputStream} representing data to be transformed + * @param out + * {@link OutputStream} representing target stream to wrote + * transformed data. Can be null if no output needs to be + * written. 
+ * @param contextProperties + * instance of {@link InvocationContextProperties} + * @param schema + * instance of {@link Schema} + */ + protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema); + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return BASE_DESCRIPTORS; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java new file mode 100644 index 0000000..13dd4a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Base processor for implementing transform-like processors that integrate with + * Schema Registry (see {@link SchemaRegistry}) + */ +abstract class AbstractTransformer extends BaseTransformer implements RegistryCommon { + + static final List<PropertyDescriptor> BASE_DESCRIPTORS; + + static { + List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(REGISTRY_SERVICE); + descriptors.add(SCHEMA_NAME); + BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + volatile SchemaRegistry schemaRegistryDelegate; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class); + } + + /** + * This operation is designed to allow sub-classes to provide + * implementations that read content of the provided {@link InputStream} + * that represent the content of the incoming {@link FlowFile}. 
+ * <p> + * The returned {@link Map} represents attributes that will be added to the + * outgoing FlowFile. + * + * + * @param in + * {@link InputStream} representing data to be transformer + * @param contextProperties + * instance of {@link InvocationContextProperties} + * @param schema + * instance of avro {@link Schema} + */ + protected abstract Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema); + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return BASE_DESCRIPTORS; + } + + /** + * + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) { + Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties); + return this.transform(in, contextProperties, schema); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java new file mode 100644 index 0000000..b967af9 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.nifi.flowfile.FlowFile; + +/** + * Various Avro related utility operations relevant to transforming contents of + * the {@link FlowFile} between Avro formats. + */ +class AvroUtils { + + /** + * Reads provided {@link InputStream} into Avro {@link GenericRecord} + * applying provided {@link Schema} returning the resulting GenericRecord. 
+ */ + public static GenericRecord read(InputStream in, Schema schema) { + GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); + GenericRecord avroRecord = null; + try { + avroRecord = datumReader.read(null, DecoderFactory.get().binaryDecoder(in, null)); + return avroRecord; + } catch (Exception e) { + throw new IllegalStateException("Failed to read AVRO record", e); + } + } + + /** + * Writes provided {@link GenericRecord} into the provided + * {@link OutputStream}. + */ + public static void write(GenericRecord record, OutputStream out) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema()); + try { + writer.write(record, encoder); + encoder.flush(); + } catch (Exception e) { + throw new IllegalStateException("Failed to write AVRO record", e); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java new file mode 100644 index 0000000..12586ac --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.StreamCallback; + +/** + * Base processor which contains common functionality for processors that + * receive {@link FlowFile} and output {@link FlowFile} while also modifying the + * content of the {@link FlowFile} + */ +public abstract class BaseContentTransformer extends BaseTransformer { + + @Override + protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) { + AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>(); + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + attributeRef.set(transform(in, out, contextProperties)); + } + }); + if (attributeRef.get() != null) { + flowFile = session.putAllAttributes(flowFile, attributeRef.get()); + } + return flowFile; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java new file mode 100644 index 0000000..e1cc98c --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; + +/** + * Base processor which contains common functionality for processors that + * receive {@link FlowFile} and output {@link FlowFile} and contain only two + * {@link Relationship}s (i.e., success and failure). Every successful execution + * of + * {@link #doTransform(ProcessContext, ProcessSession, FlowFile, InvocationContextProperties)} + * operation will result in transferring {@link FlowFile} to 'success' + * relationship while any exception will result in such file going to 'failure'. + */ +public abstract class BaseTransformer extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully retrieved schema from Schema Registry") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to find a schema are sent to this relationship") + .build(); + + private static final Set<Relationship> BASE_RELATIONSHIPS; + + static { + Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + BASE_RELATIONSHIPS = Collections.unmodifiableSet(relationships); + } + + private final Map<PropertyDescriptor, String> propertyInstanceValues = new HashMap<>(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile != null) { + try { + InvocationContextProperties contextProperties = new InvocationContextProperties(context, flowFile); + flowFile = this.doTransform(context, session, flowFile, contextProperties); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + this.getLogger().error("Failed FlowFile processing, routing to failure. 
Issue: " + e.getMessage(), e); + session.transfer(flowFile, REL_FAILURE); + } + } else { + context.yield(); + } + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + List<PropertyDescriptor> propertyDescriptors = this.getSupportedPropertyDescriptors(); + for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { + if (!propertyDescriptor.isExpressionLanguageSupported()){ + this.propertyInstanceValues.put(propertyDescriptor, context.getProperty(propertyDescriptor).getValue()); + } + } + } + + /** + * + */ + protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) { + AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + attributeRef.set(transform(in, null, contextProperties)); + } + }); + if (attributeRef.get() != null) { + flowFile = session.putAllAttributes(flowFile, attributeRef.get()); + } + return flowFile; + } + + @Override + public Set<Relationship> getRelationships() { + return BASE_RELATIONSHIPS; + } + + /** + * This operation is designed to allow sub-classes to provide + * implementations that read content of the provided {@link InputStream} and + * write content (same or different) it into the provided + * {@link OutputStream}. Both {@link InputStream} and {@link OutputStream} + * represent the content of the in/out {@link FlowFile}. The + * {@link OutputStream} can be null if no output needs to be written. + * <p> + * The returned {@link Map} represents attributes that will be added to the + * outgoing FlowFile. + * + * + * @param in + * {@link InputStream} representing data to be transformed + * @param out + * {@link OutputStream} representing target stream to wrote + * transformed data. Can be null if no output needs to be + * written. + * @param contextProperties + * instance of {@link InvocationContextProperties} + */ + protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties); + + /** + * Properties object that gathers the value of the + * {@link PropertyDescriptor} within the context of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} + * invocation. It maintains the knowledge of instance properties vs. + * invocation properties that the values of which are set by evaluating + * expression against the incoming {@link FlowFile}. + */ + public class InvocationContextProperties { + private final Map<PropertyDescriptor, String> propertyInvocationValues = new HashMap<>(); + + InvocationContextProperties(ProcessContext context, FlowFile flowFile) { + List<PropertyDescriptor> propertyDescriptors = BaseTransformer.this.getSupportedPropertyDescriptors(); + for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { + if (propertyDescriptor.isExpressionLanguageSupported()) { + PropertyValue value = context.getProperty(propertyDescriptor) + .evaluateAttributeExpressions(flowFile); + this.propertyInvocationValues.put(propertyDescriptor, value.getValue()); + } + } + } + + /** + * Returns the value of the property within the context of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} + * invocation. + */ + public String getPropertyValue(PropertyDescriptor propertyDescriptor, boolean notNull) { + String propertyValue = propertyInstanceValues.containsKey(propertyDescriptor) + ? 
propertyInstanceValues.get(propertyDescriptor) + : propertyInvocationValues.get(propertyDescriptor); + if (notNull && propertyValue == null) { + throw new IllegalArgumentException("Property '" + propertyDescriptor + "' evaluatd to null"); + } + return propertyValue; + } + + @Override + public String toString() { + return "Instance: " + propertyInstanceValues + "; Invocation: " + propertyInvocationValues; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java new file mode 100644 index 0000000..bded6fa --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringWriter; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.BooleanNode; +import org.codehaus.jackson.node.DoubleNode; +import org.codehaus.jackson.node.IntNode; +import org.codehaus.jackson.node.LongNode; +import org.codehaus.jackson.node.TextNode; + +/** + * Various CSV related utility operations relevant to transforming contents of + * the {@link FlowFile} between CSV and AVRO formats. + */ +class CSVUtils { + /** + * Provides a {@link Validator} to ensure that provided value is a valid + * character. 
+ */ + public static final Validator CHAR_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + // Allows special, escaped characters as input, which is then un-escaped and converted to a single character. + // Examples for special characters: \t (or \u0009), \f. + if (input.length() > 1) { + input = StringEscapeUtils.unescapeJava(input); + } + return new ValidationResult.Builder().subject(subject).input(input) + .explanation("Only non-null single characters are supported") + .valid(input.length() == 1 && input.charAt(0) != 0).build(); + } + }; + + public static GenericRecord read(InputStream record, char delimiter, Schema schema, char quoteChar) { + Record avroRecord = new GenericData.Record(schema); + String[] parsedRecord = parseFields(convertInputStreamToString(record), delimiter, quoteChar); + List<Field> fields = schema.getFields(); + if (parsedRecord.length != fields.size()) { + throw new IllegalStateException("Incompatible schema. Parsed fields count does not match the count of fields from schema. " + + "Schema: " + schema.toString(true) + "\n Record: " + record); + } + + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + Type type = field.schema().getType(); + updateRecord(field, type, parsedRecord[i], avroRecord); + } + return avroRecord; + } + + /** + * Parses provided record into fields using provided delimiter. The + * 'quoteChar' is used to ensure that if a delimiter char is in quotes it + * will not be parsed into a separate filed. + */ + public static String[] parseFields(String record, char delimiter, char quoteChar) { + List<String> result = new ArrayList<String>(); + int start = 0; + boolean inQuotes = false; + for (int i = 0; i < record.length(); i++) { + if (record.charAt(i) == quoteChar) { + inQuotes = !inQuotes; + } + boolean atLastChar = (i == record.length() - 1); + if (atLastChar) { + if (record.charAt(i) == delimiter) { + //missing last column value, add NULL + result.add(record.substring(start,i)); + result.add(null); + } else { + result.add(record.substring(start)); + } + } else if (record.charAt(i) == delimiter && !inQuotes) { + if (start == i) { + //There is no value, so add NULL to indicated the absence of a value for this field. + result.add(null); + } else { + result.add(record.substring(start, i)); + } + start = i + 1; + } + } + return result.toArray(new String[]{}); + } + + /** + * Writes {@link GenericRecord} as CSV (delimited) record to the + * {@link OutputStream} using provided delimiter. + */ + public static void write(GenericRecord record, char delimiter, OutputStream out) { + List<Field> fields = record.getSchema().getFields(); + + String delimiterToUse = ""; + try { + for (Field field : fields) { + out.write(delimiterToUse.getBytes(StandardCharsets.UTF_8)); + Object fieldValue = record.get(field.name()); + if (null == fieldValue) { + out.write(new byte[0]); + } else { + if (Type.BYTES == field.schema().getType()) { + // need to create it from the ByteBuffer it is serialized as. + // need to ensure the type is one of the logical ones we support and if so convert it. 
+                        if (!"decimal".equals(field.getProp("logicalType"))) {
+                            throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '"
+                                    + field.getProp("logicalType") + "' that is currently not supported.");
+                        }
+
+                        JsonNode rawPrecision = field.getJsonProp("precision");
+                        if (null == rawPrecision) {
+                            throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property");
+                        }
+                        int precision = rawPrecision.asInt();
+                        JsonNode rawScale = field.getJsonProp("scale");
+                        int scale = null == rawScale ? 0 : rawScale.asInt();
+
+                        // write out the decimal with the precision and scale.
+                        NumberFormat numberFormat = DecimalFormat.getInstance();
+                        numberFormat.setGroupingUsed(false);
+                        normalizeNumberFormat(numberFormat, scale, precision);
+                        final String rawValue = new String(((ByteBuffer) fieldValue).array());
+                        out.write(numberFormat.format(new BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8));
+                    } else {
+                        out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8));
+                    }
+                }
+                if (delimiterToUse.length() == 0) {
+                    delimiterToUse = String.valueOf(delimiter);
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to write AVRO Record as CSV", e);
+        }
+    }
+
+    /**
+     * According to the Avro 1.7.7 spec, if a logical type is invalid (for example, a
+     * decimal with scale greater than its precision), implementations should
+     * ignore the logical type and use the underlying Avro type.
+     */
+    private static void normalizeNumberFormat(NumberFormat numberFormat, int scale, int precision) {
+        if (scale < precision) {
+            // write out with the specified precision and scale.
+            numberFormat.setMaximumIntegerDigits(precision);
+            numberFormat.setMaximumFractionDigits(scale);
+            numberFormat.setMinimumFractionDigits(scale);
+        }
+    }
+
+    /**
+     * Reads the provided {@link InputStream} fully into a UTF-8 String.
+     */
+    private static String convertInputStreamToString(InputStream record) {
+        StringWriter writer = new StringWriter();
+        try {
+            IOUtils.copy(record, writer, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to read InputStream into String", e);
+        }
+        return writer.toString();
+    }
+
+    /**
+     * Encodes the value of a 'decimal' logical type field into the {@link ByteBuffer} representation used for Avro BYTES.
+     */
+    private static ByteBuffer encodeLogicalType(final Field field, final String fieldValue) {
+        String logicalType = field.getProp("logicalType");
+        if (!"decimal".equals(logicalType)) {
+            throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '" + logicalType
+                    + "' that is currently not supported.");
+        }
+
+        JsonNode rawPrecision = field.getJsonProp("precision");
+        if (null == rawPrecision) {
+            throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property");
+        }
+        int precision = rawPrecision.asInt();
+        JsonNode rawScale = field.getJsonProp("scale");
+        int scale = null == rawScale ? 0 : rawScale.asInt();
+
+        NumberFormat numberFormat = DecimalFormat.getInstance();
+        numberFormat.setGroupingUsed(false);
+        normalizeNumberFormat(numberFormat, scale, precision);
+        BigDecimal decimal = null == fieldValue ? new BigDecimal(retrieveDefaultFieldValue(field).asText()) : new BigDecimal(fieldValue);
+        return ByteBuffer.wrap(numberFormat.format(decimal).getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Returns the default value declared for the field in the Avro schema, throwing if none is supplied.
+     */
+    private static JsonNode retrieveDefaultFieldValue(Field field) {
+        JsonNode jsonNode = field.defaultValue();
+        if (null == jsonNode) {
+            throw new IllegalArgumentException("The field '" + field.name() + "' is NULL and there is no default value supplied in the Avro Schema");
+        }
+        return jsonNode;
+    }
+
+    /**
+     * Populates the Avro record field with the provided value (or the schema default) converted to the field's type.
+     */
+    private static void updateRecord(Field field, Type type, String providedValue, Record avroRecord) {
+        if (Type.NULL != type) {
+            Object value;
+            if (Type.INT == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, IntNode.class).getIntValue()
+                        : Integer.parseInt(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.BOOLEAN == type) {
+                value = null == providedValue
+                        ? possiblyGetDefaultValue(field, BooleanNode.class).getBooleanValue()
+                        : Boolean.parseBoolean(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.DOUBLE == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue()
+                        : Double.parseDouble(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.FLOAT == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue()
+                        : Float.parseFloat(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.LONG == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, LongNode.class).getLongValue()
+                        : Long.parseLong(providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.STRING == type) {
+                value = null == providedValue ? possiblyGetDefaultValue(field, TextNode.class).getTextValue()
+                        : providedValue;
+                avroRecord.put(field.name(), value);
+            } else if (Type.BYTES == type) {
+                value = encodeLogicalType(field, providedValue);
+                avroRecord.put(field.name(), value);
+            } else if (Type.UNION == type) {
+                field.schema().getTypes()
+                        .forEach(schema -> updateRecord(field, schema.getType(), providedValue, avroRecord));
+            } else if (Type.ARRAY == type || Type.ENUM == type || Type.FIXED == type || Type.MAP == type
+                    || Type.NULL == type || Type.RECORD == type) {
+                throw new IllegalArgumentException("The field type '" + type + "' is not supported at the moment");
+            } else {
+                avroRecord.put(field.name(), providedValue);
+            }
+        }
+    }
+
+    /**
+     * Checks whether there is a default value to use; if there is none, throws
+     * {@link IllegalArgumentException}.
+     */
+    private static <T extends JsonNode> JsonNode possiblyGetDefaultValue(Field field, Class<T> expectedDefaultType) {
+        JsonNode jsonNode = retrieveDefaultFieldValue(field);
+        if (field.schema().getType() != Type.UNION && !expectedDefaultType.isAssignableFrom(jsonNode.getClass())) {
+            // since we do not support schema evolution here we need to throw an
+            // exception here as the data is in error.
+            throw new IllegalArgumentException("The field '" + field.name() + "' has a default value that "
+                    + "does not match the field type. Field Type is: '" + expectedDefaultType.getName() + "' and the "
+                    + "default value type is: '" + field.defaultValue().toString() + "'.");
+        }
+        return jsonNode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
new file mode 100644
index 0000000..2ab83c5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({ "registry", "schema", "avro", "extract", "evaluate" })
+@CapabilityDescription("Extracts Avro fields and assigns them to FlowFile attributes")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "Avro field name", value = "FlowFile attribute name to set the extracted field",
+        description = "The value of the Avro field specified by 'Avro field name' will be extracted and set as "
+                + "a FlowFile attribute under the name specified by the value of this property.")
+public final class ExtractAvroFields extends AbstractTransformer {
+
+    private static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static {
+        List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.addAll(BASE_DESCRIPTORS);
+        descriptors.add(SCHEMA_TYPE);
+        DESCRIPTORS = Collections.unmodifiableList(descriptors);
+    }
+
+    private volatile Map<String, String> dynamicProperties;
+
+    /**
+     * Returns the list of supported property descriptors.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Captures the configured dynamic properties (Avro field name to FlowFile attribute name) at schedule time.
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+        this.dynamicProperties = context.getProperties().entrySet().stream()
+                .filter(p -> p.getKey().isDynamic())
+                .collect(Collectors.toMap(p -> p.getKey().getName(), p -> p.getValue()));
+    }
+
+    /**
+     * Builds the descriptor used for user-defined dynamic properties.
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(false)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .required(false)
+                .dynamic(true)
+                .build();
+    }
+
+    /**
+     * Reads the Avro record and maps the configured fields to FlowFile attributes.
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema) {
+        GenericRecord avroRecord = AvroUtils.read(in, schema);
+        Map<String, String> attributes = this.dynamicProperties.entrySet().stream().collect(
+                Collectors.toMap(dProp -> dProp.getValue(), dProp -> String.valueOf(avroRecord.get(dProp.getKey()))));
+        return Collections.unmodifiableMap(attributes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
new file mode 100644
index 0000000..81c98b3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Various JSON-related utility operations relevant to transforming contents of
+ * the {@link FlowFile} between JSON and AVRO formats.
+ */
+class JsonUtils {
+
+    /**
+     * Writes the provided {@link GenericRecord} into the provided
+     * {@link OutputStream} as JSON.
+     */
+    public static void write(GenericRecord record, OutputStream out) {
+        try {
+            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
+            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
+            writer.write(record, encoder);
+            encoder.flush();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to write GenericRecord as JSON", e);
+        }
+    }
+
+    /**
+     * Reads the provided {@link InputStream} as JSON into an Avro
+     * {@link GenericRecord}, applying the provided {@link Schema}, and returns
+     * the resulting GenericRecord.
+     */
+    public static GenericRecord read(InputStream jsonIs, Schema schema) {
+        DataInputStream din = new DataInputStream(jsonIs);
+        try {
+            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
+            DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
+            return reader.read(null, decoder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to parse incoming JSON input stream into Avro GenericRecord. "
+                    + "Possible reason: the content may not be valid JSON or an incompatible schema was provided. Schema was '"
+                    + schema.toString(true) + "'.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
new file mode 100644
index 0000000..3fc1530
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schemaregistry.processors.BaseTransformer.InvocationContextProperties;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+/**
+ * Strategy that encapsulates common properties and functionality used by all
+ * processors that integrate with Schema Registry.
+ */
+interface RegistryCommon {
+
+    static final String SCHEMA_ATTRIBUTE_NAME = "schema.text";
+
+    static final PropertyDescriptor REGISTRY_SERVICE = new PropertyDescriptor.Builder()
+            .name("schema-registry-service")
+            .displayName("Schema Registry Service")
+            .description("The Schema Registry Service for serializing/deserializing messages as well as schema retrieval.")
+            .required(true)
+            .identifiesControllerService(SchemaRegistry.class)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+            .name("schema-name")
+            .displayName("Schema Name")
+            .description("The name of the schema.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_TYPE = new PropertyDescriptor.Builder()
+            .name("schema-type")
+            .displayName("Schema Type")
+            .description("The type of schema (avro is the only currently supported type).")
+            .required(true)
+            .allowableValues("avro")
+            .defaultValue("avro")
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
+            .name("csv-delimiter")
+            .displayName("CSV delimiter")
+            .description("Delimiter character for CSV records")
+            .addValidator(CSVUtils.CHAR_VALIDATOR)
+            .defaultValue(",")
+            .build();
+
+    static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
+            .name("csv-quote-character")
+            .displayName("CSV quote character")
+            .description("Quote character for CSV values")
+            .addValidator(CSVUtils.CHAR_VALIDATOR)
+            .defaultValue("\"")
+            .build();
+    /**
+     * Utility operation to retrieve and parse {@link Schema} from Schema
+     * Registry using the provided {@link SchemaRegistry}.
+     */
+    static Schema retrieveSchema(SchemaRegistry schemaRegistry, InvocationContextProperties contextProperties) {
+        String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, true);
+        String schemaText = schemaRegistry.retrieveSchemaText(schemaName);
+        return new Schema.Parser().parse(schemaText);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
new file mode 100644
index 0000000..aa0d418
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+@Tags({ "registry", "schema", "avro", "csv", "transform" })
+@CapabilityDescription("Transforms AVRO content of the FlowFile to CSV using the schema provided by the Schema Registry Service.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public final class TransformAvroToCSV extends AbstractCSVTransformer {
+
+    /**
+     * Reads the incoming Avro content, writes it out as a CSV record and sets the text/csv mime.type attribute.
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
+        byte[] buff = null;
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            IOUtils.copy(in, bos);
+            buff = bos.toByteArray();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to read Avro content of the FlowFile", e);
+        }
+        ByteArrayInputStream is = new ByteArrayInputStream(buff);
+        GenericRecord avroRecord = AvroUtils.read(is, schema);
+        CSVUtils.write(avroRecord, this.delimiter, out);
+        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
+    }
+}
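
Note: all of the Transform* processors above follow the same registry-driven pattern: retrieve the schema by name, read the incoming content into an Avro GenericRecord, and serialize it back out in the target format. The following is a minimal, self-contained sketch of the CSV-to-JSON path (the flow TransformCSVToJson implements via CSVUtils.read and JsonUtils.write). It assumes only Apache Avro on the classpath; the class name and sample schema are illustrative and not part of this commit, and it deliberately omits the quote, default-value and logicalType handling that CSVUtils provides.

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.io.JsonEncoder;

    public class CsvToJsonSketch {

        public static void main(String[] args) throws Exception {
            // A schema comparable to what the SchemaRegistry controller service would return (illustrative).
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                  + "{\"name\":\"name\",\"type\":\"string\"},"
                  + "{\"name\":\"age\",\"type\":\"int\"}]}");

            // One delimited record, as the content of an incoming FlowFile.
            String[] values = "John,42".split(",");

            // Map CSV columns onto the schema fields; CSVUtils.read does this generically,
            // including quote handling, NULL columns, defaults and the 'decimal' logical type.
            GenericData.Record record = new GenericData.Record(schema);
            record.put("name", values[0]);
            record.put("age", Integer.parseInt(values[1]));

            // Encode the record as JSON, the same path JsonUtils.write takes.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(schema);
            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
            writer.write(record, encoder);
            encoder.flush();

            System.out.println(out.toString("UTF-8")); // prints {"name":"John","age":42}
        }
    }

One design consequence worth noting: with a union field such as ["null","string"], Avro's JSON encoding wraps non-null values as {"string":"John"} rather than a bare value, which is part of why the processors in this commit handle unions and default values explicitly.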