This is an automated email from the ASF dual-hosted git repository. tpalfy pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new ddcdac674b NIFI-12614: Create record reader service for Protobuf messages (1.x version) ddcdac674b is described below commit ddcdac674bd70e2f9fc9d604de3aefe0465a4219 Author: Mark Bathori <bathori.m...@gmail.com> AuthorDate: Tue Jan 16 11:44:02 2024 +0100 NIFI-12614: Create record reader service for Protobuf messages (1.x version) This closes #8626. Signed-off-by: Tamas Palfy <tpa...@apache.org> --- nifi-assembly/pom.xml | 6 + .../nifi-protobuf-services-nar/pom.xml | 42 +++ .../src/main/resources/META-INF/LICENSE | 210 +++++++++++ .../src/main/resources/META-INF/NOTICE | 92 +++++ .../nifi-protobuf-services/pom.xml | 88 +++++ .../apache/nifi/services/protobuf/FieldType.java | 57 +++ .../nifi/services/protobuf/ProtobufReader.java | 177 +++++++++ .../services/protobuf/ProtobufRecordReader.java | 65 ++++ .../services/protobuf/converter/ProtoField.java | 53 +++ .../protobuf/converter/ProtobufDataConverter.java | 403 +++++++++++++++++++++ .../protobuf/schema/ProtoSchemaParser.java | 177 +++++++++ .../protobuf/schema/ProtoSchemaStrategy.java | 49 +++ .../validation/ProtoValidationResource.java | 38 ++ .../org.apache.nifi.controller.ControllerService | 16 + .../additionalDetails.html | 189 ++++++++++ .../nifi/services/protobuf/ProtoTestUtil.java | 144 ++++++++ .../protobuf/TestProtobufRecordReader.java | 145 ++++++++ .../converter/TestProtobufDataConverter.java | 111 ++++++ .../protobuf/schema/TestProtoSchemaParser.java | 84 +++++ .../src/test/resources/google/protobuf/any.desc | 7 + .../src/test/resources/test_proto2.desc | 11 + .../src/test/resources/test_proto2.proto | 35 ++ .../src/test/resources/test_proto3.desc | Bin 0 -> 1022 bytes .../src/test/resources/test_proto3.proto | 53 +++ nifi-nar-bundles/nifi-protobuf-bundle/pom.xml | 110 ++++++ nifi-nar-bundles/pom.xml | 1 + 26 files changed, 2363 insertions(+) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 
d07770d8ba..8c996b5921 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -1019,6 +1019,12 @@ language governing permissions and limitations under the License. --> <version>1.26.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-protobuf-services-nar</artifactId> + <version>1.26.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) --> <dependency> <groupId>org.aspectj</groupId> diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml new file mode 100644 index 0000000000..9d0f55fc78 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-protobuf-bundle</artifactId> + <version>1.26.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-protobuf-services-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-protobuf-services</artifactId> + <version>1.26.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-shared-nar</artifactId> + <type>nar</type> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..84fb21549a --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,210 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for these +subcomponents is subject to the terms and conditions of the following +licenses. + diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..b57ec55464 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,92 @@ +nifi-protobuf-services-nar +Copyright 2014-2023 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + + (ASLv2) Wire + The following NOTICE information applies: + Wire + Copyright 2013 Square, Inc. + + (ASLv2) KotlinPoet + The following NOTICE information applies: + KotlinPoet + Copyright 2017 Square, Inc.
+ + (ASLv2) Guava: Google Core Libraries For Java + The following NOTICE information applies: + Guava: Google Core Libraries For Java + Copyright (C) 2017 The Guava Authors + + (ASLv2) J2ObjC Annotations + The following NOTICE information applies: + J2ObjC Annotations + Copyright 2022 The J2ObjC Annotations Authors + + (ASLv2) FindBugs JSR305 + The following NOTICE information applies: + FindBugs JSR305 + Copyright 2017 The FindBugs JSR305 Authors + + (ASLv2) Guava ListenableFuture Only + The following NOTICE information applies: + Guava ListenableFuture Only + Copyright (C) 2018 The Guava Authors + + (ASLv2) Error Prone Annotations + The following NOTICE information applies: + Error Prone Annotations + Copyright 2015 The Error Prone Authors + + (ASLv2) Guava InternalFutureFailureAccess and InternalFutures + The following NOTICE information applies: + Guava InternalFutureFailureAccess and InternalFutures + Copyright (C) 2018 The Guava Authors + + (ASLv2) Okio + The following NOTICE information applies: + Okio + Copyright 2013 Square, Inc. + + (ASLv2) JavaPoet + Copyright 2015 Square, Inc. + + (ASLv2) Apache Commons CSV + The following NOTICE information applies: + Apache Commons CSV + Copyright 2005-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2017 The Apache Software Foundation + +=========================================== +MIT License +=========================================== + + (MIT) Checker Qual + The following NOTICE information applies: + + Copyright (c) Copyright 2004-present by the Checker Framework developers + All rights reserved. + https://www.checkerframework.org/ + +************************ +BSD License +************************ + +The following binary components are provided under the BSD License. See project link for details. 
+ + (BSD 3-Clause) Protocol Buffers + The following NOTICE information applies: + Copyright 2008 Google Inc. All rights reserved. + https://github.com/google/protobuf/tree/master/java \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml new file mode 100644 index 0000000000..ace5ea01c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-protobuf-bundle</artifactId> + <version>1.26.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-protobuf-services</artifactId> + + <properties> + <protobuf.version>3.25.1</protobuf.version> + <wire.version>4.9.3</wire.version> + </properties> + + <dependencies> + <!-- Internal dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + <version>1.26.0-SNAPSHOT</version> + </dependency> + + <!-- External dependencies --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>com.squareup.wire</groupId> + <artifactId>wire-schema-jvm</artifactId> + <version>${wire.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.26.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + 
<version>1.26.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java new file mode 100644 index 0000000000..71865cad09 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import java.util.Arrays; + +/** + * Type for proto scalar fields. 
+ */ +public enum FieldType { + DOUBLE("double"), + FLOAT("float"), + INT32("int32"), + INT64("int64"), + UINT32("uint32"), + UINT64("uint64"), + SINT32("sint32"), + SINT64("sint64"), + FIXED32("fixed32"), + FIXED64("fixed64"), + SFIXED32("sfixed32"), + SFIXED64("sfixed64"), + BOOL("bool"), + STRING("string"), + BYTES("bytes"); + + private final String type; + + FieldType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public static FieldType findValue(final String value) { + return Arrays.stream(FieldType.values()) + .filter((type -> type.getType().equalsIgnoreCase(value))) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(String.format("ProtoType [%s] not found", value))); + } +} diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java new file mode 100644 index 0000000000..85904aca54 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.squareup.wire.schema.CoreLoaderKt; +import com.squareup.wire.schema.Location; +import com.squareup.wire.schema.Schema; +import com.squareup.wire.schema.SchemaLoader; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy; +import org.apache.nifi.services.protobuf.validation.ProtoValidationResource; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"protobuf", "record", "reader", "parser"}) +@CapabilityDescription("Parses a Protocol Buffers message from 
binary format.") +public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final String ANY_PROTO = "google/protobuf/any.proto"; + private static final String DURATION_PROTO = "google/protobuf/duration.proto"; + private static final String EMPTY_PROTO = "google/protobuf/empty.proto"; + private static final String STRUCT_PROTO = "google/protobuf/struct.proto"; + private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto"; + private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto"; + + private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file", + "Generate from Proto file", "The record schema is generated from the provided proto file"); + + private volatile String messageType; + private volatile Schema protoSchema; + + // Holder of cached proto information so validation does not reload the same proto file over and over + private final AtomicReference<ProtoValidationResource> validationResourceHolder = new AtomicReference<>(); + + public static final PropertyDescriptor PROTOBUF_DIRECTORY = new PropertyDescriptor.Builder() + .name("Proto Directory") + .displayName("Proto Directory") + .description("Directory containing Protocol Buffers message definition (.proto) file(s).") + .required(true) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder() + .name("Message Type") + .displayName("Message Type") + .description("Fully qualified name of the Protocol Buffers message type including its package (eg. mypackage.MyMessage). 
" + + "The .proto files configured in '" + PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this message type.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PROTOBUF_DIRECTORY); + properties.add(MESSAGE_TYPE); + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> problems = new ArrayList<>(); + final String protoDirectory = validationContext.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue(); + final String messageType = validationContext.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); + + if (protoDirectory != null && messageType != null) { + final Schema protoSchema = getSchemaForValidation(protoDirectory); + if (protoSchema.getType(messageType) == null) { + problems.add(new ValidationResult.Builder() + .subject(MESSAGE_TYPE.getDisplayName()) + .valid(false) + .explanation(String.format("'%s' message type cannot be found in the provided proto files.", messageType)) + .build()); + } + } + + return problems; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue(); + messageType = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); + protoSchema = loadProtoSchema(protoDirectory); + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) { + if 
(allowableValue.equalsIgnoreCase(GENERATE_FROM_PROTO_FILE.getValue())) { + return new ProtoSchemaStrategy(messageType, protoSchema); + } + + return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + + @Override + protected List<AllowableValue> getSchemaAccessStrategyValues() { + final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues()); + allowableValues.add(GENERATE_FROM_PROTO_FILE); + return allowableValues; + } + + @Override + protected AllowableValue getDefaultSchemaAccessStrategy() { + return GENERATE_FROM_PROTO_FILE; + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException { + return new ProtobufRecordReader(protoSchema, messageType, in, getSchema(variables, in, null)); + } + + private Schema loadProtoSchema(final String protoDirectory) { + final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault()); + schemaLoader.initRoots(Arrays.asList(Location.get(protoDirectory), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, ANY_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, DURATION_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, EMPTY_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, STRUCT_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, TIMESTAMP_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, WRAPPERS_PROTO)), Collections.emptyList()); + return schemaLoader.loadSchema(); + } + + private Schema getSchemaForValidation(final String protoDirectory) { + ProtoValidationResource validationResource = validationResourceHolder.get(); + if (validationResource == null || !protoDirectory.equals(validationResource.getProtoDirectory())) { + validationResource = new ProtoValidationResource(protoDirectory, loadProtoSchema(protoDirectory)); + validationResourceHolder.set(validationResource); + } + + return 
validationResource.getProtoSchema(); + } +} diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java new file mode 100644 index 0000000000..ab2d6fdbd7 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.protobuf.converter.ProtobufDataConverter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link RecordReader} that turns the encoded Protocol Buffers data of the input stream into a single record.
+ * The first call to {@link #nextRecord(boolean, boolean)} decodes the message; every later call returns {@code null}.
+ */
+public class ProtobufRecordReader implements RecordReader {
+
+    private final Schema protoSchema;
+    private final String messageType;
+    private final InputStream inputStream;
+
+    // Starts as the schema passed to the constructor and is replaced by the decoded record's schema.
+    private RecordSchema recordSchema;
+
+    // Becomes true once the message has been decoded successfully.
+    private boolean inputProcessed;
+
+    /**
+     * @param protoSchema  parsed proto definitions used for decoding
+     * @param messageType  name of the message type contained in the input
+     * @param inputStream  encoded message data
+     * @param recordSchema record schema the decoded values are mapped to
+     */
+    public ProtobufRecordReader(Schema protoSchema, String messageType, InputStream inputStream, RecordSchema recordSchema) {
+        this.protoSchema = protoSchema;
+        this.messageType = messageType;
+        this.inputStream = inputStream;
+        this.recordSchema = recordSchema;
+    }
+
+    /**
+     * Decodes the message on the first invocation; returns {@code null} afterwards.
+     */
+    @Override
+    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException {
+        if (inputProcessed) {
+            return null;
+        }
+
+        final Record decoded = new ProtobufDataConverter(protoSchema, messageType, recordSchema, coerceTypes, dropUnknownFields)
+                .createRecord(inputStream);
+        // The input is only marked as consumed after decoding returned normally.
+        inputProcessed = true;
+        recordSchema = decoded.getSchema();
+        return decoded;
+    }
+
+    /**
+     * @return the constructor-supplied schema, or the decoded record's schema once a record has been read
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return recordSchema;
+    }
+
+    /**
+     * Closes the underlying input stream.
+     */
+    @Override
+    public void close() throws IOException {
+        inputStream.close();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
new file mode 100644
index 0000000000..1481849280
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.ProtoType;
+
+/**
+ * Immutable description of a proto field as needed while decoding: its name, its proto type and whether it is repeated.
+ */
+public class ProtoField {
+
+    private final String name;
+    private final ProtoType type;
+    private final boolean repeated;
+
+    /**
+     * Describes a field declared in the proto schema.
+     *
+     * @param field schema field; its name, type and repeated flag are copied
+     */
+    public ProtoField(Field field) {
+        name = field.getName();
+        type = field.getType();
+        repeated = field.isRepeated();
+    }
+
+    /**
+     * Describes a synthetic, non-repeated field (for example a map entry's key or value).
+     *
+     * @param fieldName name to expose the value under
+     * @param protoType proto type of the value
+     */
+    public ProtoField(String fieldName, ProtoType protoType) {
+        name = fieldName;
+        type = protoType;
+        repeated = false;
+    }
+
+    /** @return field name */
+    public String getFieldName() {
+        return name;
+    }
+
+    /** @return proto type of the field */
+    public ProtoType getProtoType() {
+        return type;
+    }
+
+    /** @return {@code true} when the field is declared as repeated */
+    public boolean isRepeatable() {
+        return repeated;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java new file mode 100644 index 0000000000..8541f5723a --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import com.squareup.wire.schema.EnumConstant;
+import com.squareup.wire.schema.EnumType;
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.MessageType;
+import com.squareup.wire.schema.OneOf;
+import com.squareup.wire.schema.ProtoType;
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.services.protobuf.FieldType;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.google.protobuf.CodedInputStream.decodeZigZag32;
+import static com.google.protobuf.TextFormat.unsignedToString;
+
+/**
+ * The class is responsible for creating Record by mapping the provided proto schema fields with the list of Unknown fields parsed from encoded proto data.
+ * <p>
+ * Every field is resolved against the record schema of the message that declares it (not the root schema), so nested
+ * messages use their own child schema and type coercion is performed against the matching field definition.
+ */
+public class ProtobufDataConverter {
+
+    public static final String MAP_KEY_FIELD_NAME = "key";
+    public static final String MAP_VALUE_FIELD_NAME = "value";
+    public static final String ANY_TYPE_URL_FIELD_NAME = "type_url";
+    public static final String ANY_VALUE_FIELD_NAME = "value";
+    public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any";
+
+    private final Schema schema;
+    private final String rootMessageType;
+    private final RecordSchema rootRecordSchema;
+    private final boolean coerceTypes;
+    private final boolean dropUnknownFields;
+
+    // Set once a google.protobuf.Any value was decoded. Any payloads get a schema generated while reading,
+    // so the root record's schema has to be regenerated afterwards.
+    private boolean containsAnyField = false;
+
+    /**
+     * @param schema            proto schema containing the message definitions
+     * @param messageType       name of the root message type
+     * @param recordSchema      record schema of the root message
+     * @param coerceTypes       whether converted values are coerced to the data types declared in the record schema
+     * @param dropUnknownFields passed to every created {@link MapRecord}
+     */
+    public ProtobufDataConverter(Schema schema, String messageType, RecordSchema recordSchema, boolean coerceTypes, boolean dropUnknownFields) {
+        this.schema = schema;
+        this.rootMessageType = messageType;
+        this.rootRecordSchema = recordSchema;
+        this.coerceTypes = coerceTypes;
+        this.dropUnknownFields = dropUnknownFields;
+    }
+
+    /**
+     * Creates a record from the root message.
+     *
+     * @param data encoded root message; the stream is read to its end
+     * @return created record
+     * @throws IOException failed to read or parse the input stream
+     */
+    public MapRecord createRecord(InputStream data) throws IOException {
+        final MessageType rootMessageType = (MessageType) schema.getType(this.rootMessageType);
+        Objects.requireNonNull(rootMessageType, String.format("Message with name [%s] not found in the provided proto files", this.rootMessageType));
+
+        final MapRecord record = createRecord(rootMessageType, ByteString.readFrom(data), rootRecordSchema);
+        if (containsAnyField) {
+            record.regenerateSchema();
+        }
+
+        return record;
+    }
+
+    /**
+     * Creates a record for the provided message.
+     *
+     * @param messageType  message to create a record from
+     * @param data         proto message data
+     * @param recordSchema record schema for the created record (ignored for 'google.protobuf.Any' messages)
+     * @return created record
+     * @throws IOException failed to parse input data (typically an {@link InvalidProtocolBufferException})
+     */
+    private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws IOException {
+        final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data);
+
+        if (ANY_MESSAGE_TYPE.equals(messageType.getType().toString())) {
+            containsAnyField = true;
+            return handleAnyField(unknownFieldSet);
+        }
+
+        final Map<String, Object> fieldValues = processMessageFields(messageType, unknownFieldSet, recordSchema);
+        return new MapRecord(recordSchema, fieldValues, false, dropUnknownFields);
+    }
+
+    /**
+     * Process declared, extension and oneOf fields in the provided message.
+     *
+     * @param messageType     message with fields to be processed
+     * @param unknownFieldSet received proto data fields
+     * @param recordSchema    record schema of the message, used for nested schemas and coercion; may be {@code null}
+     * @return Map of processed fields
+     * @throws IOException failed to parse input data
+     */
+    private Map<String, Object> processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet, RecordSchema recordSchema) throws IOException {
+        final Map<String, Object> recordValues = new HashMap<>();
+
+        for (final Field field : messageType.getDeclaredFields()) {
+            collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordSchema);
+        }
+
+        for (final Field field : messageType.getExtensionFields()) {
+            collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordSchema);
+        }
+
+        for (final OneOf oneOf : messageType.getOneOfs()) {
+            for (final Field field : oneOf.getFields()) {
+                collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordSchema);
+            }
+        }
+        return recordValues;
+    }
+
+    /**
+     * Checks the field value's presence and sets it into the result Map.
+     *
+     * @param fieldNameToConvertedValue Map of converted values
+     * @param protoField                proto field's properties
+     * @param unknownField              field's value
+     * @param recordSchema              record schema of the enclosing message; {@code null} disables coercion
+     * @throws IOException failed to parse input data
+     */
+    private void collectFieldValue(Map<String, Object> fieldNameToConvertedValue, ProtoField protoField, UnknownFieldSet.Field unknownField,
+                                   RecordSchema recordSchema) throws IOException {
+        final Optional<Object> fieldValue = convertFieldValues(protoField, unknownField, recordSchema);
+        fieldValue.ifPresent(value -> fieldNameToConvertedValue.put(protoField.getFieldName(), value));
+    }
+
+    private Optional<Object> convertFieldValues(ProtoField protoField, UnknownFieldSet.Field unknownField, RecordSchema recordSchema) throws IOException {
+        if (!unknownField.getLengthDelimitedList().isEmpty()) {
+            // Repeated numeric and enum fields may be packed into length-delimited records (the proto3 default).
+            if (protoField.isRepeatable() && isPackable(protoField.getProtoType())) {
+                return Optional.of(convertPackedFields(protoField, unknownField, recordSchema));
+            }
+            return Optional.of(convertLengthDelimitedFields(protoField, unknownField.getLengthDelimitedList(), recordSchema));
+        }
+        if (!unknownField.getFixed32List().isEmpty()) {
+            return Optional.of(convertFixed32Fields(protoField, unknownField.getFixed32List(), recordSchema));
+        }
+        if (!unknownField.getFixed64List().isEmpty()) {
+            return Optional.of(convertFixed64Fields(protoField, unknownField.getFixed64List(), recordSchema));
+        }
+        if (!unknownField.getVarintList().isEmpty()) {
+            return Optional.of(convertVarintFields(protoField, unknownField.getVarintList(), recordSchema));
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Tells whether values of the given type may use packed encoding: every scalar except string/bytes, and enums.
+     *
+     * @param protoType field's proto type
+     * @return {@code true} if the type is packable
+     */
+    private boolean isPackable(ProtoType protoType) {
+        if (protoType.isScalar()) {
+            final FieldType fieldType = FieldType.findValue(protoType.getSimpleName());
+            return fieldType != FieldType.STRING && fieldType != FieldType.BYTES;
+        }
+        return !protoType.isMap() && schema.getType(protoType) instanceof EnumType;
+    }
+
+    /**
+     * Decodes a packed repeated field. Each length-delimited chunk holds consecutive elements without tags; any
+     * elements that were sent unpacked are appended afterwards. The decoded elements are then converted by the
+     * converter matching their wire type.
+     *
+     * @param protoField   proto field's properties
+     * @param unknownField field's unprocessed values
+     * @param recordSchema record schema of the enclosing message; may be {@code null}
+     * @return converted field values
+     * @throws IOException the packed data is malformed
+     */
+    private Object convertPackedFields(ProtoField protoField, UnknownFieldSet.Field unknownField, RecordSchema recordSchema) throws IOException {
+        final ProtoType protoType = protoField.getProtoType();
+        // null for enums, which are always varint encoded
+        final FieldType fieldType = protoType.isScalar() ? FieldType.findValue(protoType.getSimpleName()) : null;
+
+        if (fieldType == FieldType.FIXED32 || fieldType == FieldType.SFIXED32 || fieldType == FieldType.FLOAT) {
+            final List<Integer> values = new ArrayList<>();
+            for (final ByteString chunk : unknownField.getLengthDelimitedList()) {
+                final CodedInputStream input = chunk.newCodedInput();
+                while (!input.isAtEnd()) {
+                    values.add(input.readRawLittleEndian32());
+                }
+            }
+            values.addAll(unknownField.getFixed32List());
+            return convertFixed32Fields(protoField, values, recordSchema);
+        }
+
+        if (fieldType == FieldType.FIXED64 || fieldType == FieldType.SFIXED64 || fieldType == FieldType.DOUBLE) {
+            final List<Long> values = new ArrayList<>();
+            for (final ByteString chunk : unknownField.getLengthDelimitedList()) {
+                final CodedInputStream input = chunk.newCodedInput();
+                while (!input.isAtEnd()) {
+                    values.add(input.readRawLittleEndian64());
+                }
+            }
+            values.addAll(unknownField.getFixed64List());
+            return convertFixed64Fields(protoField, values, recordSchema);
+        }
+
+        final List<Long> values = new ArrayList<>();
+        for (final ByteString chunk : unknownField.getLengthDelimitedList()) {
+            final CodedInputStream input = chunk.newCodedInput();
+            while (!input.isAtEnd()) {
+                values.add(input.readRawVarint64());
+            }
+        }
+        values.addAll(unknownField.getVarintList());
+        return convertVarintFields(protoField, values, recordSchema);
+    }
+
+    /**
+     * Converts a Length-Delimited field value into its suitable data type.
+     *
+     * @param protoField   proto field's properties
+     * @param values       field's unprocessed values
+     * @param recordSchema record schema of the enclosing message; may be {@code null}
+     * @return converted field values
+     * @throws IOException failed to parse input data
+     */
+    private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values, RecordSchema recordSchema) throws IOException {
+        final ProtoType protoType = protoField.getProtoType();
+        final Function<ByteString, Object> valueConverter;
+        if (protoType.isScalar()) {
+            switch (FieldType.findValue(protoType.getSimpleName())) {
+                case STRING:
+                    valueConverter = ByteString::toStringUtf8;
+                    break;
+                case BYTES:
+                    valueConverter = ByteString::toByteArray;
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Incompatible value was received for field [%s],"
+                            + " [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName()));
+            }
+        } else if (protoType.isMap()) {
+            return createMap(protoType, values);
+        } else {
+            final MessageType messageType = (MessageType) schema.getType(protoType);
+            Objects.requireNonNull(messageType, String.format("Message type with name [%s] not found in the provided proto files", protoType));
+
+            // Resolved once per field rather than once per value.
+            final RecordSchema childSchema = resolveChildSchema(recordSchema, protoField.getFieldName(), messageType);
+            valueConverter = value -> {
+                try {
+                    return createRecord(messageType, value, childSchema);
+                } catch (IOException e) {
+                    throw new IllegalStateException("Failed to create record from the provided input data for field " + protoField.getFieldName(), e);
+                }
+            };
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Finds the record schema of a nested message field. It uses the field's {@link RecordDataType} (or the element type
+     * of an array field) in the enclosing record schema, and otherwise falls back to a schema generated from the
+     * proto definition.
+     *
+     * @param parentSchema record schema of the enclosing message; may be {@code null}
+     * @param fieldName    name of the nested message field
+     * @param messageType  proto definition of the nested message
+     * @return record schema for the nested message
+     */
+    private RecordSchema resolveChildSchema(RecordSchema parentSchema, String fieldName, MessageType messageType) {
+        if (parentSchema != null) {
+            final Optional<DataType> declaredType = parentSchema.getDataType(fieldName);
+            if (declaredType.isPresent()) {
+                DataType dataType = declaredType.get();
+                if (dataType instanceof ArrayDataType) {
+                    dataType = ((ArrayDataType) dataType).getElementType();
+                }
+                if (dataType instanceof RecordDataType) {
+                    return ((RecordDataType) dataType).getChildSchema();
+                }
+            }
+        }
+        return generateRecordSchema(messageType.getType().toString());
+    }
+
+    /**
+     * Converts a Fixed32 field value into its suitable data type.
+     *
+     * @param protoField   proto field's properties
+     * @param values       field's unprocessed values
+     * @param recordSchema record schema of the enclosing message; may be {@code null}
+     * @return converted field values
+     */
+    private Object convertFixed32Fields(ProtoField protoField, List<Integer> values, RecordSchema recordSchema) {
+        final String typeName = protoField.getProtoType().getSimpleName();
+        final Function<Integer, Object> valueConverter;
+        switch (FieldType.findValue(typeName)) {
+            case FIXED32:
+                valueConverter = Integer::toUnsignedLong;
+                break;
+            case SFIXED32:
+                valueConverter = value -> value;
+                break;
+            case FLOAT:
+                valueConverter = Float::intBitsToFloat;
+                break;
+            default:
+                throw new IllegalStateException(String.format("Incompatible value was received for field [%s],"
+                        + " [%s] is not Fixed32 field type", protoField.getFieldName(), typeName));
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Converts a Fixed64 field value into its suitable data type.
+     *
+     * @param protoField   proto field's properties
+     * @param values       field's unprocessed values
+     * @param recordSchema record schema of the enclosing message; may be {@code null}
+     * @return converted field values
+     */
+    private Object convertFixed64Fields(ProtoField protoField, List<Long> values, RecordSchema recordSchema) {
+        final String typeName = protoField.getProtoType().getSimpleName();
+        final Function<Long, Object> valueConverter;
+        switch (FieldType.findValue(typeName)) {
+            case FIXED64:
+                valueConverter = value -> new BigInteger(unsignedToString(value));
+                break;
+            case SFIXED64:
+                valueConverter = value -> value;
+                break;
+            case DOUBLE:
+                valueConverter = Double::longBitsToDouble;
+                break;
+            default:
+                throw new IllegalStateException(String.format("Incompatible value was received for field [%s],"
+                        + " [%s] is not Fixed64 field type", protoField.getFieldName(), typeName));
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Converts a Varint field value into its suitable data type.
+     *
+     * @param protoField   proto field's properties
+     * @param values       field's unprocessed values
+     * @param recordSchema record schema of the enclosing message; may be {@code null}
+     * @return converted field values
+     */
+    private Object convertVarintFields(ProtoField protoField, List<Long> values, RecordSchema recordSchema) {
+        final ProtoType protoType = protoField.getProtoType();
+        final Function<Long, Object> valueConverter;
+        if (protoType.isScalar()) {
+            switch (FieldType.findValue(protoType.getSimpleName())) {
+                case BOOL:
+                    // Protobuf parsers treat any non-zero varint as true.
+                    valueConverter = value -> value != 0L;
+                    break;
+                case INT32:
+                case SFIXED32:
+                    valueConverter = Long::intValue;
+                    break;
+                case UINT32:
+                case INT64:
+                case SFIXED64:
+                    valueConverter = value -> value;
+                    break;
+                case UINT64:
+                    valueConverter = value -> new BigInteger(unsignedToString(value));
+                    break;
+                case SINT32:
+                    valueConverter = value -> decodeZigZag32(value.intValue());
+                    break;
+                case SINT64:
+                    valueConverter = CodedInputStream::decodeZigZag64;
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Incompatible value was received for field [%s],"
+                            + " [%s] is not Varint field type", protoField.getFieldName(), protoType.getSimpleName()));
+            }
+        } else {
+            final EnumType enumType = (EnumType) schema.getType(protoType);
+            Objects.requireNonNull(enumType, String.format("Enum with name [%s] not found in the provided proto files", protoType));
+            valueConverter = value -> {
+                final EnumConstant constant = enumType.constant(Math.toIntExact(value));
+                // Proto3 enums are open: values unknown to the loaded .proto files can legitimately appear on the wire.
+                if (constant == null) {
+                    throw new IllegalStateException(String.format("Unknown value [%d] was received for enum field [%s] of type [%s]",
+                            value, protoField.getFieldName(), protoType));
+                }
+                return constant.getName();
+            };
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Converts the raw values of a field and, when enabled, coerces them to the record schema's data type.
+     * A non-repeated field yields a single value. Per the protobuf encoding rules the last occurrence wins (embedded
+     * messages are not merged). A repeated field yields an Object array.
+     *
+     * @param protoField     proto field's properties
+     * @param values         raw values; never empty
+     * @param valueConverter converts one raw value
+     * @param recordSchema   record schema of the enclosing message; {@code null} disables coercion
+     * @return converted value or array of converted values
+     */
+    private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> valueConverter, RecordSchema recordSchema) {
+        final Function<Object, Object> coercer = createCoercer(protoField, recordSchema);
+
+        if (!protoField.isRepeatable()) {
+            final T lastValue = values.get(values.size() - 1);
+            return coercer.apply(valueConverter.apply(lastValue));
+        }
+
+        final List<Object> resultValues = values.stream().map(valueConverter).map(coercer).collect(Collectors.toList());
+        return resultValues.toArray();
+    }
+
+    /**
+     * Builds the coercion applied to each converted value of the field.
+     *
+     * @param protoField   proto field's properties
+     * @param recordSchema record schema of the enclosing message; may be {@code null}
+     * @return identity when coercion is disabled or the field is not in the schema, otherwise a conversion to the field's type
+     */
+    private Function<Object, Object> createCoercer(ProtoField protoField, RecordSchema recordSchema) {
+        if (!coerceTypes || recordSchema == null) {
+            return Function.identity();
+        }
+
+        final Optional<RecordField> recordField = recordSchema.getField(protoField.getFieldName());
+        if (!recordField.isPresent()) {
+            return Function.identity();
+        }
+
+        final String fieldName = recordField.get().getFieldName();
+        final DataType fieldDataType = recordField.get().getDataType();
+        // A repeated field is declared as an array in the record schema, so each element is coerced to the element type.
+        final DataType targetType = protoField.isRepeatable() && fieldDataType instanceof ArrayDataType
+                ? ((ArrayDataType) fieldDataType).getElementType()
+                : fieldDataType;
+
+        return value -> DataTypeUtils.convertType(value, targetType, fieldName);
+    }
+
+    /**
+     * Handles Map type creation in the record. Map entries are not described by the record schema, so their keys and
+     * values are not coerced.
+     *
+     * @param protoType field's proto type
+     * @param data      data to be processed
+     * @return created Map
+     * @throws IOException failed to parse input data
+     */
+    private Map<String, Object> createMap(ProtoType protoType, List<ByteString> data) throws IOException {
+        final Map<String, Object> mapResult = new HashMap<>();
+
+        for (final ByteString entry : data) {
+            final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(entry);
+            final Map<String, Object> mapEntry = new HashMap<>();
+
+            collectFieldValue(mapEntry, new ProtoField(MAP_KEY_FIELD_NAME, protoType.getKeyType()), unknownFieldSet.getField(1), null);
+            collectFieldValue(mapEntry, new ProtoField(MAP_VALUE_FIELD_NAME, protoType.getValueType()), unknownFieldSet.getField(2), null);
+
+            mapResult.put(String.valueOf(mapEntry.get(MAP_KEY_FIELD_NAME)), mapEntry.get(MAP_VALUE_FIELD_NAME));
+        }
+
+        return mapResult;
+    }
+
+    /**
+     * Process a 'google.protobuf.Any' typed field. The method gets the schema for the message type provided in the 'type_url' property
+     * and parses the serialized message from the 'value' field. The result record will contain only the parsed message's fields.
+     *
+     * @param unknownFieldSet 'google.protobuf.Any' typed message's field list
+     * @return created record from the parsed message
+     * @throws IOException failed to parse input data
+     */
+    private MapRecord handleAnyField(UnknownFieldSet unknownFieldSet) throws IOException {
+        final Map<String, Object> anyValues = new HashMap<>();
+        // The Any wrapper fields are not part of any record schema, so they are decoded without coercion.
+        collectFieldValue(anyValues, new ProtoField(ANY_TYPE_URL_FIELD_NAME, ProtoType.STRING), unknownFieldSet.getField(1), null);
+        collectFieldValue(anyValues, new ProtoField(ANY_VALUE_FIELD_NAME, ProtoType.BYTES), unknownFieldSet.getField(2), null);
+
+        final String typeName = String.valueOf(anyValues.get(ANY_TYPE_URL_FIELD_NAME));
+        // A payload whose fields all hold default values serializes to zero bytes, and an empty 'value' is omitted on the wire.
+        final byte[] payload = (byte[]) anyValues.getOrDefault(ANY_VALUE_FIELD_NAME, new byte[0]);
+        final UnknownFieldSet anyFieldSet = UnknownFieldSet.parseFrom(payload);
+        final MessageType messageType = (MessageType) schema.getType(getQualifiedTypeName(typeName));
+        Objects.requireNonNull(messageType, String.format("Message type with name [%s] not found in the provided proto files", typeName));
+
+        final RecordSchema anySchema = generateRecordSchema(typeName);
+        return new MapRecord(anySchema, processMessageFields(messageType, anyFieldSet, anySchema), false, dropUnknownFields);
+    }
+
+    /**
+     * Generates a schema for the provided message type
+     *
+     * @param typeName name of the message
+     * @return generated schema
+     */
+    private RecordSchema generateRecordSchema(String typeName) {
+        final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
+        return schemaParser.createSchema(getQualifiedTypeName(typeName));
+    }
+
+    /**
+     * Gets the fully qualified name of the message type.
+     *
+     * @param typeName name of the message, optionally prefixed by a type URL ending in '/'
+     * @return fully qualified name of the message type
+     */
+    private String getQualifiedTypeName(String typeName) {
+        return typeName.substring(typeName.lastIndexOf('/') + 1);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java
new file mode 100644
index 0000000000..8d9e997b6c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.schema;
+
+import com.squareup.wire.schema.EnumConstant;
+import com.squareup.wire.schema.EnumType;
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.MessageType;
+import com.squareup.wire.schema.OneOf;
+import com.squareup.wire.schema.ProtoType;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.Type;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.EnumDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.services.protobuf.FieldType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link RecordSchema} for the provided proto schema.
+ */
+public class ProtoSchemaParser {
+
+    private final Schema schema;
+
+    public ProtoSchemaParser(Schema schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Creates a {@link RecordSchema} for the provided message type.
+     *
+     * @param messageTypeName proto message type
+     * @return record schema
+     * @throws NullPointerException  if the message type is not found in the proto schema
+     * @throws IllegalStateException if the message type contains itself, directly or through other messages
+     */
+    public RecordSchema createSchema(String messageTypeName) {
+        return createSchema(messageTypeName, new HashSet<>());
+    }
+
+    /**
+     * Recursive worker of {@link #createSchema(String)}.
+     *
+     * @param messageTypeName proto message type
+     * @param typesInProgress message types whose schema is currently being built on the call stack
+     * @return record schema
+     */
+    private RecordSchema createSchema(String messageTypeName, Set<String> typesInProgress) {
+        final MessageType messageType = (MessageType) schema.getType(messageTypeName);
+        Objects.requireNonNull(messageType, String.format("Message type with name [%s] not found in the provided proto files", messageTypeName));
+
+        // Without this check a self-referencing message (e.g. google.protobuf.Struct) recurses until StackOverflowError.
+        final String qualifiedName = messageType.getType().toString();
+        if (!typesInProgress.add(qualifiedName)) {
+            throw new IllegalStateException(String.format("Message type [%s] is recursive and cannot be converted to a record schema", qualifiedName));
+        }
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.addAll(processFields(messageType.getDeclaredFields(), typesInProgress));
+        recordFields.addAll(processFields(messageType.getExtensionFields(), typesInProgress));
+        recordFields.addAll(processOneOfFields(messageType, typesInProgress));
+
+        // The type is complete, so sibling fields of the same type are not mistaken for recursion.
+        typesInProgress.remove(qualifiedName);
+        return new SimpleRecordSchema(recordFields);
+    }
+
+    /**
+     * Iterates through and processes OneOf fields in the given message type. Every OneOf member is nullable.
+     *
+     * @param messageType     message type
+     * @param typesInProgress message types currently being built
+     * @return generated {@link RecordField} list from the OneOf fields
+     */
+    private List<RecordField> processOneOfFields(MessageType messageType, Set<String> typesInProgress) {
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final OneOf oneOf : messageType.getOneOfs()) {
+            for (final Field field : oneOf.getFields()) {
+                final DataType dataType = getDataTypeForField(field.getType(), typesInProgress);
+                recordFields.add(new RecordField(field.getName(), dataType, field.getDefault(), true));
+            }
+        }
+
+        return recordFields;
+    }
+
+    /**
+     * Iterates through and processes the provided fields. Repeated fields become arrays, and only required fields
+     * are non-nullable.
+     *
+     * @param fields          fields to process
+     * @param typesInProgress message types currently being built
+     * @return generated {@link RecordField} list from the provided fields
+     */
+    private List<RecordField> processFields(List<Field> fields, Set<String> typesInProgress) {
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final Field field : fields) {
+            DataType dataType = getDataTypeForField(field.getType(), typesInProgress);
+
+            if (field.isRepeated()) {
+                dataType = RecordFieldType.ARRAY.getArrayDataType(dataType);
+            }
+
+            recordFields.add(new RecordField(field.getName(), dataType, field.getDefault(), !field.isRequired()));
+        }
+
+        return recordFields;
+    }
+
+    /**
+     * Checks the provided field's type and calls the proper {@link DataType} processing function.
+     *
+     * @param protoType       field's type
+     * @param typesInProgress message types currently being built
+     * @return data type
+     */
+    private DataType getDataTypeForField(ProtoType protoType, Set<String> typesInProgress) {
+        if (protoType.isScalar()) {
+            return getDataTypeForScalarField(protoType);
+        } else {
+            return getDataTypeForCompositeField(protoType, typesInProgress);
+        }
+    }
+
+    /**
+     * Gets the suitable {@link DataType} for the provided composite field (map, message or enum).
+     *
+     * @param protoType       field's type
+     * @param typesInProgress message types currently being built
+     * @return data type
+     */
+    private DataType getDataTypeForCompositeField(ProtoType protoType, Set<String> typesInProgress) {
+        if (protoType.isMap()) {
+            final DataType valueType = getDataTypeForField(protoType.getValueType(), typesInProgress);
+            return new MapDataType(valueType);
+        }
+
+        final Type fieldType = schema.getType(protoType);
+
+        if (fieldType instanceof MessageType) {
+            final RecordSchema recordSchema = createSchema(protoType.toString(), typesInProgress);
+            return new RecordDataType(recordSchema);
+        } else if (fieldType instanceof EnumType) {
+            return new EnumDataType(((EnumType) fieldType).getConstants().stream().map(EnumConstant::getName).collect(Collectors.toList()));
+        } else {
+            throw new IllegalStateException("Unknown proto type: " + protoType);
+        }
+    }
+
+    /**
+     * Gets the suitable {@link DataType} for the provided scalar field.
+     *
+     * @param protoType field's type
+     * @return data type
+     */
+    private DataType getDataTypeForScalarField(ProtoType protoType) {
+        switch (FieldType.findValue(protoType.getSimpleName())) {
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT32:
+            case SFIXED32:
+                return RecordFieldType.INT.getDataType();
+            case UINT32:
+            case SINT32:
+            case FIXED32:
+            case INT64:
+            case SINT64:
+            case SFIXED64:
+                return RecordFieldType.LONG.getDataType();
+            case UINT64:
+            case FIXED64:
+                return RecordFieldType.BIGINT.getDataType();
+            case BOOL:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case BYTES:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            default:
+                throw new IllegalArgumentException("Unsupported scalar proto type: " + protoType);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java
new file mode 100644
index 0000000000..d6977847e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.schema;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link SchemaAccessStrategy} that builds the record schema from a message definition of the loaded proto files.
+ * The FlowFile variables, content and read schema are not consulted.
+ */
+public class ProtoSchemaStrategy implements SchemaAccessStrategy {
+
+    private final String messageType;
+    private final Schema schema;
+
+    /**
+     * @param messageType name of the message type to build the record schema for
+     * @param schema      proto schema containing that message type
+     */
+    public ProtoSchemaStrategy(String messageType, Schema schema) {
+        this.messageType = messageType;
+        this.schema = schema;
+    }
+
+    /**
+     * Builds the record schema of the configured message type with a fresh {@link ProtoSchemaParser}.
+     */
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) {
+        return new ProtoSchemaParser(schema).createSchema(messageType);
+    }
+
+    /**
+     * @return an empty set; this strategy supplies no {@link SchemaField}s
+     */
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return EnumSet.noneOf(SchemaField.class);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java
new file mode 100644
index 0000000000..35cce3bf51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java
@@ -0,0 +1,38 @@
+/*
+ *
Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.validation;
+
+import com.squareup.wire.schema.Schema;
+
+/**
+ * Immutable pair of a proto directory and the schema loaded from it.
+ */
+public class ProtoValidationResource {
+
+    private final String directory;
+    private final Schema schema;
+
+    /**
+     * @param protoDirectory directory the proto files were loaded from
+     * @param protoSchema    schema loaded from that directory
+     */
+    public ProtoValidationResource(String protoDirectory, Schema protoSchema) {
+        directory = protoDirectory;
+        schema = protoSchema;
+    }
+
+    /** @return directory the proto files were loaded from */
+    public String getProtoDirectory() {
+        return directory;
+    }
+
+    /** @return schema loaded from {@link #getProtoDirectory()} */
+    public Schema getProtoSchema() {
+        return schema;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..44ded1008f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.services.protobuf.ProtobufReader \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html new file mode 100644 index 0000000000..0228875ffc --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html @@ -0,0 +1,189 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8"/> + <title>ProtobufReader</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + </head> + + <body> + <p> + The ProtobufReader Controller Service reads and parses a Protocol Buffers Message from binary format and creates a Record object. + The Controller Service must be configured with the same '.proto' file that was used for the Message encoding, and the fully qualified + name of the Message type including its package (e.g. mypackage.MyMessage). The Reader will always generate one record from the input + data which represents the provided Protocol Buffers Message type. + Further information about Protocol Buffers can be found here: + <a href="https://protobuf.dev/">protobuf.dev</a> + </p> + + <h2>Data Type Mapping</h2> + + <p> + When a record is parsed from incoming data, the Controller Service maps the Proto Message field types to the corresponding + NiFi data types. The mapping between the provided Message fields and the encoded input is always based on the field tag numbers. + When a field is defined as 'repeated', its data type will be an array whose element type is the field's originally specified type. + The following tables show which NiFi field type each proto field type corresponds to after the conversion.
+ </p> + + <h3> + Scalar Value Types + </h3> + + <p> + <table> + <tr><th>Proto Type</th><th>Proto Wire Type</th><th>NiFi Data Type</th></tr> + <tr><td>double</td><td>fixed64</td><td>double</td></tr> + <tr><td>float</td><td>fixed32</td><td>float</td></tr> + <tr><td>int32</td><td>varint</td><td>int</td></tr> + <tr><td>int64</td><td>varint</td><td>long</td></tr> + <tr><td>uint32</td><td>varint</td><td>long</td></tr> + <tr><td>uint64</td><td>varint</td><td>bigint</td></tr> + <tr><td>sint32</td><td>varint</td><td>long</td></tr> + <tr><td>sint64</td><td>varint</td><td>long</td></tr> + <tr><td>fixed32</td><td>fixed32</td><td>long</td></tr> + <tr><td>fixed64</td><td>fixed64</td><td>bigint</td></tr> + <tr><td>sfixed32</td><td>fixed32</td><td>int</td></tr> + <tr><td>sfixed64</td><td>fixed64</td><td>long</td></tr> + <tr><td>bool</td><td>varint</td><td>boolean</td></tr> + <tr><td>string</td><td>length delimited</td><td>string</td></tr> + <tr><td>bytes</td><td>length delimited</td><td>array[byte]</td></tr> + </table> + </p> + + <h3> + Composite Value Types + </h3> + + <p> + <table> + <tr><th>Proto Type</th><th>Proto Wire Type</th><th>NiFi Data Type</th></tr> + <tr><td>message</td><td>length delimited</td><td>record</td></tr> + <tr><td>enum</td><td>varint</td><td>enum</td></tr> + <tr><td>map</td><td>length delimited</td><td>map</td></tr> + <tr><td>oneof</td><td>-</td><td>one field per option; only the option that is set has a value</td></tr> + </table> + </p> + + <h2>Schemas and Type Coercion</h2> + + <p> + When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the + configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in + the schema, that field will be stored in the Record's value list with its original type. If the field is found in the schema, + the data type of the received data is compared against the data type specified in the schema. If the types match, the value + of that field is used as-is.
If the schema indicates that the field should be of a different type, then the Controller Service + will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type, + an Exception will be thrown. + </p> + + <p> + The following rules apply when attempting to coerce a field value from one data type to another: + </p> + + <ul> + <li>Any data type can be coerced into a String type.</li> + <li>Any numeric data type (Int, Long, Float, Double) can be coerced into any other numeric data type.</li> + <li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of + milliseconds since epoch (Midnight GMT, January 1, 1970).</li> + <li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format," + or "Timestamp Format."</li> + <li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value + <code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float + type but not an Integer.</li> + <li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li> + <li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used + and the rest of the characters are ignored.</li> + </ul> + + <p> + If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception + will be thrown. + </p> + + <h2>Schema Access Strategy</h2> + + <p> + Besides the common Schema Access strategies, such as getting the schema from a property value or accessing it from a Schema Registry, + the ProtobufReader Controller Service offers another access strategy option called "Generate from Proto file".
When using this strategy, + the Reader will generate the Record Schema from the provided '.proto' file and Message type. This is a recommended strategy when the user + doesn't want to manually create the schema or when no type coercion is needed. + </p> + + + <h2>Protobuf Any Field Type</h2> + + <p> + Protocol Buffers offers further Message types called Well-Known Types. These are additionally provided messages that define + complex structured types and wrappers for scalar types. The Any type is one of these Well-Known Types which is used to store an arbitrary + serialized Message along with a URL that describes the type of the serialized Message. Since the Message type and the embedded Message will be + available only when the Any Message is already populated with data, the ProtobufReader needs to do this Message processing at data conversion time. + The Reader can generate a schema for the embedded Message in the Any field and use it in place of the Any field's schema in the resulting Record schema. + </p> + + <h3>Example</h3> + + <p> + There is a Message called 'TestMessage' which has only one field that is an Any typed field. There is another Message called 'NestedMessage' + that we would like to add as a serialized Message in the value of 'anyField'.
+ </p> + +<code><pre> +message Any { + string type_url = 1; + bytes value = 2; +} + +message TestMessage { + google.protobuf.Any anyField = 3; +} + +message NestedMessage { + string field_1 = 1; + string field_2 = 2; + string field_3 = 3; +} +</pre></code> + + <p> + With normal data conversion the result would look like this: + </p> + +<code><pre> +{ + anyField : { + type_url : "type.googleapis.com/NestedMessage", + value : [84, 101, 115, 116, 32, 98, 121, 116, 101, 115] + } +} +</pre></code> + + <p> + The result after the ProtobufReader replaces the Any Message's fields with the processed embedded Message: + </p> + +<code><pre> +{ + anyField : { + field_1 : "value 1", + field_2 : "value 2", + field_3 : "value 3" + } +} +</pre></code> + + </body> +</html> diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java new file mode 100644 index 0000000000..4a10c0ecfd --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.squareup.wire.schema.CoreLoaderKt; +import com.squareup.wire.schema.Location; +import com.squareup.wire.schema.Schema; +import com.squareup.wire.schema.SchemaLoader; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_KEY_FIELD_NAME; +import static org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_VALUE_FIELD_NAME; + +public class ProtoTestUtil { + + public static final String BASE_TEST_PATH = "src/test/resources/"; + + public static Schema loadProto3TestSchema() { + final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault()); + schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + "test_proto3.proto")), Collections.emptyList()); + return schemaLoader.loadSchema(); + } + + public static Schema loadProto2TestSchema() { + final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault()); + schemaLoader.initRoots(Arrays.asList( + Location.get(BASE_TEST_PATH, "test_proto2.proto"), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/any.proto")), Collections.emptyList()); + return schemaLoader.loadSchema(); + } + + public static InputStream generateInputDataForProto3() throws IOException, Descriptors.DescriptorValidationException { + DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(Files.readAllBytes(FileSystems.getDefault().getPath(BASE_TEST_PATH + "test_proto3.desc"))); // readAllBytes opens and closes the file itself, so no stream handle is leaked + Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new Descriptors.FileDescriptor[0]); + + Descriptors.Descriptor messageDescriptor = fileDescriptor.findMessageTypeByName("Proto3Message"); + Descriptors.Descriptor nestedMessageDescriptor = fileDescriptor.findMessageTypeByName("NestedMessage"); + Descriptors.EnumDescriptor enumValueDescriptor = fileDescriptor.findEnumTypeByName("TestEnum"); + Descriptors.Descriptor mapDescriptor = nestedMessageDescriptor.findNestedTypeByName("TestMapEntry"); + + DynamicMessage mapEntry1 = DynamicMessage + .newBuilder(mapDescriptor) + .setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), "test_key_entry1") + .setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 101) + .build(); + + DynamicMessage mapEntry2 = DynamicMessage + .newBuilder(mapDescriptor) + .setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), "test_key_entry2") + .setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 202) + .build(); + + DynamicMessage nestedMessage = DynamicMessage + .newBuilder(nestedMessageDescriptor) + .setField(nestedMessageDescriptor.findFieldByNumber(20), enumValueDescriptor.findValueByNumber(2)) + .addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 1") + .addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 2") + .addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 3") + .setField(nestedMessageDescriptor.findFieldByNumber(22), "One Of Option") + .setField(nestedMessageDescriptor.findFieldByNumber(23), true) + .setField(nestedMessageDescriptor.findFieldByNumber(24), 3) + .setField(nestedMessageDescriptor.findFieldByNumber(25), Arrays.asList(mapEntry1, mapEntry2)) + .build(); + + DynamicMessage message = DynamicMessage + .newBuilder(messageDescriptor) + .setField(messageDescriptor.findFieldByNumber(1), true) + .setField(messageDescriptor.findFieldByNumber(2), "Test text") + .setField(messageDescriptor.findFieldByNumber(3),
Integer.MAX_VALUE) + .setField(messageDescriptor.findFieldByNumber(4), -1) + .setField(messageDescriptor.findFieldByNumber(5), Integer.MIN_VALUE) + .setField(messageDescriptor.findFieldByNumber(6), -2) + .setField(messageDescriptor.findFieldByNumber(7), Integer.MAX_VALUE) + .setField(messageDescriptor.findFieldByNumber(8), Double.MAX_VALUE) + .setField(messageDescriptor.findFieldByNumber(9), Float.MAX_VALUE) + .setField(messageDescriptor.findFieldByNumber(10), "Test bytes".getBytes()) + .setField(messageDescriptor.findFieldByNumber(11), Long.MAX_VALUE) + .setField(messageDescriptor.findFieldByNumber(12), -1L) + .setField(messageDescriptor.findFieldByNumber(13), Long.MIN_VALUE) + .setField(messageDescriptor.findFieldByNumber(14), -2L) + .setField(messageDescriptor.findFieldByNumber(15), Long.MAX_VALUE) + .setField(messageDescriptor.findFieldByNumber(16), nestedMessage) + .build(); + + return message.toByteString().newInput(); + } + + public static InputStream generateInputDataForProto2() throws IOException, Descriptors.DescriptorValidationException { + DescriptorProtos.FileDescriptorSet anyDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(Files.readAllBytes(FileSystems.getDefault().getPath(BASE_TEST_PATH + "google/protobuf/any.desc"))); + Descriptors.FileDescriptor anyDesc = Descriptors.FileDescriptor.buildFrom(anyDescriptorSet.getFile(0), new Descriptors.FileDescriptor[]{}); + + DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(Files.readAllBytes(FileSystems.getDefault().getPath(BASE_TEST_PATH + "test_proto2.desc"))); + Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new Descriptors.FileDescriptor[]{anyDesc}); + + Descriptors.Descriptor messageDescriptor = fileDescriptor.findMessageTypeByName("Proto2Message"); + Descriptors.Descriptor anyTestDescriptor = fileDescriptor.findMessageTypeByName("AnyValueMessage"); + Descriptors.FieldDescriptor fieldDescriptor = fileDescriptor.findExtensionByName("extensionField"); +
Descriptors.Descriptor anyDescriptor = anyDesc.findMessageTypeByName("Any"); + + DynamicMessage anyTestMessage = DynamicMessage + .newBuilder(anyTestDescriptor) + .setField(anyTestDescriptor.findFieldByNumber(1), "Test field 1") + .setField(anyTestDescriptor.findFieldByNumber(2), "Test field 2") + .build(); + + DynamicMessage anyMessage = DynamicMessage + .newBuilder(anyDescriptor) + .setField(anyDescriptor.findFieldByNumber(1), "type.googleapis.com/AnyValueMessage") + .setField(anyDescriptor.findFieldByNumber(2), anyTestMessage.toByteArray()) + .build(); + + DynamicMessage message = DynamicMessage + .newBuilder(messageDescriptor) + .setField(messageDescriptor.findFieldByNumber(1), true) + .setField(messageDescriptor.findFieldByNumber(3), anyMessage) + .setField(fieldDescriptor, Integer.MAX_VALUE) + .build(); + + return message.toByteString().newInput(); + } +} diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java new file mode 100644 index 0000000000..cd433015de --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.google.protobuf.Descriptors; +import com.squareup.wire.schema.Schema; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForProto3; +import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class TestProtobufRecordReader { + + private static Schema protoSchema; + + @BeforeAll + public static void setup() { + protoSchema = loadProto3TestSchema(); + } + + @Test + public void testReadRecord() throws Descriptors.DescriptorValidationException, IOException { + final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema()); + final Record record = reader.nextRecord(false, false); + + final Object booleanValue = record.getValue("booleanField"); +
assertEquals(true, booleanValue); + assertInstanceOf(Boolean.class, booleanValue); + + final Object stringValue = record.getValue("stringField"); + assertEquals("Test text", stringValue); + assertInstanceOf(String.class, stringValue); + + final Object int32Value = record.getValue("int32Field"); + assertEquals(Integer.MAX_VALUE, int32Value); + assertInstanceOf(Integer.class, int32Value); + + final Object uint32Value = record.getValue("uint32Field"); + assertNotNull(uint32Value); + } + + @Test + public void testReadRecordWithCoerceType() throws Descriptors.DescriptorValidationException, IOException { + final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema()); + final Record record = reader.nextRecord(true, false); + + final Object booleanValue = record.getValue("booleanField"); + assertEquals("true", booleanValue); + assertInstanceOf(String.class, booleanValue); + + final Object stringValue = record.getValue("stringField"); + assertEquals("Test text", stringValue); + assertInstanceOf(String.class, stringValue); + + final Object int32Value = record.getValue("int32Field"); + assertEquals(String.valueOf(Integer.MAX_VALUE), int32Value); + assertInstanceOf(String.class, int32Value); + + final Object uint32Value = record.getValue("uint32Field"); + assertNotNull(uint32Value); + } + + @Test + public void testReadRecordWithDropUnknownFields() throws Descriptors.DescriptorValidationException, IOException { + final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema()); + final Record record = reader.nextRecord(false, true); + + final Object booleanValue = record.getValue("booleanField"); + assertEquals(true, booleanValue); + assertInstanceOf(Boolean.class, booleanValue); + + final Object stringValue = record.getValue("stringField"); + assertEquals("Test text", stringValue); + assertInstanceOf(String.class, stringValue); + + final Object int32Value = record.getValue("int32Field"); + assertEquals(Integer.MAX_VALUE, int32Value); + assertInstanceOf(Integer.class, int32Value); + +
final Object uint32Value = record.getValue("uint32Field"); + assertNull(uint32Value); + } + + @Test + public void testReadRecordWithCoerceTypeAndDropUnknownFields() throws Descriptors.DescriptorValidationException, IOException { + final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema()); + final Record record = reader.nextRecord(true, true); + + final Object booleanValue = record.getValue("booleanField"); + assertEquals("true", booleanValue); + assertInstanceOf(String.class, booleanValue); + + final Object stringValue = record.getValue("stringField"); + assertEquals("Test text", stringValue); + assertInstanceOf(String.class, stringValue); + + final Object int32Value = record.getValue("int32Field"); + assertEquals(String.valueOf(Integer.MAX_VALUE), int32Value); + assertInstanceOf(String.class, int32Value); + + final Object uint32Value = record.getValue("uint32Field"); + assertNull(uint32Value); + } + + private RecordSchema generateRecordSchema() { + final List<RecordField> recordFields = new ArrayList<>(); + for (final String name : new String[] {"booleanField", "stringField", "int32Field"}) { // all STRING so coercion is observable; uint32Field is deliberately absent + recordFields.add(new RecordField(name, RecordFieldType.STRING.getDataType())); + } + return new SimpleRecordSchema(recordFields); + } + + private ProtobufRecordReader createReader(final InputStream inputStream, final String messageType, final Schema schema, final RecordSchema recordSchema) { + return new ProtobufRecordReader(schema, messageType, inputStream, recordSchema); + } +} diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java new file mode 100644 index 0000000000..df811f506b --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java @@ -0,0 +1,111
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf.converter; + +import com.google.protobuf.Descriptors; +import com.squareup.wire.schema.Schema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.services.protobuf.ProtoTestUtil; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema; +import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestProtobufDataConverter { + + @Test + public void testDataConverterForProto3() throws
Descriptors.DescriptorValidationException, IOException { + final Schema schema = loadProto3TestSchema(); + final RecordSchema recordSchema = new ProtoSchemaParser(schema).createSchema("Proto3Message"); + + final ProtobufDataConverter dataConverter = new ProtobufDataConverter(schema, "Proto3Message", recordSchema, false, false); + final MapRecord record = dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto3()); + + assertEquals(true, record.getValue("booleanField")); + assertEquals("Test text", record.getValue("stringField")); + assertEquals(Integer.MAX_VALUE, record.getValue("int32Field")); + assertEquals(4294967295L, record.getValue("uint32Field")); + assertEquals(Integer.MIN_VALUE, record.getValue("sint32Field")); + assertEquals(4294967294L, record.getValue("fixed32Field")); + assertEquals(Integer.MAX_VALUE, record.getValue("sfixed32Field")); + assertEquals(Double.MAX_VALUE, record.getValue("doubleField")); + assertEquals(Float.MAX_VALUE, record.getValue("floatField")); + assertArrayEquals("Test bytes".getBytes(), (byte[]) record.getValue("bytesField")); + assertEquals(Long.MAX_VALUE, record.getValue("int64Field")); + assertEquals(new BigInteger("18446744073709551615"), DataTypeUtils.toBigInt(record.getValue("uint64Field"), "uint64Field")); + assertEquals(Long.MIN_VALUE, record.getValue("sint64Field")); + assertEquals(new BigInteger("18446744073709551614"), DataTypeUtils.toBigInt(record.getValue("fixed64Field"), "fixed64Field")); + assertEquals(Long.MAX_VALUE, record.getValue("sfixed64Field")); + + final MapRecord nestedRecord = (MapRecord) record.getValue("nestedMessage"); + assertEquals("ENUM_VALUE_3", nestedRecord.getValue("testEnum")); + + assertArrayEquals(new Object[]{"Repeated 1", "Repeated 2", "Repeated 3"}, (Object[]) nestedRecord.getValue("repeatedField")); + + // assert only one field is set in the OneOf field + assertNull(nestedRecord.getValue("stringOption")); + assertNull(nestedRecord.getValue("booleanOption")); + assertEquals(3,
nestedRecord.getValue("int32Option")); + + // plain HashMap rather than double-brace initialization, which would create an anonymous HashMap subclass + final Map<String, Integer> expectedMap = new HashMap<>(); + expectedMap.put("test_key_entry1", 101); + expectedMap.put("test_key_entry2", 202); + assertEquals(expectedMap, nestedRecord.getValue("testMap")); + } + + @Test + public void testDataConverterForProto2() throws Descriptors.DescriptorValidationException, IOException { + final Schema schema = loadProto2TestSchema(); + final RecordSchema recordSchema = new ProtoSchemaParser(schema).createSchema("Proto2Message"); + + final ProtobufDataConverter dataConverter = new ProtobufDataConverter(schema, "Proto2Message", recordSchema, false, false); + final MapRecord record = dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto2()); + + assertEquals(true, record.getValue("booleanField")); + assertEquals("Missing field", record.getValue("stringField")); + assertEquals(Integer.MAX_VALUE, record.getValue("extensionField")); + + final MapRecord anyValueRecord = (MapRecord) record.getValue("anyField"); + assertEquals("Test field 1", anyValueRecord.getValue("anyStringField1")); + assertEquals("Test field 2", anyValueRecord.getValue("anyStringField2")); + } + + @Test + public void testMissingMessage() { + final Schema schema = loadProto3TestSchema(); + final RecordSchema recordSchema = new ProtoSchemaParser(schema).createSchema("Proto3Message"); + + final ProtobufDataConverter dataConverter = new ProtobufDataConverter(schema, "MissingMessage", recordSchema, false, false); + + NullPointerException e = assertThrows(NullPointerException.class, () -> dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto2())); + assertTrue(e.getMessage().contains("Message with name [MissingMessage] not found in the provided proto files"), e.getMessage()); + } +} diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java new file mode 100644 index 0000000000..d313bb595c --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.protobuf.schema; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema; +import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestProtoSchemaParser { + + @Test + public void testSchemaParserForProto3() { + final ProtoSchemaParser parser = new ProtoSchemaParser(loadProto3TestSchema()); + + final SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType()), + new RecordField("stringField", RecordFieldType.STRING.getDataType()), + new RecordField("int32Field", RecordFieldType.INT.getDataType()), + new RecordField("uint32Field", RecordFieldType.LONG.getDataType()), + new RecordField("sint32Field", RecordFieldType.LONG.getDataType()), + new RecordField("fixed32Field", RecordFieldType.LONG.getDataType()), + new RecordField("sfixed32Field", RecordFieldType.INT.getDataType()), + new RecordField("doubleField", RecordFieldType.DOUBLE.getDataType()), + new RecordField("floatField", RecordFieldType.FLOAT.getDataType()), + new RecordField("bytesField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())), + new RecordField("int64Field", RecordFieldType.LONG.getDataType()), + new RecordField("uint64Field", RecordFieldType.BIGINT.getDataType()), + new RecordField("sint64Field", RecordFieldType.LONG.getDataType()), + new RecordField("fixed64Field", RecordFieldType.BIGINT.getDataType()), + new RecordField("sfixed64Field", RecordFieldType.LONG.getDataType()), + new RecordField("nestedMessage",
RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList( + new RecordField("testEnum", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("ENUM_VALUE_1", "ENUM_VALUE_2", "ENUM_VALUE_3"))), + new RecordField("repeatedField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())), + new RecordField("testMap", RecordFieldType.MAP.getMapDataType(RecordFieldType.INT.getDataType())), + new RecordField("stringOption", RecordFieldType.STRING.getDataType()), + new RecordField("booleanOption", RecordFieldType.BOOLEAN.getDataType()), + new RecordField("int32Option", RecordFieldType.INT.getDataType()) + )))) + )); + + final RecordSchema actualSchema = parser.createSchema("Proto3Message"); + assertEquals(expectedSchema, actualSchema); + } + + @Test + public void testSchemaParserForProto2() { + final ProtoSchemaParser parser = new ProtoSchemaParser(loadProto2TestSchema()); + + final SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType(), false), + new RecordField("stringField", RecordFieldType.STRING.getDataType(), "Missing field", true), + new RecordField("anyField", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList( + new RecordField("type_url", RecordFieldType.STRING.getDataType()), + new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())) + )))), + new RecordField("extensionField", RecordFieldType.INT.getDataType()) + )); + + final RecordSchema actualSchema = parser.createSchema("Proto2Message"); + assertEquals(expectedSchema, actualSchema); + } +} diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc new file mode 100644 index 0000000000..75c391b226 --- /dev/null +++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc @@ -0,0 +1,7 @@ + +� + any.protogoogle.protobuf"6 +Any +type_url ( RtypeUrl +value (RvalueBv +com.google.protobufBAnyProtoPZ,google.golang.org/protobuf/types/known/anypb�GPB�Google.Protobuf.WellKnownTypesbproto3 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc new file mode 100644 index 0000000000..e938726323 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc @@ -0,0 +1,11 @@ + +� +test_proto2.protogoogle/protobuf/any.proto"� + Proto2Message" +booleanField (RbooleanField/ +stringField ( : Missing fieldRstringField0 +anyField (2.google.protobuf.AnyRanyField*d����"e +AnyValueMessage( +anyStringField1 ( RanyStringField1( +anyStringField2 ( RanyStringField2:6 +extensionField.Proto2Messaged (RextensionField \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto new file mode 100644 index 0000000000..11d71e6c44 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +import "google/protobuf/any.proto"; + +message Proto2Message { + extensions 100 to max; + required bool booleanField = 1; + optional string stringField = 2 [default = "Missing field"]; + optional google.protobuf.Any anyField = 3; +} + +message AnyValueMessage { + optional string anyStringField1 = 1; + optional string anyStringField2 = 2; +} + +extend Proto2Message { + optional int32 extensionField = 100; +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc new file mode 100644 index 0000000000..a2316f3f87 Binary files /dev/null and b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc differ diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto new file mode 100644 index 0000000000..a6ddec0e61 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +message Proto3Message { + bool booleanField = 1; + string stringField = 2; + int32 int32Field = 3; + uint32 uint32Field = 4; + sint32 sint32Field = 5; + fixed32 fixed32Field = 6; + sfixed32 sfixed32Field = 7; + double doubleField = 8; + float floatField = 9; + bytes bytesField = 10; + int64 int64Field = 11; + uint64 uint64Field = 12; + sint64 sint64Field = 13; + fixed64 fixed64Field = 14; + sfixed64 sfixed64Field = 15; + NestedMessage nestedMessage = 16; +} + +message NestedMessage { + TestEnum testEnum = 20; + repeated string repeatedField = 21; + oneof oneOfField { + string stringOption = 22; + bool booleanOption = 23; + int32 int32Option = 24; + } + map<string, int32> testMap = 25; +} + +enum TestEnum { + ENUM_VALUE_1 = 0; + ENUM_VALUE_2 = 1; + ENUM_VALUE_3 = 2; +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/pom.xml b/nifi-nar-bundles/nifi-protobuf-bundle/pom.xml new file mode 100644 index 0000000000..04d9553db1 --- /dev/null +++ b/nifi-nar-bundles/nifi-protobuf-bundle/pom.xml @@ -0,0 +1,110 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.26.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-protobuf-bundle</artifactId> + <packaging>pom</packaging> + + <modules> + <module>nifi-protobuf-services</module> + <module>nifi-protobuf-services-nar</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>32.1.2-jre</version> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib</artifactId> + <version>${kotlin.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + <version>${kotlin.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-common</artifactId> + <version>${kotlin.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + 
<version>${caffeine.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>${org.apache.commons.compress.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${org.apache.commons.io.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-text</artifactId> + <version>${org.apache.commons.text.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${org.apache.commons.lang3.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.bom.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.bom.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.bom.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </dependencyManagement> + +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 67fd6fd8c7..20e49475f9 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -127,6 +127,7 @@ <module>nifi-cipher-bundle</module> <module>nifi-compress-bundle</module> <module>nifi-opentelemetry-bundle</module> + <module>nifi-protobuf-bundle</module> </modules> <build>