I found a similar compatibility problem with my closed PR. We should
not set the `Schema` field for the AUTO_CONSUME schema. More details
can be found in [1].

Instead, we can add an optional field to `CommandSubscribe` that
indicates whether the schema compatibility check should be performed.

```protobuf
optional bool check_schema_compatibility = 20 [default = true];
```

Then we can add a corresponding parameter here:

```java
CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema,
                                                         boolean checkSchemaCompatibility);
```
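
To make the intent concrete, here is a minimal sketch of how the broker side might honor the flag. It is only an illustration under stated assumptions: `TopicSketch` is a hypothetical stand-in (not Pulsar's `PersistentTopic`), a `String` stands in for `SchemaData`, and plain equality stands in for the real compatibility strategy. Because the proto default is `true`, requests from old clients that never set the field would still go through the check.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for a broker-side topic. It is NOT Pulsar's
// Topic/PersistentTopic; a String stands in for SchemaData.
final class TopicSketch {
    private String storedSchema; // null until a schema has been registered

    // Same shape as the proposed signature above.
    CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(String schema,
                                                             boolean checkSchemaCompatibility) {
        if (!checkSchemaCompatibility) {
            // Flag is false (e.g. an AUTO_CONSUME consumer): store nothing, check nothing.
            return CompletableFuture.completedFuture(null);
        }
        if (storedSchema == null) {
            storedSchema = schema; // idle topic: adopt the first schema
            return CompletableFuture.completedFuture(null);
        }
        if (storedSchema.equals(schema)) {
            return CompletableFuture.completedFuture(null);
        }
        // Equality is a placeholder for a real compatibility strategy.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Incompatible schema: " + schema));
        return failed;
    }

    String storedSchema() {
        return storedSchema;
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.addSchemaIfIdleOrCheckCompatible("AUTO_CONSUME", false).join();
        System.out.println("stored after AUTO_CONSUME subscribe: " + topic.storedSchema());
        topic.addSchemaIfIdleOrCheckCompatible("AVRO:User", true).join();
        System.out.println("stored after AVRO subscribe: " + topic.storedSchema());
    }
}
```

The point of the sketch is that an AUTO_CONSUME subscribe leaves the topic's schema state untouched, so a later producer or consumer with a real schema is not compared against a placeholder schema.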


[1] https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700

Thanks,
Yunze

On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu <y...@streamnative.io> wrote:
>
> Could you also update the PIP issue? This solution is totally
> different from your original proposal. Since it still introduces
> changes to `PulsarApi.proto`, it also requires a PIP (we can reuse
> this one).
>
> ----
>
> BTW, I tested carrying the SchemaInfo in the CommandSubscribe request
> again. It could break compatibility. Consider the following code run
> against Pulsar standalone 2.8.4:
>
> ```java
>         PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
>         Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
>                 .topic(topic)
>                 .subscriptionName("sub")
>                 .subscriptionType(SubscriptionType.Shared)
>                 .subscribe();
>         Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
>                 .topic(topic)
>                 .create();
> ```
>
> - If the schema type is 0 in CommandSubscribe, the NONE schema will be
> persisted and the producer will fail to create due to the schema
> compatibility check.
> - If the schema type is -3 (AUTO_CONSUME), it will fail at subscribe()
> with the following error:
>
> ```
> 23:49:10.978 [pulsar-io-18-13] WARN org.apache.pulsar.broker.service.ServerCnx - [/172.23.160.1:5921] Got exception java.lang.IllegalStateException: Some required fields are missing
>         at org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337)
>         at org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332)
>         at org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785)
>         at org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397)
> ```
>
> Thanks,
> Yunze
>
>
> On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu <liusinan1...@gmail.com> wrote:
> >
> > > I just implemented adding an optional field to the subscribe request, and
> > > compatibility seems to be fine. You guys can have a look at my PR
> > > (https://github.com/apache/pulsar/pull/17449).
> >
> > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Jan 4, 2023 at 21:31:
> >
> > > > Why can't we upload negative schema types?
> > >
> > > I want to avoid the changes to existing methods like
> > > Commands#getSchemaType, which converts all negative schema types to
> > > NONE:
> > >
> > > ```java
> > > private static Schema.Type getSchemaType(SchemaType type) {
> > >     if (type.getValue() < 0) {
> > >         return Schema.Type.None;
> > >     } else {
> > >         return Schema.Type.valueOf(type.getValue());
> > >     }
> > > }
> > > ```
> > >
> > > I guess the above code was written because:
> > > 1. NONE schema type means it's not uploaded into the registry. (See #3940
> > > [1])
> > > 2. There is no existing schema that uses NONE as its schema type, i.e.
> > > NONE schema is used as something special.
> > >
> > > > every different language client will code the special logic.
> > >
> > > > If other clients follow the behavior of the Java client, they should
> > > > also convert negative schema types to NONE currently. Therefore, changes
> > > > cannot be avoided. Regardless of whether the semantics of `setSchemaType`
> > > > are changed, they should follow the Java implementation as well.
> > >
> > > > This will change the meaning of the schema data field
> > >
> > > > The existing definition only defines its meaning for the AVRO and JSON
> > > > schemas. But from a more general view, the schema data should be
> > > > something associated with the current schema. Giving it more meaning
> > > > for other schema types is acceptable IMO. For example, the schema data
> > > > field represents the serialized Protobuf descriptor in the Protobuf Native
> > > > schema, see `ProtobufNativeSchema#of`:
> > >
> > > ```java
> > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
> > > ```
> > >
> > > [1] https://github.com/apache/pulsar/pull/3940
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <bog...@apache.org> wrote:
> > > >
> > > > > It does not affect the public API so it can be cherry-picked into old
> > > > > branches. The main difference with this proposal is that my solution
> > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the schema data,
> > > > > which is a byte array. The negative schema types should not be exposed
> > > > > to users. Adding a field to the subscribe request might be okay but it
> > > > > could be unnecessary to cover such a corner case.
> > > >
> > > > > This will change the meaning of the schema data field and couple the
> > > > > schema type and schema data. Using `schema type = NONE` and
> > > > > `schema data = "AUTO_CONSUME"` to represent `AUTO_CONSUME` is weird, I
> > > > > think. Why can't we upload negative schema types?
> > > >
> > > > > It does not affect the public API
> > > > > Uploading negative schema types only changes the proto. If we use
> > > > > `schema type = NONE` and `schema data = "AUTO_CONSUME"`, every
> > > > > language client will have to code the special logic. This special logic
> > > > > can easily be overlooked.
> > > >
> > > > Thanks,
> > > > Bo
> > > >
> > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Jan 4, 2023 at 17:02:
> > > > >
> > > > > > I opened a PR to fix this issue: https://github.com/apache/pulsar/pull/19128
> > > > >
> > > > > It does not affect the public API so it can be cherry-picked into old
> > > > > branches. The main difference with this proposal is that my solution
> > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the schema data,
> > > > > which is a byte array. The negative schema types should not be exposed
> > > > > to users. Adding a field to the subscribe request might be okay but it
> > > > > could be unnecessary to cover such a corner case.
> > > > >
> > > > > It might be controversial if schema data should be used in such a way,
> > > > > because the original purpose is to represent the AVRO or JSON
> > > > > definition. However, this semantic is defined just for AVRO or JSON
> > > > > schema. IMO, the data field of other schemas is never used well.
> > > > >
> > > > > Another solution is to make use of the name field of schema, which
> > > > > might be more natural. I think we can continue the discussion in my
> > > > > PR.
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu <y...@streamnative.io> wrote:
> > > > > >
> > > > > > Modifying the subscribe request is better than exposing AUTO_CONSUME
> > > > > > schema type IMO. The negative value of a schema type, like BYTES,
> > > > > > AUTO_PRODUCE, means this schema type should only be used internally.
> > > > > > Adding the negative enum value to the Schema definition in
> > > > > > PulsarApi.proto looks very weird.
> > > > > >
> > > > > > But I'm still wondering if we can avoid the API changes. I will look
> > > > > > deeper into this issue.
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > On Tue, Jan 3, 2023, 14:37 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Bo,
> > > > > > > >
> > > > > > > > > I got it now. The PIP title sounds ambiguous. Using the term "Upload
> > > > > > > > > xxx SchemaType" sounds like uploading the schema into the registry.
> > > > > > > > > Instead, it should be "carrying schema in the request when subscribing
> > > > > > > > > with AUTO_CONSUME schema".
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > I agree that we should change the naming, and we should probably not
> > > > > > > > use a new Schema type but add an optional field in the subscribe
> > > > > > > > request (and not send it if the broker is an old version).
> > > > > > >
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yunze
> > > > > > > >
> > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > Hi, Yunze
> > > > > > > > > > > What I am concerned about is that if the old clients with other
> > > > > > > > > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> > > > > > > > > > > the topic with AUTO_CONSUME schema, what will happen?
> > > > > > > > >
> > > > > > > > > > The AUTO_CONSUME schema will not be stored in `SchemaRegistryServiceImpl`;
> > > > > > > > > > it only represents a consumer that subscribes to a topic with the
> > > > > > > > > > AUTO_CONSUME schema. If old clients with other schemas subscribe to this
> > > > > > > > > > topic, their behavior will not be changed by this PIP.
> > > > > > > > >
> > > > > > > > > > > What's the schema compatibility check rule on a topic with AUTO_CONSUME schema?
> > > > > > > > > >
> > > > > > > > > > It's only a consumer schema compatibility check, not a check on the
> > > > > > > > > > topic. A consumer with the AUTO_CONSUME schema will not do any
> > > > > > > > > > compatibility check.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Bo
> > > > > > > > >
> > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Tue, Jan 3, 2023 at 10:16:
> > > > > > > > > >
> > > > > > > > > > > What I am concerned about is that if the old clients with other
> > > > > > > > > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> > > > > > > > > > > the topic with AUTO_CONSUME schema, what will happen? What's the
> > > > > > > > > > > schema compatibility check rule on a topic with AUTO_CONSUME schema?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yunze
> > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > 1. The values of Schema.Type and
> > > > > > > > > > > > org.apache.pulsar.common.schema.SchemaType should be the same.
> > > > > > > > > > > > 2. These changes do not affect producers; they only affect consumer
> > > > > > > > > > > > subscribe behavior.
> > > > > > > > > > > > 3. Backward compatibility:
> > > > > > > > > > > > (1) In org.apache.pulsar.broker.service.ServerCnx#handleSubscribe:
> > > > > > > > > > > >
> > > > > > > > > > > >     if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
> > > > > > > > > > > >         return topic.addSchemaIfIdleOrCheckCompatible(schema)
> > > > > > > > > > > >                 .thenCompose(v -> topic.subscribe(option));
> > > > > > > > > > > >     } else {
> > > > > > > > > > > >         return topic.subscribe(option);
> > > > > > > > > > > >     }
> > > > > > > > > > > >
> > > > > > > > > > > > For the older Pulsar client, the schema is null if an AUTO_CONSUME
> > > > > > > > > > > > consumer subscribes to the topic.
> > > > > > > > > > > > For the new Pulsar client, if an AUTO_CONSUME consumer subscribes to the
> > > > > > > > > > > > topic, the schema is not null and schema.getType() ==
> > > > > > > > > > > > SchemaType.AUTO_CONSUME.
> > > > > > > > > > > > Whether a new or an old Pulsar client consumes the topic, the broker
> > > > > > > > > > > > returns topic.subscribe(option).
> > > > > > > > > > >
> > > > > > > > > > > > (2) In org.apache.pulsar.broker.service.persistent.PersistentTopic
> > > > > > > > > > > > #addSchemaIfIdleOrCheckCompatible:
> > > > > > > > > > > >
> > > > > > > > > > > >     @Override
> > > > > > > > > > > >     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
> > > > > > > > > > > >         return hasSchema().thenCompose((hasSchema) -> {
> > > > > > > > > > > >             int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
> > > > > > > > > > > >                     .mapToInt(subscription -> subscription.getConsumers().stream()
> > > > > > > > > > > >                             .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
> > > > > > > > > > > >                             .toList().size())
> > > > > > > > > > > >                     .sum();
> > > > > > > > > > > >             if (hasSchema
> > > > > > > > > > > >                     || (!producers.isEmpty())
> > > > > > > > > > > >                     || (numActiveConsumersWithoutAutoSchema != 0)
> > > > > > > > > > > >                     || (ledger.getTotalSize() != 0)) {
> > > > > > > > > > > >                 return checkSchemaCompatibleForConsumer(schema);
> > > > > > > > > > > >             } else {
> > > > > > > > > > > >                 return addSchema(schema).thenCompose(schemaVersion ->
> > > > > > > > > > > >                         CompletableFuture.completedFuture(null));
> > > > > > > > > > > >             }
> > > > > > > > > > > >         });
> > > > > > > > > > > >     }
> > > > > > > > > > > >
> > > > > > > > > > > > There will be a bug in only one case.
> > > > > > > > > > > > First, an old Pulsar client whose consumer schema is AUTO_CONSUME
> > > > > > > > > > > > consumes the empty topic, and then a new or old Pulsar client with
> > > > > > > > > > > > another schema (i.e. the schema is AVRO) consumes the topic.
> > > > > > > > > > > > The broker will return the error message
> > > > > > > > > > > > IncompatibleSchemaException("Topic does not have a schema to check").
> > > > > > > > > > > > The bug in issue 17354 is not fixed in this case.
> > > > > > > > > > > > All the other cases will work normally.
> > > > > > > > > > >
> > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Sat, Dec 31, 2022 at 20:23:
> > > > > > > > > > >
> > > > > > > > > > > > > Defining `AutoConsume` as -3 is somewhat strange. Could you clarify
> > > > > > > > > > > > > if backward compatibility is guaranteed? i.e. if the new Pulsar
> > > > > > > > > > > > > client uploaded the AUTO_CONSUME schema to the broker, can the old
> > > > > > > > > > > > > Pulsar clients produce or consume the same topic anymore?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yunze
> > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 <liusinan1...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I made a PIP to discuss: https://github.com/apache/pulsar/issues/19113.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Sinan
> > > > > > > > > > > >
> > > > > > > >
> > >
