exceptionfactory commented on code in PR #10105: URL: https://github.com/apache/nifi/pull/10105#discussion_r2268058335
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-api/pom.xml: ########## @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-platform-bundle</artifactId> + <version>2.6.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-confluent-platform-api</artifactId> Review Comment: Since this only contains Protobuf components, recommend making this more specific: ```suggestion <artifactId>nifi-confluent-platform-schema-api</artifactId> ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility class for reading, writing and decoding varint values from input streams. + * This class provides methods for reading variable-length integers (varints), + * writing zigzag-encoded varints, and decoding zigzag-encoded integers as per the Protocol Buffers specification. + * + * <p>Variable-length integers (varints) are a method of serializing integers using one or more bytes. + * Smaller numbers take fewer bytes. Each byte in a varint, except the last byte, has the most + * significant bit set – this indicates that there are further bytes to come. 
The lower 7 bits + * of each byte are used to store the two's complement representation of the number in groups of 7 bits, + * least significant group first.</p> + * + * <p>For more information about varint encoding, see: + * <a href="https://en.wikipedia.org/wiki/Variable-length_quantity">Variable-length quantity - Wikipedia</a></p> + * + * <p>This implementation follows the Protocol Buffers varint encoding format as described in: + * <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol Buffers Encoding</a></p> + * + */ +final class VarintUtils { + + private VarintUtils() { } + + /** + * Reads a single varint from the input stream. + * + * @param inputStream the input stream to read from + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStream(InputStream inputStream) throws IOException { + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading varint"); + } + return readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + } + + /** + * Reads a single varint from the input stream with a pre-read first byte. 
+ * + * @param inputStream the input stream to read from + * @param firstByte the first byte already read from the stream + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStreamAfterFirstByteConsumed(InputStream inputStream, int firstByte) throws IOException { + // accumulated result + int value = 0; + + // stores bit shift position (0, 7, 14, 21, 28 for consecutive bytes) + int shift = 0; + + // Store the current byte being processed + int currentByte = firstByte; + + // continues until we find a byte without the continuation bit + while (true) { + // Prevent overflow by limiting varint to 32 bits (5 bytes maximum: 5 * 7 = 35 bits, but we bail at 32) + if (shift >= 32) { + throw new IOException("Varint too long (more than 32 bits)"); + } + + // Extract the lower 7 bits of the current byte using & 0x7F (0111 1111) + // Shift these 7 bits to their correct position in the final value + // Combine with the accumulated value |= + value |= (currentByte & 0x7F) << shift; + + // Check if this is the last byte by testing the MSB + // If bit 7 is 0 (no continuation bit), we're done + if ((currentByte & 0x80) == 0) { + break; + } + + // Increment the bit shift position by 7 for the next byte + shift += 7; + + currentByte = inputStream.read(); + + if (currentByte == -1) { + throw new IOException("Unexpected end of stream while reading varint"); + } + } + + return value; + } + + /** + * Decodes a zigzag encoded integer. + * ZigZag encoding maps signed integers to unsigned integers so that + * numbers with a small absolute value have a small varint encoding. 
+ * + * @param encodedValue the zigzag encoded value + * @return the decoded integer value + */ + public static int decodeZigZag(int encodedValue) { Review Comment: ```suggestion public static int decodeZigZag(final int encodedValue) { ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility class for reading, writing and decoding varint values from input streams. + * This class provides methods for reading variable-length integers (varints), + * writing zigzag-encoded varints, and decoding zigzag-encoded integers as per the Protocol Buffers specification. + * + * <p>Variable-length integers (varints) are a method of serializing integers using one or more bytes. + * Smaller numbers take fewer bytes. Each byte in a varint, except the last byte, has the most + * significant bit set – this indicates that there are further bytes to come. 
The lower 7 bits + * of each byte are used to store the two's complement representation of the number in groups of 7 bits, + * least significant group first.</p> + * + * <p>For more information about varint encoding, see: + * <a href="https://en.wikipedia.org/wiki/Variable-length_quantity">Variable-length quantity - Wikipedia</a></p> + * + * <p>This implementation follows the Protocol Buffers varint encoding format as described in: + * <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol Buffers Encoding</a></p> + * + */ +final class VarintUtils { + + private VarintUtils() { } + + /** + * Reads a single varint from the input stream. + * + * @param inputStream the input stream to read from + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStream(InputStream inputStream) throws IOException { + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading varint"); + } + return readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + } + + /** + * Reads a single varint from the input stream with a pre-read first byte. 
+ * + * @param inputStream the input stream to read from + * @param firstByte the first byte already read from the stream + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStreamAfterFirstByteConsumed(InputStream inputStream, int firstByte) throws IOException { + // accumulated result + int value = 0; + + // stores bit shift position (0, 7, 14, 21, 28 for consecutive bytes) + int shift = 0; + + // Store the current byte being processed + int currentByte = firstByte; + + // continues until we find a byte without the continuation bit + while (true) { + // Prevent overflow by limiting varint to 32 bits (5 bytes maximum: 5 * 7 = 35 bits, but we bail at 32) + if (shift >= 32) { + throw new IOException("Varint too long (more than 32 bits)"); + } + + // Extract the lower 7 bits of the current byte using & 0x7F (0111 1111) + // Shift these 7 bits to their correct position in the final value + // Combine with the accumulated value |= + value |= (currentByte & 0x7F) << shift; + + // Check if this is the last byte by testing the MSB + // If bit 7 is 0 (no continuation bit), we're done + if ((currentByte & 0x80) == 0) { + break; + } + + // Increment the bit shift position by 7 for the next byte + shift += 7; + + currentByte = inputStream.read(); + + if (currentByte == -1) { + throw new IOException("Unexpected end of stream while reading varint"); + } + } + + return value; + } + + /** + * Decodes a zigzag encoded integer. + * ZigZag encoding maps signed integers to unsigned integers so that + * numbers with a small absolute value have a small varint encoding. + * + * @param encodedValue the zigzag encoded value + * @return the decoded integer value + */ + public static int decodeZigZag(int encodedValue) { + return (encodedValue >>> 1) ^ -(encodedValue & 1); + } + + /** + * Writes a zigzag encoded varint to the output stream. 
+ * ZigZag encoding maps signed integers to unsigned integers so that + * numbers with a small absolute value have a small varint encoding. + * + * @param output the output stream to write to + * @param value the integer value to encode and write + */ + public static void writeZigZagVarint(ByteArrayOutputStream output, int value) { Review Comment: Passing in a `ByteArrayOutputStream` seems unnecessary, as opposed to just returning the `byte[]`. Is there a particular reason for this approach? ```suggestion public static void writeZigZagVarint(final ByteArrayOutputStream output, final int value) { ``` ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageName.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Optional; + +/** + * Represents a message name in a schema registry service, providing access to both the simple name + * and the namespace for message types. 
This interface is typically used for schema formats + * that support namespaced message types, such as Protocol Buffers, where messages can be organized + * within packages. + */ +public interface MessageName { + + /** + * Returns the simple name of the message without any package qualification. + * + * @return the message name + */ + String getName(); + + /** + * Returns the namespace that contains this message, if present. + * + * @return an Optional containing the namespace name, or empty if the message + * is not contained within a namespace + */ + Optional<String> getNamespace(); + + /** + * Returns the fully qualified name of the message by combining the namespace + * and simple name. If no namespace is present, returns just the simple name. + * + * @return the fully qualified message name in the format "namespace.name" or + * just "name" if no namespace is present + */ + default String getFullyQualifiedName() { + return getNamespace() + .map(namespace -> namespace.trim() + ".") Review Comment: Why is the `trim()` included? Is it expected to contain spaces? ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageName.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Objects; +import java.util.Optional; + +public class StandardMessageName implements MessageName { + + private final Optional<String> namespace; + private final String name; + + public StandardMessageName(final Optional<String> namespace, final String name) { + this.name = Objects.requireNonNull(name, "name must not be null"); + this.namespace = namespace.map(String::trim); Review Comment: Is this `String::trim` needed? Are namespaces expected to contain spaces? ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardSchemaDefinition.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import org.apache.nifi.serialization.record.SchemaIdentifier; + +import java.util.Map; +import java.util.Objects; + +/** + * Simple implementation of SchemaDefinition that holds a schema identifier, text, and references.
+ */ +public class StandardSchemaDefinition implements SchemaDefinition { + + private final SchemaIdentifier identifier; + private final String text; + private final Map<String, SchemaDefinition> references; + private final SchemaType schemaType; + + /** + * Creates a new StandardSchemaDefinition. + * + * @param identifier the schema identifier + * @param text the schema text content + * @param references map of schema references (key = reference name, value = referenced schema) + */ + public StandardSchemaDefinition(final SchemaIdentifier identifier, + final String text, + final SchemaType schemaType, + final Map<String, SchemaDefinition> references + + ) { + this.identifier = Objects.requireNonNull(identifier, "Schema identifier cannot be null"); + this.text = Objects.requireNonNull(text, "Schema text cannot be null"); + this.references = references != null ? Map.copyOf(references) : Map.of(); + this.schemaType = schemaType; Review Comment: It looks like `schemaType` should be required to be non-null. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + +@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a 
href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> Review Comment: Recommend removing this HTML since descriptions do not necessarily support raw HTML tags. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/test/java/org/apache/nifi/confluent/schemaregistry/VarintUtilsTest.java: ########## @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schemaregistry; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.writeZigZagVarint; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VarintUtilsTest { Review Comment: The `public` modifier can be removed from the class and all test methods. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java: ########## @@ -211,6 +221,91 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF return createRecordSchema(completeSchema); } + @Override + public SchemaDefinition getSchemaDefinition(SchemaIdentifier identifier) throws SchemaNotFoundException { + JsonNode schemaJson; + String subject = null; + Integer version = null; Review Comment: Can these be declared as `final`? It looks like the logic should be able to handle that without reassigning the values. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schema; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Basic abstraction over Protobuf schema message entity. + * It contains only the fields that are needed for crude schema insights. It's useful when the + * message name resolver needs to pinpoint specific messages by message indexes encoded on the wire + * <p> + * https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + * <p> + * It contains bare minimum of information for name resolver to be able to resolve message names + */ +public final class ProtobufMessageSchema { + + private final String name; + private final Optional<String> packageName; + private final List<ProtobufMessageSchema> childMessageSchemas = new ArrayList<>(); + + public ProtobufMessageSchema(final String name, final Optional<String> packageName) { + this.name = name; + this.packageName = packageName; + } + + public String name() { + return name; + } + + public Optional<String> packageName() { + return packageName; + } + + public boolean isDefaultPackage() { + return packageName.isEmpty(); + } + + public List<ProtobufMessageSchema> getChildMessageSchemas() { + return childMessageSchemas; + } + + 
void addChildMessage(final ProtobufMessageSchema protobufMessageSchema) { Review Comment: I recommend removing the option for `List` mutation and instead providing the full list as a constructor argument. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility class for reading, writing and decoding varint values from input streams. + * This class provides methods for reading variable-length integers (varints), + * writing zigzag-encoded varints, and decoding zigzag-encoded integers as per the Protocol Buffers specification. + * + * <p>Variable-length integers (varints) are a method of serializing integers using one or more bytes. + * Smaller numbers take fewer bytes. Each byte in a varint, except the last byte, has the most + * significant bit set – this indicates that there are further bytes to come. 
The lower 7 bits + * of each byte are used to store the two's complement representation of the number in groups of 7 bits, + * least significant group first.</p> + * + * <p>For more information about varint encoding, see: + * <a href="https://en.wikipedia.org/wiki/Variable-length_quantity">Variable-length quantity - Wikipedia</a></p> + * + * <p>This implementation follows the Protocol Buffers varint encoding format as described in: + * <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol Buffers Encoding</a></p> + * + */ +final class VarintUtils { + + private VarintUtils() { } + + /** + * Reads a single varint from the input stream. + * + * @param inputStream the input stream to read from + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStream(InputStream inputStream) throws IOException { Review Comment: ```suggestion public static int readVarintFromStream(final InputStream inputStream) throws IOException { ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java: ########## @@ -211,6 +221,91 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF return createRecordSchema(completeSchema); } + @Override + public SchemaDefinition getSchemaDefinition(SchemaIdentifier identifier) throws SchemaNotFoundException { + JsonNode schemaJson; + String subject = null; + Integer version = null; + + // If we have an ID, get the schema by ID first + // Using schemaVersionId, because that is what is set by ConfluentEncodedSchemaReferenceReader. + // probably identifier field should be used, but I'm not changing ConfluentEncodedSchemaReferenceReader for backward compatibility reasons. 
+ if (identifier.getSchemaVersionId().isPresent()) { + long schemaId = identifier.getSchemaVersionId().getAsLong(); + String schemaPath = getSchemaPath(schemaId); + schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId); + } else if (identifier.getName().isPresent()) { + // If we have a name or (name and version), get the schema by those + subject = identifier.getName().get(); + version = identifier.getVersion().isPresent() ? identifier.getVersion().getAsInt() : null; + // if no version was specified, the latest version will be used. See @getSubjectPath method. + String pathSuffix = getSubjectPath(subject, version); + schemaJson = fetchJsonResponse(pathSuffix, "name " + subject); + } else { + throw new SchemaNotFoundException("Schema identifier must contain either a version identifier or a subject name"); + } + + // Extract schema information + String schemaText = schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText(); + String schemaTypeText = schemaJson.get(SCHEMA_TYPE_FIELD_NAME).asText(); + SchemaType schemaType = toSchemaType(schemaTypeText); + + long schemaId; + if (schemaJson.has(ID_FIELD_NAME)) { + schemaId = schemaJson.get(ID_FIELD_NAME).asLong(); + } else { + schemaId = identifier.getSchemaVersionId().getAsLong(); + } + + if (subject == null && schemaJson.has(SUBJECT_FIELD_NAME)) { + subject = schemaJson.get(SUBJECT_FIELD_NAME).asText(); + } + + if (version == null && schemaJson.has(VERSION_FIELD_NAME)) { + version = schemaJson.get(VERSION_FIELD_NAME).asInt(); + } + + // Build schema identifier with all available information + SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() + .id(schemaId) + .name(subject) + .version(version) + .build(); + + // Process references if present + Map<String, SchemaDefinition> references = new HashMap<>(); + if (schemaJson.has(REFERENCES_FIELD_NAME) && !schemaJson.get(REFERENCES_FIELD_NAME).isNull()) { + ArrayNode refsArray = (ArrayNode) schemaJson.get(REFERENCES_FIELD_NAME); + for (JsonNode ref : refsArray) { + 
String refName = ref.get(REFERENCE_NAME_FIELD_NAME).asText(); + String refSubject = ref.get(REFERENCE_SUBJECT_FIELD_NAME).asText(); + int refVersion = ref.get(REFERENCE_VERSION_FIELD_NAME).asInt(); + + // Recursively get referenced schema + SchemaIdentifier refId = SchemaIdentifier.builder() + .name(refSubject) + .version(refVersion) + .build(); + SchemaDefinition refSchema = getSchemaDefinition(refId); + references.put(refName, refSchema); + } + } + + return new StandardSchemaDefinition(schemaIdentifier, schemaText, schemaType, references); + } + + private SchemaType toSchemaType(final String schemaTypeText) { + try { + if (schemaTypeText == null || schemaTypeText.isEmpty()) { + return SchemaType.AVRO; // Default schema type for confluent schema registry is AVRO. + } + return SchemaType.valueOf(schemaTypeText.toUpperCase().trim()); + } catch (final Exception e) { + final String error = String.format("Could not convert schema type '%s' to SchemaType enum.", schemaTypeText); Review Comment: ```suggestion final String message = String.format("Could not convert schema type '%s' to SchemaType enum", schemaTypeText); ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolverTest.java: ########## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.writeZigZagVarint; + +import static org.apache.nifi.schemaregistry.services.SchemaDefinition.SchemaType.PROTOBUF; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(MockitoExtension.class) +class ConfluentProtobufMessageNameResolverTest { + + + private static final int 
MAGIC_BYTE_LENGTH = 1; + private static final int SCHEMA_ID_LENGTH = 4; + // Schema without package (default package) + private static final String DEFAULT_PACKAGE_SCHEMA = """ + syntax = "proto3"; + message User { + int32 id = 1; + string name = 2; + Address address = 3; + message Profile { + string bio = 1; + Settings settings = 2; + message Settings { + bool notifications = 1; + string theme = 2; + } + } + } + message Company { + string name = 1; + Address address = 2; + } + message Address { + string street = 1; + string city = 2; + }"""; + + // Schema with explicit package + private static final String EXPLICIT_PACKAGE_SCHEMA = """ + syntax = "proto3"; + package com.example.proto; + message User { + int32 id = 1; + string name = 2; + Address address = 3; + message Profile { + string bio = 1; + Settings settings = 2; + message Settings { + bool notifications = 1; + string theme = 2; + } + } + } + message Company { + string name = 1; + Address address = 2; + } + message Address { + string street = 1; + string city = 2; + }"""; + + private static final SchemaIdentifier ID = SchemaIdentifier.builder() + .id(1L) + .build(); + + private static final SchemaDefinition SCHEMA_WITH_DEFAULT_PACKAGE = new StandardSchemaDefinition( + ID, + DEFAULT_PACKAGE_SCHEMA, + PROTOBUF, + Map.of()); + private static final SchemaDefinition SCHEMA_WITH_EXPLICIT_PACKAGE = new StandardSchemaDefinition( + ID, + EXPLICIT_PACKAGE_SCHEMA, + PROTOBUF, + Map.of() + ); + + private ConfluentProtobufMessageNameResolver resolver; + + private static Stream<Arguments> provideMessageNameTestCases() { + return Stream.of( + // Default package schema tests + // the format is [input schema], [message indexes as decoded from protobuf header], [expected FQN message name] + Arguments.of(SCHEMA_WITH_DEFAULT_PACKAGE, new int[] {0}, "User"), + Arguments.of(SCHEMA_WITH_DEFAULT_PACKAGE, new int[] {1}, "Company"), + Arguments.of(SCHEMA_WITH_DEFAULT_PACKAGE, new int[] {2}, "Address"), + 
Arguments.of(SCHEMA_WITH_DEFAULT_PACKAGE, new int[] {0, 0}, "User.Profile"), + Arguments.of(SCHEMA_WITH_DEFAULT_PACKAGE, new int[] {0, 0, 0}, "User.Profile.Settings"), + + // Packaged schema tests + Arguments.of(SCHEMA_WITH_EXPLICIT_PACKAGE, new int[] {0}, "com.example.proto.User"), + Arguments.of(SCHEMA_WITH_EXPLICIT_PACKAGE, new int[] {1}, "com.example.proto.Company"), + Arguments.of(SCHEMA_WITH_EXPLICIT_PACKAGE, new int[] {2}, "com.example.proto.Address"), + Arguments.of(SCHEMA_WITH_EXPLICIT_PACKAGE, new int[] {0, 0}, "com.example.proto.User.Profile"), + Arguments.of(SCHEMA_WITH_EXPLICIT_PACKAGE, new int[] {0, 0, 0}, "com.example.proto.User.Profile.Settings") + ); + } + + @BeforeEach + void setUp() throws Exception { + resolver = new ConfluentProtobufMessageNameResolver(); + TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class); + testRunner.addControllerService("messageNameResolver", resolver); + testRunner.enableControllerService(resolver); + + } + + @ParameterizedTest + @MethodSource("provideMessageNameTestCases") + void testGetMessageName(final SchemaDefinition schemaDefinition, final int[] messageIndexes, final String expectedMessageName) throws IOException { + final InputStream inputStream = createWireFormatData(schemaDefinition, messageIndexes); + + final MessageName messageName = resolver.getMessageName(Map.of(), schemaDefinition, inputStream); + + assertNotNull(messageName); + assertEquals(expectedMessageName, messageName.getFullyQualifiedName()); + } + + @Test + void testGetMessageNameWithSpecialCaseZero() throws IOException { + // Test special case: single 0 byte means first message type for default package + InputStream inputStream = createWireFormatDataSpecialCase(SCHEMA_WITH_DEFAULT_PACKAGE); + MessageName messageName = resolver.getMessageName(Map.of(), SCHEMA_WITH_DEFAULT_PACKAGE, inputStream); + assertNotNull(messageName); + assertEquals("User", messageName.getFullyQualifiedName()); + 
assertTrue(messageName.getNamespace().isEmpty()); + + inputStream = createWireFormatDataSpecialCase(SCHEMA_WITH_EXPLICIT_PACKAGE); + messageName = resolver.getMessageName(Map.of(), SCHEMA_WITH_EXPLICIT_PACKAGE, inputStream); + assertNotNull(messageName); + assertEquals("com.example.proto.User", messageName.getFullyQualifiedName()); + assertEquals("com.example.proto", messageName.getNamespace().get()); + assertEquals("User", messageName.getName()); + } + + @Test + void testGetMessageNameEmptyInputStream() { + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + assertThrows(IOException.class, () -> resolver.getMessageName(Map.of(), SCHEMA_WITH_EXPLICIT_PACKAGE, inputStream)); + } + + @Test + void testGetMessageNameTooShortInputStream() { + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[] {1}); // Only 4 bytes + assertThrows(IllegalStateException.class, () -> resolver.getMessageName(Map.of(), SCHEMA_WITH_EXPLICIT_PACKAGE, inputStream)); + } + + @Test + void testGetMessageNameIndexOutOfBounds() throws IOException { + // Test with index [99] which should be out of bounds for both schemas + + // Test with default package schema + final InputStream inputStream1 = createWireFormatData(SCHEMA_WITH_DEFAULT_PACKAGE, new int[] {99}); + assertThrows(IllegalStateException.class, () -> resolver.getMessageName(Map.of(), SCHEMA_WITH_DEFAULT_PACKAGE, inputStream1)); + + // Test with explicit package schema + final InputStream inputStream2 = createWireFormatData(SCHEMA_WITH_EXPLICIT_PACKAGE, new int[] {99}); + assertThrows(IllegalStateException.class, () -> { + resolver.getMessageName(Map.of(), SCHEMA_WITH_EXPLICIT_PACKAGE, inputStream2); + }); + } + + + /** + * Creates wire format data according to Confluent specification: + * [magic_byte:1][schema_id:4][message_indexes:variable][protobuf_payload:rest] + * <br> + * <a 
href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent specification</a> + */ + private InputStream createWireFormatData(final SchemaDefinition schemaDefinition, final int[] messageIndexes) throws IOException { + final ByteArrayOutputStream output = new ByteArrayOutputStream(); + + // Magic byte (0x0) + output.write(0x00); + + // Schema ID (4 bytes, big-endian) + final ByteBuffer schemaIdBuffer = ByteBuffer.allocate(4); + schemaIdBuffer.putInt((int) schemaDefinition.getIdentifier().getIdentifier().getAsLong()); + output.write(schemaIdBuffer.array()); + + // Message indexes (varint zigzag encoded) + final byte[] encodedIndexes = encodeMessageIndexes(messageIndexes); + output.write(encodedIndexes); + + // Optional: Add some dummy protobuf payload (not needed for message name resolution) + output.write(new byte[] {0x5F, 0x5F, 0x5F, 0x5F, 0x48, 0x45, 0x4C, 0x50, 0x20, 0x4D, 0x45, 0x5F, 0x5F, 0x5F}); + final ByteArrayInputStream payloadStream = new ByteArrayInputStream(output.toByteArray()); + // Skip magic byte, schema ID, + // Message name resolver is designed to work on a stream positioned exactly + // at the start of the message index wireformat section (6 th byte) + final int expectedSkipLength = MAGIC_BYTE_LENGTH + SCHEMA_ID_LENGTH; + final long actualSkipped = payloadStream.skip(expectedSkipLength); + if (actualSkipped != expectedSkipLength) { + throw new IOException("Failed to skip expected bytes: expected " + expectedSkipLength + ", actual " + actualSkipped); + } + return payloadStream; + } + + /** + * Creates wire format data for the special case (single 0 byte for first message type) + */ + private InputStream createWireFormatDataSpecialCase(final SchemaDefinition schemaDefinition) throws IOException { + final ByteArrayOutputStream output = new ByteArrayOutputStream(); + + output.write(0x00); + + final ByteBuffer schemaIdBuffer = ByteBuffer.allocate(4); + schemaIdBuffer.putInt((int) 
schemaDefinition.getIdentifier().getIdentifier().getAsLong()); + output.write(schemaIdBuffer.array()); + + // Special case: single 0 byte + output.write(0x00); + final ByteArrayInputStream payloadStream = new ByteArrayInputStream(output.toByteArray()); + final int expectedSkipLength = MAGIC_BYTE_LENGTH + SCHEMA_ID_LENGTH; + final long actualSkipped = payloadStream.skip(expectedSkipLength); + if (actualSkipped != expectedSkipLength) { + throw new IOException("Failed to skip expected bytes: expected " + expectedSkipLength + ", actual " + actualSkipped); + } + return payloadStream; + } + + /** + * Encodes message indexes using zigzag varint encoding as per Confluent specification + */ + private byte[] encodeMessageIndexes(final int[] indexes) { + final ByteArrayOutputStream output = new ByteArrayOutputStream(); + + // Encode array length as zigzag varint + writeZigZagVarint(output, indexes.length); + + // Encode each index as zigzag varint + for (final int index : indexes) { + writeZigZagVarint(output, index); + } + + return output.toByteArray(); + } + + Review Comment: ```suggestion ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/pom.xml: ########## @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. 
See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-platform-bundle</artifactId> + <version>2.6.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-confluent-protobuf-message-name-resolver</artifactId> + <packaging>jar</packaging> + <description>Confluent Protobuf Message Name Resolver for NiFi</description> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-platform-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-protobuf-antlr-parser</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> Review Comment: This is declared in the root Maven configuration and inherited in all modules, so it can be removed. 
```suggestion ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/client/CompleteSchemaResponse.java: ########## @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry.client; + +import java.util.List; +record CompleteSchemaResponse(String subject, int version, int id, String schema, String schemaType, List<SchemaReference> references) { Review Comment: Recommend listing the properties one per line for readability. ```suggestion import java.util.List; record CompleteSchemaResponse(String subject, int version, int id, String schema, String schemaType, List<SchemaReference> references) { ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static 
org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + +@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> + """) +public class ConfluentProtobufMessageNameResolver extends AbstractControllerService implements MessageNameResolver { + + public static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100; + public static final int MAXIMUM_CACHE_SIZE = 1000; + private Cache<FindMessageNameArguments, MessageName> messageNameCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + messageNameCache = Caffeine.newBuilder().maximumSize(MAXIMUM_CACHE_SIZE).expireAfterWrite(Duration.ofHours(1)).build(); + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + if (messageNameCache != null) { + messageNameCache.invalidateAll(); + messageNameCache = null; + } + } + + + @Override + public MessageName getMessageName(final Map<String, String> variables, final SchemaDefinition schemaDefinition, final InputStream inputStream) throws IOException { + final ComponentLog logger = getLogger(); + + // Read message indexes directly from stream (Confluent wire format) + final List<Integer> messageIndexes = readMessageIndexesFromStream(inputStream); + if (logger.isDebugEnabled()) { + logger.debug("Decoded message indexes: {}", messageIndexes); + } + final FindMessageNameArguments findMessageNameArgs = new FindMessageNameArguments(schemaDefinition, messageIndexes); + + return 
messageNameCache.get(findMessageNameArgs, this::findMessageName); + + } + + /** + * Reads message indexes directly from the input stream using Confluent wire format. + * Format: [array_length:varint][index1:varint][index2:varint]...[indexN:varint] + * Special case: single 0 byte means first message (index 0) + * <p> + * <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Wire format</a> + * + * @param inputStream the input stream positioned after the Confluent header (magic byte + schema ID) + * @return list of message indexes + * @throws IOException if unable to read from stream or invalid format + */ + private List<Integer> readMessageIndexesFromStream(final InputStream inputStream) throws IOException { + // Special case: check if first byte is 0 (most common case) + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading message indexes"); + } + + if (firstByte == 0) { + // Single 0 byte means first message type (index 0) + return List.of(0); + } + + // General case: read array length as varint + int arrayLength = readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + arrayLength = decodeZigZag(arrayLength); + + if (arrayLength < 0 || arrayLength > MAXIMUM_SUPPORTED_ARRAY_LENGTH) { // Reasonable limit + throw new IllegalStateException("Invalid message index array length: " + arrayLength); + } + + // Read each index as varint + final List<Integer> indexes = new ArrayList<>(); + for (int i = 0; i < arrayLength; i++) { + final int rawIndex = readVarintFromStream(inputStream); + final int index = decodeZigZag(rawIndex); + indexes.add(index); + } + + return indexes; + } + + /** + * Finds the fully qualified message name in the protobuf schema using the message indexes. 
+ */ + private MessageName findMessageName(final FindMessageNameArguments findMessageNameArguments) { + try { + final List<Integer> messageIndexes = findMessageNameArguments.messageIndexes(); + final String schemaText = findMessageNameArguments.schemaDefinition().getText(); + // Parse the protobuf schema using AntlrProtobufMessageSchemaParser + final AntlrProtobufMessageSchemaParser reader = new AntlrProtobufMessageSchemaParser(); + final List<ProtobufMessageSchema> rootMessages = reader.parse(schemaText); + + if (messageIndexes.isEmpty()) { + // Return the topmost root message name + if (!rootMessages.isEmpty()) { + return getFullyQualifiedName(singletonList(rootMessages.getFirst())); + } else { + throw new IllegalStateException("No root messages found in schema"); + } + } + + // Navigate through the message hierarchy using indexes + ProtobufMessageSchema currentMessage; + List<ProtobufMessageSchema> currentLevel = rootMessages; + final List<ProtobufMessageSchema> messagePath = new ArrayList<>(); + + for (final int index : messageIndexes) { + if (index >= currentLevel.size()) { + final String msg = format("Message index %d out of bounds for level with %d messages. Message indexes: [%s]", index, currentLevel.size(), messageIndexes); + throw new IllegalStateException(msg); + } + + currentMessage = currentLevel.get(index); + messagePath.add(currentMessage); + + // Move to nested messages of the current message + currentLevel = currentMessage.getChildMessageSchemas(); + } + + // Return the fully qualified name including parent message hierarchy + return getFullyQualifiedName(messagePath); + + } catch (final Exception e) { + getLogger().error(e.getMessage(), e); Review Comment: Logging the error and throwing an exception is generally unnecessary and contrary to best practices, so this should be removed.
```suggestion ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility class for reading, writing and decoding varint values from input streams. + * This class provides methods for reading variable-length integers (varints), + * writing zigzag-encoded varints, and decoding zigzag-encoded integers as per the Protocol Buffers specification. + * + * <p>Variable-length integers (varints) are a method of serializing integers using one or more bytes. + * Smaller numbers take fewer bytes. Each byte in a varint, except the last byte, has the most + * significant bit set – this indicates that there are further bytes to come. 
The lower 7 bits + * of each byte are used to store the two's complement representation of the number in groups of 7 bits, + * least significant group first.</p> + * + * <p>For more information about varint encoding, see: + * <a href="https://en.wikipedia.org/wiki/Variable-length_quantity">Variable-length quantity - Wikipedia</a></p> + * + * <p>This implementation follows the Protocol Buffers varint encoding format as described in: + * <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol Buffers Encoding</a></p> + * + */ +final class VarintUtils { + + private VarintUtils() { } + + /** + * Reads a single varint from the input stream. + * + * @param inputStream the input stream to read from + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStream(InputStream inputStream) throws IOException { + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading varint"); + } + return readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + } + + /** + * Reads a single varint from the input stream with a pre-read first byte. 
+ * + * @param inputStream the input stream to read from + * @param firstByte the first byte already read from the stream + * @return the decoded varint value + * @throws IOException if unable to read from stream or invalid varint format + */ + public static int readVarintFromStreamAfterFirstByteConsumed(InputStream inputStream, int firstByte) throws IOException { Review Comment: ```suggestion public static int readVarintFromStreamAfterFirstByteConsumed(final InputStream inputStream, final int firstByte) throws IOException { ``` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schema; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Basic abstraction over Protobuf schema message entity. + * It contains only the fields that are needed for crude schema insights. 
It's useful when the + * message name resolver needs to pinpoint specific messages by message indexes encoded on the wire + * <p> + * https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + * <p> + * It contains bare minimum of information for name resolver to be able to resolve message names + */ +public final class ProtobufMessageSchema { + + private final String name; + private final Optional<String> packageName; + private final List<ProtobufMessageSchema> childMessageSchemas = new ArrayList<>(); + + public ProtobufMessageSchema(final String name, final Optional<String> packageName) { + this.name = name; + this.packageName = packageName; + } + + public String name() { + return name; + } + + public Optional<String> packageName() { + return packageName; + } Review Comment: Recommend using the standard `get` naming convention for all properties. ```suggestion public String getName() { return name; } public Optional<String> getPackageName() { return packageName; } ``` ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java: ########## @@ -45,4 +45,32 @@ public interface SchemaRegistry extends ControllerService { * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(SchemaIdentifier)} */ Set<SchemaField> getSuppliedSchemaFields(); + + /** + * Retrieves the raw schema definition including its textual representation and references. + * <p> + * This method is used to retrieve the complete schema definition structure, including the raw schema text + * and any schema references. 
Unlike {@link #retrieveSchema(SchemaIdentifier)}, which returns a parsed + {@link RecordSchema} ready for immediate use, this method returns a {@link SchemaDefinition} containing + the raw schema content that can be used for custom schema processing, compilation, or when schema + references need to be resolved. + * </p> + * <p> + * This method is particularly useful for: + * <ul> + * <li>Processing schemas that reference other schemas (e.g., Protocol Buffers with imports)</li> + * <li>Custom schema compilation workflows where the raw schema text is needed</li> + * <li>Accessing schema metadata and references for advanced schema processing</li> + * </ul> + * </p> + * + * @param schemaIdentifier the schema identifier containing id, name, version, and optionally branch information + * @return a {@link SchemaDefinition} containing the raw schema text, type, identifier, and references + * @throws IOException if unable to communicate with the backing store + * @throws SchemaNotFoundException if unable to find the schema based on the given identifier + * @throws UnsupportedOperationException if the schema registry implementation does not support raw schema retrieval + */ + default SchemaDefinition retrieveSchemaRaw(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + throw new UnsupportedOperationException("retrieveSchemaRaw is not supported by this SchemaRegistry implementation"); Review Comment: Thanks for the reply, I agree with this new method under the circumstances. Adding the feature check method `isSchemaDefinitionAccessSupported()` would be a helpful addition. ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageName.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Objects; +import java.util.Optional; + +public class StandardMessageName implements MessageName { + + private final Optional<String> namespace; + private final String name; + + public StandardMessageName(final Optional<String> namespace, final String name) { + this.name = Objects.requireNonNull(name, "name must not be null"); + this.namespace = namespace.map(String::trim); + } + + @Override + public String getName() { + return name; + } + + @Override + public Optional<String> getNamespace() { + return namespace; + } + + @Override + public final boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof final StandardMessageName that)) { + return false; + } + + return name.equals(that.name) && namespace.equals(that.namespace); + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + namespace.hashCode(); + return result; + } + + @Override + public String toString() { + return "StandardMessageName{" + + "name='" + name + '\'' + + ", namespace=" + namespace + + '}'; Review Comment: Replacing this concatenation with a format string seems like it would be easier to read and maintain. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
