Denovo1998 opened a new issue, #19565:
URL: https://github.com/apache/pulsar/issues/19565

   ### Motivation
   
   As noted in https://github.com/apache/pulsar/issues/19385, the current implementation only checks whether the root message name has changed:
   
https://github.com/apache/pulsar/blob/34c18704ce759922ce45820321af44b382a28e10/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java#L67-L72
   In addition, the only schema compatibility strategy it supports is FULL. We therefore need to improve the PROTOBUF_NATIVE schema compatibility checks.
   
   ### Goal
   
   1. Implement a `ProtobufNativeSchemaValidator`, modeled on Avro's `SchemaValidatorBuilder` and `SchemaValidator`, which check that data written with one schema can be read using another.
   2. Define and implement incompatibility rules between different versions of a Protobuf schema, and throw a schema-incompatibility exception when one of them is matched.
   
   ### API Changes
   
   1. Add `ProtobufNativeSchemaValidator`
   ```java
   public interface ProtobufNativeSchemaValidator {
       void validate(Iterable<Descriptors.Descriptor> fromDescriptor, Descriptors.Descriptor toDescriptor)
               throws ProtoBufCanReadCheckException;
   }
   ```
   2. Add `ProtobufNativeSchemaValidationStrategy`
   ```java
   public enum ProtobufNativeSchemaValidationStrategy {
       /**
        * a schema can be used to read existing schema(s).
        */
       CanReadExistingStrategy,
   
       /**
        * a schema can be read by existing schema(s).
        */
       CanBeReadByExistingStrategy,
   
       /**
        * a schema can read existing schema(s), and vice versa.
        */
       CanBeReadMutualStrategy
   }
   ```
   3. Add `ProtobufNativeSchemaValidatorBuilder`
   ```java
   public class ProtobufNativeSchemaValidatorBuilder {
       private ProtobufNativeSchemaValidationStrategy strategy;
       private boolean onlyValidateLatest;
   
       public ProtobufNativeSchemaValidatorBuilder validatorStrategy(
               @NonNull ProtobufNativeSchemaValidationStrategy strategy) {
           this.strategy = strategy;
           return this;
       }
   
       public ProtobufNativeSchemaValidatorBuilder isOnlyValidateLatest(boolean onlyValidateLatest) {
           this.onlyValidateLatest = onlyValidateLatest;
           return this;
       }
   
       public ProtobufNativeSchemaValidator build() {
           return new ProtobufNativeSchemaBreakValidatorImpl(strategy, onlyValidateLatest);
       }
   }
   ```
   
   ### Implementation
   
   1. In `ProtobufNativeSchemaCompatibilityCheck`, deserialize the schemas and delegate to a validator chosen by the compatibility strategy:
   ```java
       @Override
       public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy)
               throws IncompatibleSchemaException {
           checkCompatible(Collections.singletonList(from), to, strategy);
       }
   
       @Override
       public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy)
               throws IncompatibleSchemaException {
           checkArgument(from != null, "check compatibility list is null");
           LinkedList<Descriptor> fromList = new LinkedList<>();
           try {
               for (SchemaData schemaData : from) {
                   fromList.addFirst(ProtobufNativeSchemaUtils.deserialize(schemaData.getData()));
               }
               Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData());
               ProtobufNativeSchemaValidator schemaValidator = createSchemaValidator(strategy);
               schemaValidator.validate(fromList, toDescriptor);
           } catch (ProtoBufCanReadCheckException e) {
               throw new IncompatibleSchemaException(e);
           }
       }
   
       static ProtobufNativeSchemaValidator createSchemaValidator(
               SchemaCompatibilityStrategy compatibilityStrategy) {
           final ProtobufNativeSchemaValidatorBuilder schemaValidatorBuilder =
                   new ProtobufNativeSchemaValidatorBuilder();
           return switch (compatibilityStrategy) {
               case BACKWARD_TRANSITIVE -> schemaValidatorBuilder
                       .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy)
                       .isOnlyValidateLatest(false).build();
               case BACKWARD -> schemaValidatorBuilder
                       .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy)
                       .isOnlyValidateLatest(true).build();
               case FORWARD_TRANSITIVE -> schemaValidatorBuilder
                       .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy)
                       .isOnlyValidateLatest(false).build();
               case FORWARD -> schemaValidatorBuilder
                       .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy)
                       .isOnlyValidateLatest(true).build();
               case FULL_TRANSITIVE -> schemaValidatorBuilder
                       .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy)
                       .isOnlyValidateLatest(false).build();
               case FULL -> schemaValidatorBuilder
                       .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy)
                       .isOnlyValidateLatest(true).build();
               case ALWAYS_COMPATIBLE -> ProtobufNativeAlwaysCompatibleValidator.INSTANCE;
               default -> ProtobufNativeNeverCompatibleValidator.INSTANCE;
           };
       }
   ```
   2. `canRead()` checks that data written with `writtenSchema` can be read using `readSchema`.
   ```java
       private void validateWithStrategy(Descriptors.Descriptor toValidate, Descriptors.Descriptor fromDescriptor)
               throws ProtoBufCanReadCheckException {
           switch (strategy) {
               // Backward: the existing schema wrote the data, the new schema reads it.
               case CanReadExistingStrategy -> canRead(fromDescriptor, toValidate);
               // Forward: the new schema wrote the data, the existing schema reads it.
               case CanBeReadByExistingStrategy -> canRead(toValidate, fromDescriptor);
               // Full: both directions must hold.
               case CanBeReadMutualStrategy -> {
                   canRead(toValidate, fromDescriptor);
                   canRead(fromDescriptor, toValidate);
               }
           }
       }
   
       private void canRead(Descriptors.Descriptor writtenSchema, Descriptors.Descriptor readSchema)
               throws ProtoBufCanReadCheckException {
           ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenSchema, readSchema);
       }
   ```
   - Backward (`CanReadExistingStrategy`): the schema of the "old" version is the `writtenSchema`, and the schema of the "new" version is the `readSchema`.
   - Forward (`CanBeReadByExistingStrategy`): the schema of the "new" version is the `writtenSchema`, and the schema of the "old" version is the `readSchema`.
   - Full (`CanBeReadMutualStrategy`): both checks are needed.
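
How the strategy and the `onlyValidateLatest` flag might combine can be sketched as follows. This is only an illustration, not the proposed `ProtobufNativeSchemaBreakValidatorImpl`: the class `ValidationLoopSketch`, its `Strategy` enum, and the `canRead` predicate are hypothetical stand-ins, and the sketch assumes the existing schemas are passed newest first.

```java
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical sketch of the validation loop; not the proposed implementation. */
final class ValidationLoopSketch {
    enum Strategy { CAN_READ_EXISTING, CAN_BE_READ_BY_EXISTING, CAN_BE_READ_MUTUAL }

    /**
     * @param existingNewestFirst existing schemas, newest first (an assumption of this sketch)
     * @param canRead             canRead.test(written, reader) is true when data written
     *                            with "written" can be read using "reader"
     */
    static <S> boolean validate(Strategy strategy, boolean onlyValidateLatest,
                                List<S> existingNewestFirst, S candidate,
                                BiPredicate<S, S> canRead) {
        for (S existing : existingNewestFirst) {
            boolean ok;
            switch (strategy) {
                case CAN_READ_EXISTING:        // backward: old writes, new reads
                    ok = canRead.test(existing, candidate);
                    break;
                case CAN_BE_READ_BY_EXISTING:  // forward: new writes, old reads
                    ok = canRead.test(candidate, existing);
                    break;
                default:                       // full: both directions
                    ok = canRead.test(candidate, existing) && canRead.test(existing, candidate);
            }
            if (!ok) {
                return false;
            }
            if (onlyValidateLatest) {
                break; // non-transitive strategies stop after the newest schema
            }
        }
        return true;
    }
}
```

With `onlyValidateLatest` set to `true` only the first element of the list is compared, which corresponds to BACKWARD, FORWARD and FULL; with `false` every existing schema is compared, which corresponds to the `_TRANSITIVE` variants.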
   3. `checkSchemaCompatibility()` in `ProtobufNativeSchemaBreakCheckUtils` applies the following rules:
   
   
   (1) Create:
   
   - The `writtenSchema` cannot add required fields, but optional or repeated fields can be added (the field number must be new).
   
   (2) Update:
   
   - The `writtenSchema` must not change the field number of any field in the `readSchema` (same field name, different field number).
   - The `writtenSchema` must not change a field's type while keeping its name and number, unless the two types are compatible as listed below.
   - `int32`, `uint32`, `int64`, `uint64`, and `bool` are all compatible – this 
means you can change a field from one of these types to another without 
breaking forwards- or backwards-compatibility.
   - `sint32` and `sint64` are compatible with each other but are *not* 
compatible with the other integer types.
   - `string` and `bytes` are compatible as long as the bytes are valid UTF-8.
   - Embedded messages are compatible with `bytes` if the bytes contain an 
encoded version of the message.
   - `fixed32` is compatible with `sfixed32`, and `fixed64` with `sfixed64`.
   - `enum` is compatible with `int32`, `uint32`, `int64`, and `uint64` in 
terms of wire format (note that values will be truncated if they don’t fit).
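
The type rules above could be encoded as a small lookup of compatibility groups. The sketch below is illustrative only: `WireTypeGroups` and the plain type-name strings are hypothetical, and it treats the data-dependent cases (`string`/`bytes`, embedded message/`bytes`) as compatible because a schema-level check cannot inspect the payload.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical helper: two types are compatible if they are equal or share a group. */
final class WireTypeGroups {
    // One set per rule in the list above. "enum" gets its own group because the
    // rules list it as compatible with the integer types but do not mention bool.
    private static final List<Set<String>> GROUPS = List.of(
            Set.of("int32", "uint32", "int64", "uint64", "bool"),
            Set.of("enum", "int32", "uint32", "int64", "uint64"),
            Set.of("sint32", "sint64"),
            Set.of("string", "bytes"),   // only if the bytes are valid UTF-8
            Set.of("message", "bytes"),  // only if the bytes hold an encoded message
            Set.of("fixed32", "sfixed32"),
            Set.of("fixed64", "sfixed64"));

    static boolean isCompatible(String writtenType, String readType) {
        if (writtenType.equals(readType)) {
            return true;
        }
        for (Set<String> group : GROUPS) {
            if (group.contains(writtenType) && group.contains(readType)) {
                return true;
            }
        }
        return false;
    }
}
```

A real check would work on `Descriptors.FieldDescriptor` types rather than strings and would also need to compare nested message definitions recursively, which this sketch leaves out.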
   
   (3) Delete:
   
   - The `writtenSchema` cannot remove fields that are required in the `readSchema`, or fields that have no default value.
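
The create, update and delete rules can be read as a single pass over the fields of both schemas. The following is a minimal sketch under simplifying assumptions, not the proposed `checkSchemaCompatibility()`: `FieldRuleSketch` and its `Field` class are hypothetical stand-ins for protobuf descriptors, fields are matched by name, and types are compared by plain equality (the compatible-type groups are left out).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the create/update/delete rules on simplified fields. */
final class FieldRuleSketch {
    /** Simplified stand-in for a protobuf field descriptor. */
    static final class Field {
        final String name;
        final int number;
        final String type;
        final boolean required;
        final boolean hasDefault;

        Field(String name, int number, String type, boolean required, boolean hasDefault) {
            this.name = name;
            this.number = number;
            this.type = type;
            this.required = required;
            this.hasDefault = hasDefault;
        }
    }

    /** Returns one message per violated rule; an empty list means compatible. */
    static List<String> violations(List<Field> written, List<Field> read) {
        Map<String, Field> readByName = new HashMap<>();
        Map<Integer, Field> readByNumber = new HashMap<>();
        for (Field f : read) {
            readByName.put(f.name, f);
            readByNumber.put(f.number, f);
        }
        Map<String, Field> writtenByName = new HashMap<>();
        for (Field f : written) {
            writtenByName.put(f.name, f);
        }

        List<String> out = new ArrayList<>();
        for (Field w : written) {
            Field r = readByName.get(w.name);
            if (r == null) {
                // (1) Create: no new required fields, and the number must be new.
                if (w.required) {
                    out.add("added required field: " + w.name);
                }
                if (readByNumber.containsKey(w.number)) {
                    out.add("added field reuses number " + w.number + ": " + w.name);
                }
            } else if (r.number != w.number) {
                // (2) Update: same name, different number.
                out.add("field number changed: " + w.name);
            } else if (!r.type.equals(w.type)) {
                // (2) Update: same name and number, different type.
                out.add("field type changed: " + w.name);
            }
        }
        for (Field r : read) {
            // (3) Delete: required fields and fields without a default must stay.
            if (!writtenByName.containsKey(r.name) && (r.required || !r.hasDefault)) {
                out.add("removed field: " + r.name);
            }
        }
        return out;
    }
}
```

Collecting all violations instead of failing on the first one is a design choice of this sketch; it would let the resulting exception message list every breaking change at once.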
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_

