> This design also has serious compatibility problems between old and new
> pulsar clients and new and old brokers.

Could you please explain the compatibility issue in more detail, assuming
we leverage the protocol version?
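
For concreteness, a version gate of the kind discussed in this thread could
look roughly like the sketch below. It is only an illustration: the class
name, the method name, and the constant are hypothetical, and the threshold
21 is simply the value mentioned earlier in the thread, not a confirmed
protocol constant.

```java
public class ProtocolGateSketch {
    // Hypothetical threshold; 21 is the value floated earlier in this thread.
    static final int MIN_VERSION_FOR_AUTO_CONSUME_SCHEMA = 21;

    // Returns true if the client may attach the AUTO_CONSUME schema to its
    // subscribe request, given the protocol version the broker advertised.
    static boolean shouldSendAutoConsumeSchema(int brokerProtocolVersion) {
        return brokerProtocolVersion >= MIN_VERSION_FOR_AUTO_CONSUME_SCHEMA;
    }

    public static void main(String[] args) {
        System.out.println(shouldSendAutoConsumeSchema(20)); // false
        System.out.println(shouldSendAutoConsumeSchema(21)); // true
    }
}
```

With a gate like this, a new client talking to an old broker would keep the
old behavior and leave the schema out of the subscribe request.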

> We should not use a negative enum number in PulsarApi.proto. It's
> unnatural. If we decide to carry the AUTO_CONSUME schema in a
> CommandSubscribe, it should not be treated as a negative schema type.

IMO, the protocol is defined as an Enum. Users develop against the Enum
constants, not against the values of the Enum. We need to make sure the
value of each Enum constant is immutable, but it is not required to be a
positive number. Maybe it looks ugly.
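
To illustrate the point about developing against the constant rather than
its number, here is a minimal sketch. The enum below is a hypothetical
stand-in, not the generated protobuf class; only the names NONE and
AUTO_CONSUME and the values 0 and -3 are taken from this thread.

```java
public class NegativeEnumValueSketch {
    // Hypothetical stand-in for the schema type enum.
    public enum Type {
        NONE(0),
        AUTO_CONSUME(-3);

        private final int value;

        Type(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        // Map a wire value back to its constant; null means "unknown value".
        public static Type fromValue(int value) {
            for (Type t : values()) {
                if (t.value == value) {
                    return t;
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Callers compare constants and never look at the sign of the value.
        Type t = Type.fromValue(-3);
        System.out.println(t == Type.AUTO_CONSUME); // true
    }
}
```

The only thing that has to stay fixed is the mapping between a constant and
its number; whether that number is negative does not change caller code.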

And the protocol is just the API definition; it says nothing about which
schema will be persisted. As I understand the protocol definition, the
Schema in the subscribe command is used to pass the schema used by the
consumer. Until now we left it absent for the AUTO_CONSUME schema, because
we thought we could omit it when the consumer uses AUTO_CONSUME. But
apparently that is now a problem.

I think the easier-to-understand way is for the client to always set the
schema it uses when subscribing or creating the producer, rather than
having rules about which schemas need to be set and which do not.
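
As a rough sketch of that division of labor: the client always reports its
schema type, and the broker alone decides what to do with it. The types and
names below are simplified, hypothetical stand-ins; the branch condition
mirrors the `handleSubscribe` snippet quoted further down in this thread.

```java
public class SubscribeSchemaSketch {
    // Simplified, hypothetical stand-ins for the real types.
    public enum SchemaType { AVRO, JSON, AUTO_CONSUME }

    public enum Action { SUBSCRIBE_ONLY, CHECK_SCHEMA_THEN_SUBSCRIBE }

    // The client always reports the schema type it uses (null models an old
    // client that sent no schema); the broker alone decides what to do.
    public static Action onSubscribe(SchemaType type) {
        if (type != null && type != SchemaType.AUTO_CONSUME) {
            return Action.CHECK_SCHEMA_THEN_SUBSCRIBE;
        }
        return Action.SUBSCRIBE_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(onSubscribe(SchemaType.AVRO));         // CHECK_SCHEMA_THEN_SUBSCRIBE
        System.out.println(onSubscribe(SchemaType.AUTO_CONSUME)); // SUBSCRIBE_ONLY
        System.out.println(onSubscribe(null));                    // SUBSCRIBE_ONLY
    }
}
```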

Thanks,
Penghui

On Mon, Jan 9, 2023 at 11:32 AM SiNan Liu <liusinan1...@gmail.com> wrote:

> This design also has serious compatibility problems between old and new
> pulsar clients and new and old brokers.
>
>
> Thanks,
> Sinan
>
>
> PengHui Li <peng...@apache.org> wrote on Mon, Jan 9, 2023 at 9:51 AM:
>
> > Sorry for the late reply.
> >
> > We can leverage the `ProtocolVersion` [1] to handle the compatibility
> > issue. It looks like the client should subscribe with the AUTO_CONSUME
> > schema only if the protocol_version >= 21.
> >
> > IMO, both adding a new key-value field to the subscribe command and
> > using a specific representation are API changes. It's just that one
> > modifies the definition of the API and the other modifies the behavior
> > of the API.
> >
> > I prefer the intuitive way. From the perspective of API-based
> > developers, we should try to provide a simple and clear API with no
> > hidden rules. The client just uploads the schema that it has, except
> > for the byte[] schema. The broker knows how to handle the different
> > schemas, such as AUTO_PRODUCE and AUTO_CONSUME. It should not be the
> > burden of the client developer to learn the details of the schema
> > implementation. They should work according to the API spec.
> >
> > If we can resolve the compatibility issue of uploading the AUTO_CONSUME
> > schema with the subscribe command, do you see any apparent cons?
> >
> > Best,
> > Penghui
> >
> > [1]
> > https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L241-L266
> >
> > On Fri, Jan 6, 2023 at 10:31 PM SiNan Liu <liusinan1...@gmail.com> wrote:
> >
> > > Ok, I will update the PIP issue later.
> > >
> > > About my current design:
> > > When a consumer with the AUTO_CONSUME schema subscribes to an "empty"
> > > topic, the schemaInfo will be null.
> > > ```
> > > public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
> > >     SchemaVersion sv = getSchemaVersion(schemaVersion);
> > >     if (schemaMap.containsKey(sv)) {
> > >         return schemaMap.get(sv).getSchemaInfo();
> > >     }
> > >     return null;
> > >
> > > }
> > >
> > > ```
> > > checkSchemaCompatibility must be set in
> > > `org.apache.pulsar.common.protocol.Commands#newSubscribe`, so we need
> > > to know there that an AUTO_CONSUME consumer is subscribing. For that
> > > we should set a "*default*" schemaInfo (schemaType = AUTO_CONSUME) on
> > > AutoConsumeSchema, because schemaInfo is also null when `si.getType`
> > > is SchemaType.BYTES or SchemaType.NONE, so a null schemaInfo alone
> > > cannot identify the consumer. The most important thing is clearSchema,
> > > which ensures the wrong schema is not carried to the broker.
> > >
> > >
> > > Yunze Xu <y...@streamnative.io.invalid> wrote on Fri, Jan 6, 2023 at 12:57:
> > >
> > > > You only need to describe what's added to the PulsarApi.proto, i.e.
> > > > you don't need to paste all definitions of `CommandSubscribe` in the
> > > > proposal. Take PIP-54 [1] for example, it only pastes the new field
> > > > `ack_set` and does not paste the whole `MessageIdData` definition.
> > > >
> > > > The implementations section involves too much code and just looks
> > > > like an actual PR. Take PIP-194 [2] for example, you should only
> > > > talk about the implementations from a high level.
> > > >
> > > > Back to your current design: when the schema type is AUTO_CONSUME,
> > > > you clear the schema in CommandSubscribe. It seems that adding a
> > > > SchemaInfo to the AutoConsumeSchema is meaningless.
> > > >
> > > > [1] https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level#changes
> > > > [2] https://github.com/apache/pulsar/issues/16757
> > > >
> > > > On Fri, Jan 6, 2023 at 12:17 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > >
> > > > > I just updated the PIP issue and title; you guys can have a look:
> > > > > issue 19113 <https://github.com/apache/pulsar/issues/19113>.
> > > > > I added `check_schema_compatibility` in CommandSubscribe, and I
> > > > > also made many other changes.
> > > > >
> > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Thu, Jan 5, 2023 at 14:33:
> > > > >
> > > > > > It's not related to the schema itself. When an AUTO_CONSUME
> > > > > > consumer subscribes to a topic, the option tells the broker that
> > > > > > it's an AUTO_CONSUME consumer, so the broker should not treat it
> > > > > > as an active consumer when performing the schema compatibility
> > > > > > check. If another kind of consumer also wants to skip the schema
> > > > > > compatibility check in the future, this option can be reused.
> > > > > >
> > > > > > The other important reason is the breaking change caused by
> > > > > > carrying the schema info for an AUTO_CONSUME consumer (see my
> > > > > > explanations on GitHub and the mailing list). If an old version
> > > > > > broker serves such a consumer, the schema could be uploaded into
> > > > > > the registry and other clients would be affected. So we should
> > > > > > continue not carrying the schema info in CommandSubscribe for an
> > > > > > AUTO_CONSUME consumer.
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > On Thu, Jan 5, 2023 at 11:55 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > > >
> > > > > > > I modified the PIP issue and title last night. Yunze, do you
> > > > > > > mean that in PulsarApi.proto we should put
> > > > > > > `optional bool is_auto_consume_schema = 6 [default = false];`
> > > > > > > in CommandSubscribe instead of Schema? But shouldn't
> > > > > > > schema-related stuff be in Schema?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Sinan
> > > > > > >
> > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Thu, Jan 5, 2023 at 12:31 AM:
> > > > > > >
> > > > > > > > I found a similar compatibility problem with my closed PR.
> > > > > > > > We should not set the `Schema` field for the AUTO_CONSUME
> > > > > > > > schema. More explanations can be found here [1].
> > > > > > > >
> > > > > > > > Instead, we can add an optional field to CommandSubscribe to
> > > > > > > > indicate whether the schema compatibility check should be
> > > > > > > > performed.
> > > > > > > >
> > > > > > > > ```protobuf
> > > > > > > > optional bool check_schema_compatibility = 20 [default = true];
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Then we can add a related parameter here:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema,
> > > > > > > >         boolean checkSchemaCompatibility);
> > > > > > > > ```
> > > > > > > >
> > > > > > > >
> > > > > > > > [1] https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yunze
> > > > > > > >
> > > > > > > > On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu <y...@streamnative.io> wrote:
> > > > > > > > >
> > > > > > > > > Could you also update the PIP issue? This solution is
> > > > > > > > > totally different from your original proposal. Since it
> > > > > > > > > still introduces changes to `PulsarApi.proto`, it also
> > > > > > > > > requires a PIP (we can reuse this one).
> > > > > > > > >
> > > > > > > > > ----
> > > > > > > > >
> > > > > > > > > BTW, I tested carrying the SchemaInfo in the
> > > > > > > > > CommandSubscribe request again. It could break
> > > > > > > > > compatibility. Consider the following code run against
> > > > > > > > > Pulsar standalone 2.8.4:
> > > > > > > > >
> > > > > > > > > ```java
> > > > > > > > > PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
> > > > > > > > > // The AUTO_CONSUME consumer subscribes before any producer exists.
> > > > > > > > > Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
> > > > > > > > >         .topic(topic)
> > > > > > > > >         .subscriptionName("sub")
> > > > > > > > >         .subscriptionType(SubscriptionType.Shared)
> > > > > > > > >         .subscribe();
> > > > > > > > > // The AVRO producer is then created on the same topic.
> > > > > > > > > Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
> > > > > > > > >         .topic(topic)
> > > > > > > > >         .create();
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > - If the schema type is 0 in CommandSubscribe, the NONE
> > > > > > > > >   schema will be persisted and the producer will fail to
> > > > > > > > >   create due to the schema compatibility check.
> > > > > > > > > - If the schema type is -3 (AUTO_CONSUME), it will fail at
> > > > > > > > >   subscribe() with the following error:
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > 23:49:10.978 [pulsar-io-18-13] WARN org.apache.pulsar.broker.service.ServerCnx - [/172.23.160.1:5921] Got exception java.lang.IllegalStateException: Some required fields are missing
> > > > > > > > >         at org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337)
> > > > > > > > >         at org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332)
> > > > > > > > >         at org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785)
> > > > > > > > >         at org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397)
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yunze
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > I just implemented adding an optional field to the
> > > > > > > > > > subscribe request, and compatibility seems to be fine.
> > > > > > > > > > You guys can have a look at my PR
> > > > > > > > > > (https://github.com/apache/pulsar/pull/17449).
> > > > > > > > > >
> > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Jan 4, 2023 at 21:31:
> > > > > > > > > >
> > > > > > > > > > > > Why can't we upload negative schema types?
> > > > > > > > > > >
> > > > > > > > > > > I want to avoid the changes to existing methods like
> > > > > > > > > > > Commands#getSchemaType, which converts all negative
> > > > > > > > > > > schema types to NONE:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > private static Schema.Type getSchemaType(SchemaType type) {
> > > > > > > > > > >     if (type.getValue() < 0) {
> > > > > > > > > > >         return Schema.Type.None;
> > > > > > > > > > >     } else {
> > > > > > > > > > >         return Schema.Type.valueOf(type.getValue());
> > > > > > > > > > >     }
> > > > > > > > > > > }
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > I guess the above code was written because:
> > > > > > > > > > > 1. NONE schema type means it's not uploaded into the
> > > > > > > > > > >    registry. (See #3940 [1])
> > > > > > > > > > > 2. There is no existing schema that uses NONE as its
> > > > > > > > > > >    schema type, i.e. NONE schema is used as something
> > > > > > > > > > >    special.
> > > > > > > > > > >
> > > > > > > > > > > > every different language client will code the special logic.
> > > > > > > > > > >
> > > > > > > > > > > If other clients follow the behavior of the Java client,
> > > > > > > > > > > they should also convert negative schemas to NONE
> > > > > > > > > > > currently. Therefore, changes cannot be avoided. No
> > > > > > > > > > > matter whether the semantics of `setSchemaType` are
> > > > > > > > > > > changed, they should follow the Java implementation as
> > > > > > > > > > > well.
> > > > > > > > > > >
> > > > > > > > > > > > This will change the meaning of the schema data field
> > > > > > > > > > >
> > > > > > > > > > > The existing definition only defines its meaning for
> > > > > > > > > > > the AVRO and JSON schemas. But from a more general
> > > > > > > > > > > view, the schema data should be something associated
> > > > > > > > > > > with the current schema. Giving it more meaning for
> > > > > > > > > > > other schema types is acceptable IMO. For example, the
> > > > > > > > > > > schema data field represents the serialized Protobuf
> > > > > > > > > > > descriptor in the Protobuf Native schema, see
> > > > > > > > > > > `ProtobufNativeSchema#of`:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/3940
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yunze
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > It does not affect the public API so it can be
> > > > > > > > > > > > > cherry-picked into old branches. The main
> > > > > > > > > > > > > difference with this proposal is that my solution
> > > > > > > > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in
> > > > > > > > > > > > > the schema data, which is a byte array. The
> > > > > > > > > > > > > negative schema types should not be exposed to
> > > > > > > > > > > > > users. Adding a field to the subscribe request
> > > > > > > > > > > > > might be okay but it could be unnecessary to cover
> > > > > > > > > > > > > such a corner case.
> > > > > > > > > > > >
> > > > > > > > > > > > This will change the meaning of the schema data field
> > > > > > > > > > > > and couple the schema type with the schema data.
> > > > > > > > > > > > Using `schema type = NONE` plus
> > > > > > > > > > > > `schema data = "AUTO_CONSUME"` to represent
> > > > > > > > > > > > `AUTO_CONSUME` is weird, I think. Why can't we upload
> > > > > > > > > > > > negative schema types?
> > > > > > > > > > > >
> > > > > > > > > > > > > It does not affect the public API
> > > > > > > > > > > >
> > > > > > > > > > > > Uploading negative schema types only changes the
> > > > > > > > > > > > proto. If we use `schema type = NONE` and
> > > > > > > > > > > > `schema data = "AUTO_CONSUME"`, every language client
> > > > > > > > > > > > will have to code the special logic, and this special
> > > > > > > > > > > > logic can easily be overlooked.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Bo
> > > > > > > > > > > >
> > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Jan 4, 2023 at 17:02:
> > > > > > > > > > > > >
> > > > > > > > > > > > > I opened a PR to fix this issue:
> > > > > > > > > > > > > https://github.com/apache/pulsar/pull/19128
> > > > > > > > > > > > >
> > > > > > > > > > > > > It does not affect the public API so it can be
> > > > > > > > > > > > > cherry-picked into old branches. The main
> > > > > > > > > > > > > difference with this proposal is that my solution
> > > > > > > > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in
> > > > > > > > > > > > > the schema data, which is a byte array. The
> > > > > > > > > > > > > negative schema types should not be exposed to
> > > > > > > > > > > > > users. Adding a field to the subscribe request
> > > > > > > > > > > > > might be okay but it could be unnecessary to cover
> > > > > > > > > > > > > such a corner case.
> > > > > > > > > > > > >
> > > > > > > > > > > > > It might be controversial whether schema data
> > > > > > > > > > > > > should be used in such a way, because the original
> > > > > > > > > > > > > purpose is to represent the AVRO or JSON
> > > > > > > > > > > > > definition. However, this semantic is defined just
> > > > > > > > > > > > > for AVRO or JSON schema. IMO, the data field of
> > > > > > > > > > > > > other schemas is never used well.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Another solution is to make use of the name field
> > > > > > > > > > > > > of schema, which might be more natural. I think we
> > > > > > > > > > > > > can continue the discussion in my PR.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yunze
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu <y...@streamnative.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Modifying the subscribe request is better than
> > > > > > > > > > > > > > exposing the AUTO_CONSUME schema type IMO. A
> > > > > > > > > > > > > > negative schema type value, as used by BYTES and
> > > > > > > > > > > > > > AUTO_PRODUCE, means the schema type should only
> > > > > > > > > > > > > > be used internally. Adding the negative enum
> > > > > > > > > > > > > > value to the Schema definition in PulsarApi.proto
> > > > > > > > > > > > > > looks very weird.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > But I'm still wondering if we can avoid the API
> > > > > > > > > > > > > > changes. I will look deeper into this issue.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jan 3, 2023, 14:37 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Bo,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I got it now. The PIP title sounds ambiguous.
> > > > > > > > > > > > > > > > Using the term "Upload xxx SchemaType" sounds
> > > > > > > > > > > > > > > > like uploading the schema into the registry.
> > > > > > > > > > > > > > > > Instead, it should be "carrying schema in the
> > > > > > > > > > > > > > > > request when subscribing with AUTO_CONSUME
> > > > > > > > > > > > > > > > schema".
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I agree that we should change the naming, and
> > > > > > > > > > > > > > > we should probably not use a new Schema type
> > > > > > > > > > > > > > > but add an optional field to the subscribe
> > > > > > > > > > > > > > > request (and not send it if the broker is an
> > > > > > > > > > > > > > > old version).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Enrico
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi, Yunze
> > > > > > > > > > > > > > > > > > What I am concerned about is that if the
> > > > > > > > > > > > > > > > > > old clients with other schemas (i.e.
> > > > > > > > > > > > > > > > > > schema is neither null nor AUTO_CONSUME)
> > > > > > > > > > > > > > > > > > subscribe to the topic with AUTO_CONSUME
> > > > > > > > > > > > > > > > > > schema, what will happen?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The AUTO_CONSUME schema will not be stored in
> > > > > > > > > > > > > > > > > `SchemaRegistryServiceImpl`; it only indicates
> > > > > > > > > > > > > > > > > that a consumer with the AUTO_CONSUME schema
> > > > > > > > > > > > > > > > > subscribes to a topic. If old clients with
> > > > > > > > > > > > > > > > > other schemas subscribe to this topic, their
> > > > > > > > > > > > > > > > > behavior will not be changed by this PIP.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > What's the schema compatibility check rule
> > > > > > > > > > > > > > > > > > on a topic with AUTO_CONSUME schema?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It's only a consumer schema compatibility
> > > > > > > > > > > > > > > > > check, not a check on the topic. A consumer
> > > > > > > > > > > > > > > > > with the AUTO_CONSUME schema will not do any
> > > > > > > > > > > > > > > > > compatibility check.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Bo
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Tue, Jan 3, 2023 at 10:16:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > What I am concerned about is that if the
> > > > > > > > > > > > > > > > > > old clients with other schemas (i.e.
> > > > > > > > > > > > > > > > > > schema is neither null nor AUTO_CONSUME)
> > > > > > > > > > > > > > > > > > subscribe to the topic with AUTO_CONSUME
> > > > > > > > > > > > > > > > > > schema, what will happen? What's the
> > > > > > > > > > > > > > > > > > schema compatibility check rule on a topic
> > > > > > > > > > > > > > > > > > with AUTO_CONSUME schema?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > 1. The Schema.Type and
> > > > > > > > > > > > > > > > > > >    org.apache.pulsar.common.schema.SchemaType
> > > > > > > > > > > > > > > > > > >    values should be the same.
> > > > > > > > > > > > > > > > > > > 2. These changes do not affect producers;
> > > > > > > > > > > > > > > > > > >    they only affect consumer subscribe
> > > > > > > > > > > > > > > > > > >    behavior.
> > > > > > > > > > > > > > > > > > > 3. Backward compatibility:
> > > > > > > > > > > > > > > > > > > (1) In org.apache.pulsar.broker.service.ServerCnx#handleSubscribe:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >     if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
> > > > > > > > > > > > > > > > > > >         return topic.addSchemaIfIdleOrCheckCompatible(schema)
> > > > > > > > > > > > > > > > > > >                 .thenCompose(v -> topic.subscribe(option));
> > > > > > > > > > > > > > > > > > >     } else {
> > > > > > > > > > > > > > > > > > >         return topic.subscribe(option);
> > > > > > > > > > > > > > > > > > >     }
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > For the older pulsar client, the schema is
> > > > > > > > > > > > > > > > > > > null if an AUTO_CONSUME consumer subscribes
> > > > > > > > > > > > > > > > > > > to the topic.
> > > > > > > > > > > > > > > > > > > For the new pulsar client, if an
> > > > > > > > > > > > > > > > > > > AUTO_CONSUME consumer subscribes to the
> > > > > > > > > > > > > > > > > > > topic, the schema is not null and
> > > > > > > > > > > > > > > > > > > schema.getType() == SchemaType.AUTO_CONSUME.
> > > > > > > > > > > > > > > > > > > For both new and old pulsar clients
> > > > > > > > > > > > > > > > > > > consuming the topic, the broker will return
> > > > > > > > > > > > > > > > > > > topic.subscribe(option).
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > (2) In org.apache.pulsar.broker.service.persistent.PersistentTopic#addSchemaIfIdleOrCheckCompatible:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >     @Override
> > > > > > > > > > > > > > > > > > >     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
> > > > > > > > > > > > > > > > > > >         return hasSchema().thenCompose((hasSchema) -> {
> > > > > > > > > > > > > > > > > > >             int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
> > > > > > > > > > > > > > > > > > >                     .mapToInt(subscription -> subscription.getConsumers().stream()
> > > > > > > > > > > > > > > > > > >                             .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
> > > > > > > > > > > > > > > > > > >                             .toList().size())
> > > > > > > > > > > > > > > > > > >                     .sum();
> > > > > > > > > > > > > > > > > > >             if (hasSchema
> > > > > > > > > > > > > > > > > > >                     || (!producers.isEmpty())
> > > > > > > > > > > > > > > > > > >                     || (numActiveConsumersWithoutAutoSchema != 0)
> > > > > > > > > > > > > > > > > > >                     || (ledger.getTotalSize() != 0)) {
> > > > > > > > > > > > > > > > > > >                 return checkSchemaCompatibleForConsumer(schema);
> > > > > > > > > > > > > > > > > > >             } else {
> > > > > > > > > > > > > > > > > > >                 return addSchema(schema).thenCompose(schemaVersion ->
> > > > > > > > > > > > > > > > > > >                         CompletableFuture.completedFuture(null));
> > > > > > > > > > > > > > > > > > >             }
> > > > > > > > > > > > > > > > > > >         });
> > > > > > > > > > > > > > > > > > >     }
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Only in one case will there be a bug.
> > > > > > > > > > > > > > > > > > > First, an old pulsar client consumes the
> > > > > > > > > > > > > > > > > > > empty topic with the AUTO_CONSUME consumer
> > > > > > > > > > > > > > > > > > > schema, and then a new or old pulsar client
> > > > > > > > > > > > > > > > > > > consumes the topic with another schema
> > > > > > > > > > > > > > > > > > > (i.e. AVRO).
> > > > > > > > > > > > > > > > > > > The broker will return the error
> > > > > > > > > > > > > > > > > > > IncompatibleSchemaException("Topic does not
> > > > > > > > > > > > > > > > > > > have a schema to check"). The bug in issue
> > > > > > > > > > > > > > > > > > > 17354 is not fixed in this case.
> > > > > > > > > > > > > > > > > > > All the other cases will behave normally.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Sat, Dec 31, 2022 at 20:23:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Defining `AutoConsume` as -3 is somewhat
> > > > > > > > > > > > > > > > > > > > strange. Could you clarify whether
> > > > > > > > > > > > > > > > > > > > backward compatibility is guaranteed?
> > > > > > > > > > > > > > > > > > > > I.e., if the new Pulsar client uploaded
> > > > > > > > > > > > > > > > > > > > the AUTO_CONSUME schema to the broker,
> > > > > > > > > > > > > > > > > > > > can the old Pulsar clients still produce
> > > > > > > > > > > > > > > > > > > > to or consume from the same topic?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 <liusinan1...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I made a PIP to discuss:
> > > > > > > > > > > > > > > > > > > > > https://github.com/apache/pulsar/issues/19113.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > Sinan
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > >
> >
>
