NIFI-1280: Added support for RecordSchema in SchemaRegistry Signed-off-by: Mark Payne <marka...@hotmail.com> Signed-off-by: Matt Burgess <mattyb...@apache.org>
NIFI-1280: Updated SimpleKeyValueSchemaRegistry to make use of new CHOICE RecordFieldType - Update Record Readers to use SchemaRegistry controller service. Moved SchemaRegistry api into its own maven module and added to standard-services-api so that we can properly add dependencies on it. Code cleanup and bug fixes Signed-off-by: Matt Burgess <mattyb...@apache.org> NIFI-1280: Fixed checkstyle violations and license exclusions for RAT plugin Signed-off-by: Matt Burgess <mattyb...@apache.org> NIFI-1280: Addressed feedback from PR Review Signed-off-by: Matt Burgess <mattyb...@apache.org> NIFI-1280: Additional changes/doc to support QueryFlowFile and Record Readers/Writers This closes #1652 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/68c592ea Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/68c592ea Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/68c592ea Branch: refs/heads/master Commit: 68c592ea43d30754ec07c42cf10563fe9db185ae Parents: a88d3bf Author: Oleg Zhurakousky <o...@suitcase.io> Authored: Thu Mar 30 09:12:07 2017 -0400 Committer: Matt Burgess <mattyb...@apache.org> Committed: Tue Apr 11 19:29:35 2017 -0400 ---------------------------------------------------------------------- .../apache/nifi/util/MockProcessSession.java | 4 +- .../nifi/util/TestMockProcessSession.java | 16 +- nifi-nar-bundles/nifi-hive-bundle/pom.xml | 10 + .../nifi-registry-nar/pom.xml | 7 +- .../nifi-registry-processors/pom.xml | 74 --- .../processors/AbstractCSVTransformer.java | 57 -- .../processors/AbstractContentTransformer.java | 101 --- .../processors/AbstractTransformer.java | 93 --- .../schemaregistry/processors/AvroUtils.java | 67 -- .../processors/BaseContentTransformer.java | 51 -- .../processors/BaseTransformer.java | 189 ------ .../schemaregistry/processors/CSVUtils.java | 302 --------- .../processors/ExtractAvroFields.java | 100 --- .../schemaregistry/processors/JsonUtils.java | 74 --- 
.../processors/RegistryCommon.java | 84 --- .../processors/TransformAvroToCSV.java | 57 -- .../processors/TransformAvroToJson.java | 46 -- .../processors/TransformCSVToAvro.java | 80 --- .../processors/TransformCSVToJson.java | 80 --- .../processors/TransformJsonToAvro.java | 45 -- .../processors/TransformJsonToCSV.java | 45 -- .../org.apache.nifi.processor.Processor | 21 - .../processors/TransformersTest.java | 220 ------- .../expected_ouput_csv/decimal_logicalType.txt | 1 - .../decimal_logicalType_invalid_scale.txt | 1 - ...mal_logicalType_valid_scale_with_default.txt | 1 - .../decimal_logicalType_with_default.txt | 1 - .../expected_ouput_csv/primitive_types.txt | 1 - .../primitive_types_with_matching_default.txt | 1 - .../union_null_last_field_with_default.txt | 1 - .../union_null_middle_field_with_default.txt | 1 - .../expected_ouput_csv/union_with_default.txt | 1 - ...l_logicalType_invalid_scale_with_default.txt | 16 - ...mal_logicalType_valid_scale_with_default.txt | 16 - ..._logicalType_valid_scale_with_no_default.txt | 15 - .../input_avro/primitive_types_no_defaults.txt | 11 - .../primitive_types_union_with_defaults.txt | 11 - .../primitive_types_with_matching_default.txt | 11 - .../primitive_types_with_mismatch_default.txt | 11 - .../input_avro/union_and_matching_defaults.txt | 18 - .../input_avro/union_and_mismatch_defaults.txt | 18 - .../resources/input_csv/decimal_logicalType.txt | 1 - .../decimal_logicalType_missing_value.txt | 1 - .../resources/input_csv/primitive_types.txt | 1 - .../primitive_types_with_matching_default.txt | 1 - .../union_null_last_field_with_default.txt | 1 - .../union_null_middle_field_with_default.txt | 1 - .../resources/input_csv/union_with_default.txt | 1 - .../input_csv/union_with_missing_value.txt | 1 - .../nifi-registry-service/pom.xml | 12 + .../services/AvroSchemaRegistry.java | 217 +++++++ .../services/AvroSchemaValidator.java | 57 ++ .../schemaregistry/services/SchemaRegistry.java | 46 -- 
.../services/SimpleKeyValueSchemaRegistry.java | 96 --- ...org.apache.nifi.controller.ControllerService | 2 +- .../SimpleKeyValueSchemaRegistryTest.java | 70 --- .../services/TestAvroSchemaRegistry.java | 111 ++++ nifi-nar-bundles/nifi-registry-bundle/pom.xml | 5 - .../nifi-standard-processors/pom.xml | 4 +- .../nifi/processors/standard/QueryFlowFile.java | 23 +- .../nifi/queryflowfile/FlowFileEnumerator.java | 6 +- .../nifi/queryflowfile/FlowFileTable.java | 4 +- .../additionalDetails.html | 3 +- .../processors/standard/TestQueryFlowFile.java | 130 +--- .../standard/util/record/MockRecordParser.java | 107 ++++ .../standard/util/record/MockRecordWriter.java | 80 +++ .../pom.xml | 2 +- .../apache/nifi/serialization/RecordReader.java | 1 - .../apache/nifi/serialization/RecordWriter.java | 2 +- .../serialization/RowRecordReaderFactory.java | 7 +- .../nifi/serialization/record/DataType.java | 31 +- .../nifi/serialization/record/MapRecord.java | 201 +----- .../nifi/serialization/record/Record.java | 4 +- .../serialization/record/RecordFieldType.java | 208 ++++++- .../record/ResultSetRecordSet.java | 168 ++++- .../record/type/ArrayDataType.java | 67 ++ .../record/type/ChoiceDataType.java | 68 +++ .../record/type/RecordDataType.java | 63 ++ .../record/util/DataTypeUtils.java | 608 +++++++++++++++++++ .../util/IllegalTypeConversionException.java | 29 + .../src/main/resources/META-INF/NOTICE | 11 +- .../.gitignore | 1 - .../nifi-record-serialization-services/pom.xml | 19 +- .../java/org/apache/nifi/avro/AvroReader.java | 13 +- .../org/apache/nifi/avro/AvroRecordReader.java | 177 +++--- .../apache/nifi/avro/AvroRecordSetWriter.java | 15 +- .../java/org/apache/nifi/avro/AvroTypeUtil.java | 159 +++++ .../org/apache/nifi/avro/WriteAvroResult.java | 308 ++++------ .../java/org/apache/nifi/csv/CSVReader.java | 58 +- .../org/apache/nifi/csv/CSVRecordReader.java | 197 ++---- .../org/apache/nifi/csv/CSVRecordSetWriter.java | 40 +- .../main/java/org/apache/nifi/csv/CSVUtils.java | 
204 +++++++ .../nifi/csv/SingleCharacterValidator.java | 62 ++ .../org/apache/nifi/csv/WriteCSVResult.java | 33 +- .../java/org/apache/nifi/grok/GrokReader.java | 26 +- .../org/apache/nifi/grok/GrokRecordReader.java | 54 +- .../nifi/json/AbstractJsonRowRecordReader.java | 127 +--- .../org/apache/nifi/json/JsonPathReader.java | 47 +- .../nifi/json/JsonPathRowRecordReader.java | 241 ++------ .../org/apache/nifi/json/JsonPathValidator.java | 12 - .../apache/nifi/json/JsonRecordSetWriter.java | 8 +- .../org/apache/nifi/json/JsonTreeReader.java | 54 +- .../nifi/json/JsonTreeRowRecordReader.java | 121 ++-- .../org/apache/nifi/json/PropertyNameUtil.java | 88 --- .../org/apache/nifi/json/WriteJsonResult.java | 162 ++--- .../serialization/AbstractRecordSetWriter.java | 84 --- .../nifi/serialization/DataTypeUtils.java | 165 ----- .../DateTimeTextRecordSetWriter.java | 57 ++ .../nifi/serialization/DateTimeUtils.java | 50 ++ .../SchemaRegistryRecordReader.java | 110 ++++ .../UserTypeOverrideRowReader.java | 78 --- .../nifi/text/FreeFormTextRecordSetWriter.java | 6 +- .../additionalDetails.html | 150 +++-- .../additionalDetails.html | 97 +-- .../additionalDetails.html | 318 +++++----- .../additionalDetails.html | 281 ++++++--- .../apache/nifi/avro/TestAvroRecordReader.java | 121 +++- .../apache/nifi/avro/TestWriteAvroResult.java | 202 ++++++ .../apache/nifi/csv/TestCSVRecordReader.java | 94 +-- .../org/apache/nifi/csv/TestWriteCSVResult.java | 23 +- .../apache/nifi/grok/TestGrokRecordReader.java | 11 +- .../nifi/json/TestJsonPathRowRecordReader.java | 109 +++- .../nifi/json/TestJsonTreeRowRecordReader.java | 104 +++- .../apache/nifi/json/TestWriteJsonResult.java | 9 +- .../src/test/resources/avro/datatypes.avsc | 47 ++ .../src/test/resources/avro/logical-types.avsc | 34 ++ .../nifi-schema-registry-service-api/pom.xml | 32 + .../schemaregistry/services/SchemaRegistry.java | 53 ++ .../nifi-standard-services-api-nar/pom.xml | 5 + nifi-nar-bundles/nifi-standard-services/pom.xml | 1 
+ pom.xml | 12 +- 131 files changed, 4423 insertions(+), 4466 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 7dd9714..faf6e42 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -218,8 +218,8 @@ public class MockProcessSession implements ProcessSession { } } - // throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via " - // + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy); + throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via " + + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy); } committed = true; http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index e16afb3..d1c2bea 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -16,6 +16,14 @@ */ package org.apache.nifi.util; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + 
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -28,14 +36,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - public class TestMockProcessSession { @Test(expected = AssertionError.class) http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-hive-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/pom.xml index 5c3f2a5..342e7ed 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-hive-bundle/pom.xml @@ -38,6 +38,16 @@ <module>nifi-hive-nar</module> </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.7.7</version> + </dependency> + </dependencies> + </dependencyManagement> + <build> <plugins> <plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml index dfdf214..0780c85 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml @@ -25,7 +25,12 @@ <dependencies> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-registry-processors</artifactId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + 
<groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-service</artifactId> </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml deleted file mode 100644 index 0ea83ee..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml +++ /dev/null @@ -1,74 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- Copyright 2016 Hortoworks, Inc. All rights reserved. Hortonworks, Inc. - licenses this file to you under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. You may - obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. - See the associated NOTICE file for additional information regarding copyright - ownership. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-registry-bundle</artifactId> - <version>1.2.0-SNAPSHOT</version> - </parent> - - <artifactId>nifi-registry-processors</artifactId> - <packaging>jar</packaging> - - <build> - <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - <excludes combine.children="append"> - <exclude>src/test/resources/expected_ouput_csv/*</exclude> - <exclude>src/test/resources/input_avro/*</exclude> - <exclude>src/test/resources/input_csv/*</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-registry-service</artifactId> - <version>1.2.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>pl.pragmatists</groupId> - <artifactId>JUnitParams</artifactId> - <version>1.0.5</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java deleted file mode 100644 index 54497dc..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.schemaregistry.processors; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; - -/** - * Base processor for implementing transform-like processors for CSV - * transformations that integrate with Schema Registry (see - * {@link SchemaRegistry}) - */ -abstract class AbstractCSVTransformer extends AbstractContentTransformer { - - static final List<PropertyDescriptor> BASE_CSV_DESCRIPTORS; - - static { - List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.addAll(BASE_DESCRIPTORS); - descriptors.add(DELIMITER); - BASE_CSV_DESCRIPTORS = Collections.unmodifiableList(descriptors); - } - - protected volatile char delimiter; - - @Override - public List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return BASE_CSV_DESCRIPTORS; - } - - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - this.delimiter = context.getProperty(DELIMITER).getValue().charAt(0); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java deleted file mode 100644 index 403b52a..0000000 --- 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; - -/** - * Base processor for implementing transform-like processors that integrate with - * Schema Registry (see {@link SchemaRegistry}) - */ -abstract class AbstractContentTransformer extends BaseContentTransformer implements RegistryCommon { - - static final List<PropertyDescriptor> BASE_DESCRIPTORS; - - static { - List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.add(REGISTRY_SERVICE); - descriptors.add(SCHEMA_NAME); - 
descriptors.add(SCHEMA_TYPE); - BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors); - } - - volatile SchemaRegistry schemaRegistryDelegate; - - /** - * - */ - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class); - } - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) { - Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties); - return this.transform(in, out, contextProperties, schema); - } - - /** - * This operation is designed to allow sub-classes to provide - * implementations that read content of the provided {@link InputStream} and - * write content (same or different) into the provided {@link OutputStream}. - * Both {@link InputStream} and {@link OutputStream} represent the content - * of the in/out {@link FlowFile} and are both required to NOT be null; - * <p> - * The returned {@link Map} represents attributes that will be added to the - * outgoing FlowFile. It can be null, in which case no attributes will be - * added to the resulting {@link FlowFile}. - * - * - * @param in - * {@link InputStream} representing data to be transformed - * @param out - * {@link OutputStream} representing target stream to wrote - * transformed data. Can be null if no output needs to be - * written. 
- * @param contextProperties - * instance of {@link InvocationContextProperties} - * @param schema - * instance of {@link Schema} - */ - protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema); - - /** - * - */ - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return BASE_DESCRIPTORS; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java deleted file mode 100644 index 13dd4a5..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; - -/** - * Base processor for implementing transform-like processors that integrate with - * Schema Registry (see {@link SchemaRegistry}) - */ -abstract class AbstractTransformer extends BaseTransformer implements RegistryCommon { - - static final List<PropertyDescriptor> BASE_DESCRIPTORS; - - static { - List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.add(REGISTRY_SERVICE); - descriptors.add(SCHEMA_NAME); - BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors); - } - - volatile SchemaRegistry schemaRegistryDelegate; - - /** - * - */ - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class); - } - - /** - * This operation is designed to allow sub-classes to provide - * implementations that read content of the provided {@link InputStream} - * that represent the content of the incoming {@link FlowFile}. - * <p> - * The returned {@link Map} represents attributes that will be added to the - * outgoing FlowFile. 
- * - * - * @param in - * {@link InputStream} representing data to be transformer - * @param contextProperties - * instance of {@link InvocationContextProperties} - * @param schema - * instance of avro {@link Schema} - */ - protected abstract Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema); - - /** - * - */ - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return BASE_DESCRIPTORS; - } - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) { - Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties); - return this.transform(in, contextProperties, schema); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java deleted file mode 100644 index b967af9..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.nifi.flowfile.FlowFile; - -/** - * Various Avro related utility operations relevant to transforming contents of - * the {@link FlowFile} between Avro formats. - */ -class AvroUtils { - - /** - * Reads provided {@link InputStream} into Avro {@link GenericRecord} - * applying provided {@link Schema} returning the resulting GenericRecord. - */ - public static GenericRecord read(InputStream in, Schema schema) { - GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); - GenericRecord avroRecord = null; - try { - avroRecord = datumReader.read(null, DecoderFactory.get().binaryDecoder(in, null)); - return avroRecord; - } catch (Exception e) { - throw new IllegalStateException("Failed to read AVRO record", e); - } - } - - /** - * Writes provided {@link GenericRecord} into the provided - * {@link OutputStream}. 
- */ - public static void write(GenericRecord record, OutputStream out) { - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); - DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema()); - try { - writer.write(record, encoder); - encoder.flush(); - } catch (Exception e) { - throw new IllegalStateException("Failed to write AVRO record", e); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java deleted file mode 100644 index 12586ac..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.StreamCallback; - -/** - * Base processor which contains common functionality for processors that - * receive {@link FlowFile} and output {@link FlowFile} while also modifying the - * content of the {@link FlowFile} - */ -public abstract class BaseContentTransformer extends BaseTransformer { - - @Override - protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) { - AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>(); - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - attributeRef.set(transform(in, out, contextProperties)); - } - }); - if (attributeRef.get() != null) { - flowFile = session.putAllAttributes(flowFile, attributeRef.get()); - } - return flowFile; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java deleted file 
mode 100644 index e1cc98c..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; - -/** - * Base processor which contains common functionality for processors that - * receive {@link FlowFile} and output {@link FlowFile} and contain only two - * {@link Relationship}s (i.e., success and failure). Every successful execution - * of - * {@link #doTransform(ProcessContext, ProcessSession, FlowFile, InvocationContextProperties)} - * operation will result in transferring {@link FlowFile} to 'success' - * relationship while any exception will result in such file going to 'failure'. 
- */ -public abstract class BaseTransformer extends AbstractProcessor { - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Successfully retrieved schema from Schema Registry") - .build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles that failed to find a schema are sent to this relationship") - .build(); - - private static final Set<Relationship> BASE_RELATIONSHIPS; - - static { - Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - BASE_RELATIONSHIPS = Collections.unmodifiableSet(relationships); - } - - private final Map<PropertyDescriptor, String> propertyInstanceValues = new HashMap<>(); - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile != null) { - try { - InvocationContextProperties contextProperties = new InvocationContextProperties(context, flowFile); - flowFile = this.doTransform(context, session, flowFile, contextProperties); - session.transfer(flowFile, REL_SUCCESS); - } catch (Exception e) { - this.getLogger().error("Failed FlowFile processing, routing to failure. 
Issue: " + e.getMessage(), e); - session.transfer(flowFile, REL_FAILURE); - } - } else { - context.yield(); - } - } - - @OnScheduled - public void onScheduled(ProcessContext context) { - List<PropertyDescriptor> propertyDescriptors = this.getSupportedPropertyDescriptors(); - for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { - if (!propertyDescriptor.isExpressionLanguageSupported()){ - this.propertyInstanceValues.put(propertyDescriptor, context.getProperty(propertyDescriptor).getValue()); - } - } - } - - /** - * - */ - protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) { - AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - attributeRef.set(transform(in, null, contextProperties)); - } - }); - if (attributeRef.get() != null) { - flowFile = session.putAllAttributes(flowFile, attributeRef.get()); - } - return flowFile; - } - - @Override - public Set<Relationship> getRelationships() { - return BASE_RELATIONSHIPS; - } - - /** - * This operation is designed to allow sub-classes to provide - * implementations that read content of the provided {@link InputStream} and - * write content (same or different) it into the provided - * {@link OutputStream}. Both {@link InputStream} and {@link OutputStream} - * represent the content of the in/out {@link FlowFile}. The - * {@link OutputStream} can be null if no output needs to be written. - * <p> - * The returned {@link Map} represents attributes that will be added to the - * outgoing FlowFile. - * - * - * @param in - * {@link InputStream} representing data to be transformed - * @param out - * {@link OutputStream} representing target stream to wrote - * transformed data. Can be null if no output needs to be - * written. 
- * @param contextProperties - * instance of {@link InvocationContextProperties} - */ - protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties); - - /** - * Properties object that gathers the value of the - * {@link PropertyDescriptor} within the context of - * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} - * invocation. It maintains the knowledge of instance properties vs. - * invocation properties that the values of which are set by evaluating - * expression against the incoming {@link FlowFile}. - */ - public class InvocationContextProperties { - private final Map<PropertyDescriptor, String> propertyInvocationValues = new HashMap<>(); - - InvocationContextProperties(ProcessContext context, FlowFile flowFile) { - List<PropertyDescriptor> propertyDescriptors = BaseTransformer.this.getSupportedPropertyDescriptors(); - for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { - if (propertyDescriptor.isExpressionLanguageSupported()) { - PropertyValue value = context.getProperty(propertyDescriptor) - .evaluateAttributeExpressions(flowFile); - this.propertyInvocationValues.put(propertyDescriptor, value.getValue()); - } - } - } - - /** - * Returns the value of the property within the context of - * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} - * invocation. - */ - public String getPropertyValue(PropertyDescriptor propertyDescriptor, boolean notNull) { - String propertyValue = propertyInstanceValues.containsKey(propertyDescriptor) - ? 
propertyInstanceValues.get(propertyDescriptor) - : propertyInvocationValues.get(propertyDescriptor); - if (notNull && propertyValue == null) { - throw new IllegalArgumentException("Property '" + propertyDescriptor + "' evaluatd to null"); - } - return propertyValue; - } - - @Override - public String toString() { - return "Instance: " + propertyInstanceValues + "; Invocation: " + propertyInvocationValues; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java deleted file mode 100644 index 58d5d6b..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.StringWriter; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.flowfile.FlowFile; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.node.BooleanNode; -import org.codehaus.jackson.node.DoubleNode; -import org.codehaus.jackson.node.IntNode; -import org.codehaus.jackson.node.LongNode; -import org.codehaus.jackson.node.TextNode; - -/** - * Various CSV related utility operations relevant to transforming contents of - * the {@link FlowFile} between CSV and AVRO formats. - */ -class CSVUtils { - /** - * Provides a {@link Validator} to ensure that provided value is a valid - * character. - */ - public static final Validator CHAR_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - // Allows special, escaped characters as input, which is then un-escaped and converted to a single character. - // Examples for special characters: \t (or \u0009), \f. 
- if (input.length() > 1) { - input = StringEscapeUtils.unescapeJava(input); - } - return new ValidationResult.Builder().subject(subject).input(input) - .explanation("Only non-null single characters are supported") - .valid(input.length() == 1 && input.charAt(0) != 0).build(); - } - }; - - public static GenericRecord read(InputStream record, char delimiter, Schema schema, char quoteChar) { - Record avroRecord = new GenericData.Record(schema); - String[] parsedRecord = parseFields(convertInputStreamToString(record), delimiter, quoteChar); - List<Field> fields = schema.getFields(); - if (parsedRecord.length != fields.size()) { - throw new IllegalStateException("Incompatible schema. Parsed fields count does not match the count of fields from schema. " - + "Schema: " + schema.toString(true) + "\n Record: " + record); - } - - for (int i = 0; i < fields.size(); i++) { - Field field = fields.get(i); - Type type = field.schema().getType(); - updateRecord(field, type, parsedRecord[i], avroRecord); - } - return avroRecord; - } - - /** - * Parses provided record into fields using provided delimiter. The - * 'quoteChar' is used to ensure that if a delimiter char is in quotes it - * will not be parsed into a separate filed. - */ - public static String[] parseFields(String record, char delimiter, char quoteChar) { - List<String> result = new ArrayList<String>(); - int start = 0; - boolean inQuotes = false; - for (int i = 0; i < record.length(); i++) { - if (record.charAt(i) == quoteChar) { - inQuotes = !inQuotes; - } - boolean atLastChar = (i == record.length() - 1); - if (atLastChar) { - if (record.charAt(i) == delimiter) { - //missing last column value, add NULL - result.add(record.substring(start,i)); - result.add(null); - } else { - result.add(record.substring(start)); - } - } else if (record.charAt(i) == delimiter && !inQuotes) { - if (start == i) { - //There is no value, so add NULL to indicated the absence of a value for this field. 
- result.add(null); - } else { - result.add(record.substring(start, i)); - } - start = i + 1; - } - } - return result.toArray(new String[]{}); - } - - /** - * Writes {@link GenericRecord} as CSV (delimited) record to the - * {@link OutputStream} using provided delimiter. - */ - public static void write(GenericRecord record, char delimiter, OutputStream out) { - List<Field> fields = record.getSchema().getFields(); - - String delimiterToUse = ""; - try { - for (Field field : fields) { - out.write(delimiterToUse.getBytes(StandardCharsets.UTF_8)); - Object fieldValue = record.get(field.name()); - if (null == fieldValue) { - out.write(new byte[0]); - } else { - if (Type.BYTES == field.schema().getType()) { - // need to create it from the ByteBuffer it is serialized as. - // need to ensure the type is one of the logical ones we support and if so convert it. - if(!"decimal".contentEquals(field.getProp("logicalType"))){ - throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '" + - field.getProp("logicalType") + "' that is currently not supported."); - } - - JsonNode rawPrecision = field.getJsonProp("precision"); - if(null == rawPrecision){ - throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property"); - } - int precision = rawPrecision.asInt(); - JsonNode rawScale = field.getJsonProp("scale"); - int scale = null == rawScale ? 0 : rawScale.asInt(); - - // write out the decimal with the precision and scale. 
- NumberFormat numberFormat = DecimalFormat.getInstance(); - numberFormat.setGroupingUsed(false); - normalizeNumberFormat(numberFormat, scale, precision); - String rawValue = new String(((ByteBuffer)fieldValue).array()); - // raw value needs to be parsed to ensure that BigDecimal will not throw an exception for specific locale - rawValue = numberFormat.parse(rawValue).toString(); - out.write(numberFormat.format(new BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8)); - } else { - out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8)); - } - } - if (delimiterToUse.length() == 0) { - delimiterToUse = String.valueOf(delimiter); - } - } - } catch (IOException | ParseException e) { - throw new IllegalStateException("Failed to parse AVRO Record", e); - } - } - - /** - * According to the 1.7.7 spec If a logical type is invalid, for example a - * decimal with scale greater than its precision,then implementations should - * ignore the logical type and use the underlying Avro type. - */ - private static void normalizeNumberFormat(NumberFormat numberFormat, int scale, int precision) { - if (scale < precision) { - // write out with the specified precision and scale. 
- numberFormat.setMaximumIntegerDigits(precision); - numberFormat.setMaximumFractionDigits(scale); - numberFormat.setMinimumFractionDigits(scale); - } - } - - /** - * - */ - private static String convertInputStreamToString(InputStream record) { - StringWriter writer = new StringWriter(); - try { - IOUtils.copy(record, writer, StandardCharsets.UTF_8); - } catch (Exception e) { - throw new IllegalStateException("Failed to read InputStream into String", e); - } - return writer.toString(); - } - - /** - * - */ - private static ByteBuffer encodeLogicalType(final Field field, final String fieldValue) { - String logicalType = field.getProp("logicalType"); - if (!"decimal".contentEquals(logicalType)) { - throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '" + logicalType - + "' that is currently not supported."); - } - - JsonNode rawPrecision = field.getJsonProp("precision"); - if (null == rawPrecision) { - throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property"); - } - int precision = rawPrecision.asInt(); - JsonNode rawScale = field.getJsonProp("scale"); - int scale = null == rawScale ? 0 : rawScale.asInt(); - - NumberFormat numberFormat = DecimalFormat.getInstance(); - numberFormat.setGroupingUsed(false); - normalizeNumberFormat(numberFormat, scale, precision); - BigDecimal decimal = null == fieldValue ? 
new BigDecimal(retrieveDefaultFieldValue(field).asText()) : new BigDecimal(fieldValue); - return ByteBuffer.wrap(numberFormat.format(decimal).getBytes(StandardCharsets.UTF_8)); - } - - /** - * - */ - private static JsonNode retrieveDefaultFieldValue(Field field) { - JsonNode jsonNode = field.defaultValue(); - if (null == jsonNode) { - throw new IllegalArgumentException("The field '" + field.name() + "' is NULL and there is no default value supplied in the Avro Schema"); - } - return jsonNode; - } - - /** - * - */ - private static void updateRecord(Field field, Type type, String providedValue, Record avroRecord) { - if (Type.NULL != type) { - Object value; - if (Type.INT == type) { - value = null == providedValue ? possiblyGetDefaultValue(field, IntNode.class).getIntValue() - : Integer.parseInt(providedValue); - avroRecord.put(field.name(), value); - } else if (Type.BOOLEAN == type) { - value = null == providedValue - ? possiblyGetDefaultValue(field, BooleanNode.class).getBooleanValue() - : Boolean.parseBoolean(providedValue); - avroRecord.put(field.name(), value); - } else if (Type.DOUBLE == type) { - value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue() - : Double.parseDouble(providedValue); - avroRecord.put(field.name(), value); - } else if (Type.FLOAT == type) { - value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue() - : Float.parseFloat(providedValue); - avroRecord.put(field.name(), value); - } else if (Type.LONG == type) { - value = null == providedValue ? possiblyGetDefaultValue(field, LongNode.class).getLongValue() - : Long.parseLong(providedValue); - avroRecord.put(field.name(), value); - } else if (Type.STRING == type) { - value = null == providedValue ? 
possiblyGetDefaultValue(field, TextNode.class).getTextValue() - : providedValue; - avroRecord.put(field.name(), value); - } else if (Type.BYTES == type) { - value = encodeLogicalType(field, providedValue); - avroRecord.put(field.name(), value); - } else if (Type.UNION == type) { - field.schema().getTypes() - .forEach(schema -> updateRecord(field, schema.getType(), providedValue, avroRecord)); - } else if (Type.ARRAY == type || Type.ENUM == type || Type.FIXED == type || Type.MAP == type - || Type.NULL == type || Type.RECORD == type) { - throw new IllegalArgumentException("The field type '" + type + "' is not supported at the moment"); - } else { - avroRecord.put(field.name(), providedValue); - } - } - } - - /** - * Check to see if there is a default value to use, if not will throw - * {@link IllegalArgumentException} - */ - private static <T extends JsonNode> JsonNode possiblyGetDefaultValue(Field field, Class<T> expectedDefaultType) { - JsonNode jsonNode = retrieveDefaultFieldValue(field); - if (field.schema().getType() != Type.UNION && !expectedDefaultType.isAssignableFrom(jsonNode.getClass())) { - // since we do not support schema evolution here we need to throw an - // exception here as the data is in error. - throw new IllegalArgumentException("The field '" + field.name() + "' has a default value that " - + "does not match the field type. 
Field Type is: '" + expectedDefaultType.getName() + "' and the " - + "default value type is: '" + field.defaultValue().toString()); - } - return jsonNode; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java deleted file mode 100644 index 2ab83c5..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; - -@Tags({ "registry", "schema", "avro", "extract", "evaluate" }) -@CapabilityDescription("Extracts Avro field and assigns it to the FlowFile attribute") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@DynamicProperty(name = "Avro field name", value = "FlowFile attribute name to set the extracted field", - description = "The value of the Avro field specified by 'Avro field name' will be extracted and set as " - + "FlowFile attribute under name specified by the value of this property.") -public final class ExtractAvroFields extends AbstractTransformer { - - private static final List<PropertyDescriptor> DESCRIPTORS; - - static { - List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.addAll(BASE_DESCRIPTORS); - descriptors.add(SCHEMA_TYPE); - DESCRIPTORS = Collections.unmodifiableList(descriptors); - } - - private volatile Map<String, String> dynamicProperties; - - /** - * - */ - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - /** - * - */ - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - this.dynamicProperties = 
context.getProperties().entrySet().stream() - .filter(p -> p.getKey().isDynamic()) - .collect(Collectors.toMap(p -> p.getKey().getName(), p -> p.getValue())); - } - - /** - * - */ - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .required(false) - .dynamic(true) - .build(); - } - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema) { - GenericRecord avroRecord = AvroUtils.read(in, schema); - Map<String, String> attributes = this.dynamicProperties.entrySet().stream().collect( - Collectors.toMap(dProp -> dProp.getValue(), dProp -> String.valueOf(avroRecord.get(dProp.getKey())))); - return Collections.unmodifiableMap(attributes); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java deleted file mode 100644 index 81c98b3..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.nifi.schemaregistry.processors;

import java.io.DataInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.nifi.flowfile.FlowFile;

/**
 * Various Json related utility operations relevant to transforming contents of
 * the {@link FlowFile} between JSON and AVRO formats.
 */
class JsonUtils {

    /**
     * Writes provided {@link GenericRecord} into the provided
     * {@link OutputStream} as JSON, using the record's own schema. The
     * encoder is flushed; the stream is not closed.
     *
     * @throws IllegalStateException
     *             if the record cannot be encoded or written
     */
    public static void write(GenericRecord record, OutputStream out) {
        try {
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
            writer.write(record, encoder);
            encoder.flush();
        } catch (Exception e) {
            // This is the write path; the message must not claim a read failure.
            throw new IllegalStateException("Failed to write GenericRecord as JSON", e);
        }
    }

    /**
     * Reads provided {@link InputStream} as JSON into Avro
     * {@link GenericRecord} applying provided {@link Schema} returning the
     * resulting GenericRecord.
     *
     * @throws IllegalStateException
     *             if the input cannot be decoded with the given schema
     */
    public static GenericRecord read(InputStream jsonIs, Schema schema) {
        DataInputStream din = new DataInputStream(jsonIs);
        try {
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
            DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to parse incoming Json input stream into Avro GenericRecord. "
                    + "Possible reason: the value may not be a valid JSON or incompatible schema is provided. Schema was '"
                    + schema.toString(true) + "'.", e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java deleted file mode 100644 index 3fc1530..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.nifi.schemaregistry.processors; - -import org.apache.avro.Schema; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.schemaregistry.processors.BaseTransformer.InvocationContextProperties; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; - -/** - * Strategy that encapsulates common properties and functionalities used by all - * processors that integrate with Schema Registry. - */ -interface RegistryCommon { - - static final String SCHEMA_ATTRIBUTE_NAME = "schema.text"; - - static final PropertyDescriptor REGISTRY_SERVICE = new PropertyDescriptor.Builder() - .name("schema-registry-service") - .displayName("Schema Registry Service") - .description("The Schema Registry Service for serializing/deserializing messages as well as schema retrieval.") - .required(true) - .identifiesControllerService(SchemaRegistry.class) - .build(); - - static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder() - .name("schema-name") - .displayName("Schema Name") - .description("The name of schema.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - - static final PropertyDescriptor SCHEMA_TYPE = new PropertyDescriptor.Builder() - .name("schema-type") - .displayName("Schema Type") - .description("The type of schema (avro is the the only current supported schema).") - .required(true) - .allowableValues("avro") - .defaultValue("avro") - .expressionLanguageSupported(true) - .build(); - - static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() - .name("csv-delimiter") - .displayName("CSV delimiter") - .description("Delimiter character for CSV records") - .addValidator(CSVUtils.CHAR_VALIDATOR) - .defaultValue(",") - .build(); - - static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder() - .name("csv-quote-character") - .displayName("CSV quote 
character") - .description("Quote character for CSV values") - .addValidator(CSVUtils.CHAR_VALIDATOR) - .defaultValue("\"") - .build(); - /** - * Utility operation to retrieve and parse {@link Schema} from Schema - * Registry using provided {@link SchemaRegistry}; - */ - static Schema retrieveSchema(SchemaRegistry schemaRegistry, InvocationContextProperties contextProperties) { - String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, true); - String schemaText = schemaRegistry.retrieveSchemaText(schemaName); - return new Schema.Parser().parse(schemaText); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java deleted file mode 100644 index aa0d418..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.attributes.CoreAttributes; - -@Tags({ "registry", "schema", "avro", "csv", "transform" }) -@CapabilityDescription("Transforms AVRO content of the Flow File to CSV using the schema provided by the Schema Registry Service.") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -public final class TransformAvroToCSV extends AbstractCSVTransformer { - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { - byte[] buff = null; - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - IOUtils.copy(in, bos); - buff = bos.toByteArray(); - } catch (Exception e) { - e.printStackTrace(); - } - ByteArrayInputStream is = new ByteArrayInputStream(buff); - GenericRecord avroRecord = AvroUtils.read(is, schema); - CSVUtils.write(avroRecord, this.delimiter, out); - return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv"); - } -} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java deleted file mode 100644 index ba45563..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.attributes.CoreAttributes; - - -@Tags({ "registry", "schema", "avro", "json", "transform" }) -@CapabilityDescription("Transforms AVRO content of the Flow File to JSON using the schema provided by the Schema Registry Service.") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -public final class TransformAvroToJson extends AbstractContentTransformer { - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { - GenericRecord avroRecord = AvroUtils.read(in, schema); - JsonUtils.write(avroRecord, out); - return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java deleted file mode 100644 index f44e440..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java 
+++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; - -@Tags({ "csv", "avro", "transform", "registry", "schema" }) -@InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Transforms CSV content of the Flow File to Avro using the schema provided by the Schema Registry Service.") -public final class TransformCSVToAvro extends AbstractCSVTransformer { - - private static final List<PropertyDescriptor> 
DESCRIPTORS; - - static { - List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.addAll(BASE_CSV_DESCRIPTORS); - descriptors.add(QUOTE); - DESCRIPTORS = Collections.unmodifiableList(descriptors); - } - - private volatile char quoteChar; - - /** - * - */ - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0); - } - - /** - * - */ - @Override - public List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { - GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar); - AvroUtils.write(avroRecord, out); - return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro"); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java deleted file mode 100644 index 2ce9fbe..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; - -@Tags({ "csv", "json", "transform", "registry", "schema" }) -@InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Transforms CSV content of the Flow File to JSON using the schema provided by the Schema Registry Service.") -public final class TransformCSVToJson extends AbstractCSVTransformer { - - private static final List<PropertyDescriptor> DESCRIPTORS; - - static { - List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.addAll(BASE_CSV_DESCRIPTORS); - descriptors.add(QUOTE); - DESCRIPTORS = Collections.unmodifiableList(descriptors); - } - - private 
volatile char quoteChar; - - /** - * - */ - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0); - } - - /** - * - */ - @Override - public List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { - GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar); - JsonUtils.write(avroRecord, out); - return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json"); - } -}