I understand the proto version, and I updated the PIP issue, please check
it.

Thanks,
Sinan

PengHui Li <peng...@apache.org> 于2023年1月16日周一 12:08写道:

> > Is there any problem with using a positive value for it? I think there
> is no compatibility issue because the enum value is never used on the
> broker side. Making it positive makes AUTO_CONSUME different with
> other implicit schema types like BYTES, AUTO and AUTO_PUBLISH.
>
> That sounds good to me to use a positive number for `AUTO_CONSUME`
> in the protocol. Maybe 100 or something.
>
> Thanks,
> Penghui
>
> On Mon, Jan 16, 2023 at 10:38 AM Yunze Xu <y...@streamnative.io.invalid>
> wrote:
>
> > Is there any problem with using a positive value for it? I think there
> > is no compatibility issue because the enum value is never used on the
> > broker side. Making it positive makes AUTO_CONSUME different with
> > other implicit schema types like BYTES, AUTO and AUTO_PUBLISH.
> >
> > Thanks,
> > Yunze
> >
> > On Mon, Jan 16, 2023 at 9:36 AM PengHui Li <peng...@apache.org> wrote:
> > >
> > > > This design also has serious compatibility problems between old and
> new
> > > pulsar clients and new and old brokers.
> > >
> > > Could you please explain more details of the compatibility issue if we
> > > leverage
> > > the protocol version?
> > >
> > > > We should not use a negative enum number in PulsarApi.proto. It's
> > > unnatural. If we decide to carry the AUTO_CONSUME schema in a
> > > CommandSubscribe, it should not be treated as a negative schema type.
> > >
> > > IMO, the protocol is defined as Enum. Users are developing based on the
> > > Enum, not the value of the Enum. We need to make sure the value
> > > of the Enum is immutable. It is not required that he must be a positive
> > > number.
> > > Maybe it looks ugly.
> > >
> > > And the protocol is just the API definition, not about which schema
> will
> > be
> > > persistent.
> > > As I understand from the protocol definition, the Schema in the
> subscribe
> > > command is
> > > used to pass the used schema of the consumer. We just make it absent
> > before
> > > for
> > > AUTO_CONSUME schema. We just thought we could make it absent if the
> > consumer
> > > is using AUTO_CONSUME schema. But apparently, this is a problem for
> now.
> > >
> > > I think the easier-to-understand way is for the client to set the
> schema
> > > used when
> > > subscribing or creating the producer. Rather than which ones need to be
> > set
> > > and which
> > > ones do not need to be set.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Mon, Jan 9, 2023 at 11:32 AM SiNan Liu <liusinan1...@gmail.com>
> > wrote:
> > >
> > > > This design also has serious compatibility problems between old and
> new
> > > > pulsar clients and new and old brokers.
> > > >
> > > >
> > > > Thanks,
> > > > Sinan
> > > >
> > > >
> > > > PengHui Li <peng...@apache.org> 于 2023年1月9日周一 上午9:51写道:
> > > >
> > > > > Sorry for the late reply.
> > > > >
> > > > > We can leverage the `ProtocolVersion` [1] to handle the
> compatibility
> > > > > issue.
> > > > > It looks like only if the protocol_version >= 21, subscribe with
> the
> > > > > auto_consume schema
> > > > >
> > > > > IMO, both the new key-value of the subscribe command, and a
> specific
> > > > > representative are
> > > > > API changes. It's just that some have modified the definition of
> the
> > API,
> > > > > and some have modified the behavior of the API
> > > > >
> > > > > I prefer the intuitive way. And from the perspective of API-based
> > > > > developers, we should
> > > > > try to provide a simple and clear API with no hidden rules. The
> > client
> > > > just
> > > > > uploads the schema
> > > > > that it has except the byte[] schema. The broker knows how to
> handle
> > the
> > > > > different schemas,
> > > > > such as AUTO_PRODUCE, and AUTO_CONSUME. And this should not be the
> > burden
> > > > > of the
> > > > > client developer to learn the details of the schema implementation.
> > They
> > > > > should work according
> > > > > to the API spec.
> > > > >
> > > > > If we can resolve the compatibility issue with uploading the
> > AUTO_CONSUME
> > > > > schema with
> > > > > subscribe command, do you see any apparent cons?
> > > > >
> > > > > Best,
> > > > > Penghui
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> >
> https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L241-L266
> > > > >
> > > > > On Fri, Jan 6, 2023 at 10:31 PM SiNan Liu <liusinan1...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Ok, I will update the PIP issue later.
> > > > > >
> > > > > > About my current design.
> > > > > > When the consumer with AUTO_CONSUME schema subscribed to an
> "empty"
> > > > > topic,
> > > > > > the schemaInfo will be null.
> > > > > > ```
> > > > > > public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
> > > > > >     SchemaVersion sv = getSchemaVersion(schemaVersion);
> > > > > >     if (schemaMap.containsKey(sv)) {
> > > > > >         return schemaMap.get(sv).getSchemaInfo();
> > > > > >     }
> > > > > >     return null;
> > > > > >
> > > > > > }
> > > > > >
> > > > > > ```
> > > > > > And checkSchemaCompatibility must be set in
> > > > > > `org.apache.pulsar.common.protocol.Commands#newSubscribe`,
> > > > > > and we need to know that this is an AUTO_CONSUME consumer
> > subscribing.
> > > > So
> > > > > > we should set a "*default*" schemaInfo(schemaType = AUTO_CONSUME)
> > for
> > > > > > AutoConsumeSchema,
> > > > > > this is because schemaInfo is also null when `si.getType` is
> > > > > > SchemaType.BYTES or SchemaType.NONE.
> > > > > > And checkSchemaCompatibility can be set in
> > > > > > `org.apache.pulsar.common.protocol.Commands#newSubscribe`. The
> most
> > > > > > important thing is clearSchema, which does not carry the wrong
> > schema
> > > > to
> > > > > > the broker.
> > > > > >
> > > > > >
> > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月6日周五 12:57写道:
> > > > > >
> > > > > > > You only need to describe what's added to the PulsarApi.proto,
> > i.e.
> > > > > > > you don't need to paste all definitions of `CommandSubscribe`
> in
> > the
> > > > > > > proposal. Take PIP-54 [1] for example, it only pastes the new
> > field
> > > > > > > `ack_set` and does not paste the whole `MessageIdData`
> > definition.
> > > > > > >
> > > > > > > The implementations section involves too much code and just
> looks
> > > > like
> > > > > > > an actual PR. Take PIP-194 [2] for example, you should only
> talk
> > > > about
> > > > > > > the implementations from a high level.
> > > > > > >
> > > > > > > Let's talk back to your current design, when the schema type is
> > > > > > > AUTO_CONSUME, you clear the schema in CommandSubscribe. It
> seems
> > that
> > > > > > > adding a SchemaInfo to the AutoConsumeSchema is meaningless.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level#changes
> > > > > > > [2] https://github.com/apache/pulsar/issues/16757
> > > > > > >
> > > > > > > On Fri, Jan 6, 2023 at 12:17 AM SiNan Liu <
> > liusinan1...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > I just updated the PIP issue and title, you guys can have a
> > look.
> > > > > > > issue19113
> > > > > > > > <https://github.com/apache/pulsar/issues/19113>
> > > > > > > > I added `check_schema_compatibility` in CommandSubscribe,
> and I
> > > > also
> > > > > > made
> > > > > > > > many other changes.
> > > > > > > >
> > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月5日周四
> 14:33写道:
> > > > > > > >
> > > > > > > > > It's not related to the schema itself. When an AUTO_CONSUME
> > > > > consumer
> > > > > > > > > subscribes to a topic, the option tells the broker that
> it's
> > an
> > > > > > > > > AUTO_CONSUME consumer so that the broker should not treat
> it
> > as
> > > > an
> > > > > > > > > active consumer when performing schema compatibility check.
> > If
> > > > > there
> > > > > > > > > is a consumer that also wants to ignore the schema
> > compatibility
> > > > > > check
> > > > > > > > > in future, this option can be reused.
> > > > > > > > >
> > > > > > > > > The other important reason is the breaking change by
> > carrying the
> > > > > > > > > schema info on an AUTO_CONSUMER consumer. (See my
> > explanations in
> > > > > > > > > GitHub and the mail list) If the consumer serves an old
> > version
> > > > > > > > > consumer, the schema could be uploaded into the registry
> and
> > > > other
> > > > > > > > > clients would be affected. So we should keep not carrying
> the
> > > > > schema
> > > > > > > > > info in CommandSubscribe for an AUTO_CONSUMER consumer.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yunze
> > > > > > > > >
> > > > > > > > > On Thu, Jan 5, 2023 at 11:55 AM SiNan Liu <
> > > > liusinan1...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > I have modified pip issue and title last night. Yunze.
> You
> > mean
> > > > > > that
> > > > > > > in
> > > > > > > > > > PulsarApi.proto, take `optional bool
> > is_auto_consume_schema = 6
> > > > > > > [default
> > > > > > > > > =
> > > > > > > > > > false]; ` in CommandSubscribe instead of Schema? But
> > shouldn't
> > > > > > > > > > schema-related stuff be in Schema?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Sinan
> > > > > > > > > >
> > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于 2023年1月5日周四
> > > > 上午12:31写道:
> > > > > > > > > >
> > > > > > > > > > > I found a similar compatibility problem with my closed
> > PR. We
> > > > > > > should
> > > > > > > > > > > not set the `Schema` field for AUTO_CONSUME schema.
> More
> > > > > > > explanations
> > > > > > > > > > > can be found here [1].
> > > > > > > > > > >
> > > > > > > > > > > Instead, we can add an optional field into
> > CommandSubscribe
> > > > to
> > > > > > > > > > > indicate the schema compatibility check is skipped.
> > > > > > > > > > >
> > > > > > > > > > > ```protobuf
> > > > > > > > > > > optional bool check_schema_compatibility = 20 [default
> =
> > > > true]
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > Then we can add a relative parameter here:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > CompletableFuture<Void>
> > > > > > addSchemaIfIdleOrCheckCompatible(SchemaData
> > > > > > > > > > > schema, boolean checkSchemaCompatibility);
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > >
> > > > >
> https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yunze
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu <
> > > > y...@streamnative.io
> > > > > >
> > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Could you also update the PIP issue? This solution is
> > > > totally
> > > > > > > > > > > > different from your original proposal. Since it still
> > > > > > introduces
> > > > > > > > > > > > changes to `PulsarApi.proto`, it also requires a PIP
> > (we
> > > > can
> > > > > > > reuse
> > > > > > > > > > > > this one).
> > > > > > > > > > > >
> > > > > > > > > > > > ----
> > > > > > > > > > > >
> > > > > > > > > > > > BTW, I tested again about carrying the SchemaInfo in
> > the
> > > > > > > > > > > > CommandSubscribe request. It could break
> compatibility.
> > > > Given
> > > > > > the
> > > > > > > > > > > > following code run against Pulsar standalone 2.8.4:
> > > > > > > > > > > >
> > > > > > > > > > > > ```java
> > > > > > > > > > > >         PulsarClient client =
> > > > > > > > > > > >
> PulsarClient.builder().serviceUrl(serviceUrl).build();
> > > > > > > > > > > >         Consumer<GenericRecord> consumer =
> > > > > > > > > > > > client.newConsumer(Schema.AUTO_CONSUME())
> > > > > > > > > > > >                 .topic(topic)
> > > > > > > > > > > >                 .subscriptionName("sub")
> > > > > > > > > > > >
> >  .subscriptionType(SubscriptionType.Shared)
> > > > > > > > > > > >                 .subscribe();
> > > > > > > > > > > >         Producer<User> producer =
> > > > > > > > > > > client.newProducer(Schema.AVRO(User.class))
> > > > > > > > > > > >                 .topic(topic)
> > > > > > > > > > > >                 .create();
> > > > > > > > > > > > ```
> > > > > > > > > > > >
> > > > > > > > > > > > - If the schema type is 0 in CommandSubscribe, the
> NONE
> > > > > schema
> > > > > > > will
> > > > > > > > > be
> > > > > > > > > > > > persisted and the producer will fail to create due to
> > the
> > > > > > schema
> > > > > > > > > > > > compatibility check.
> > > > > > > > > > > > - If the schema type is -3 (AUTO_CONSUME), it will
> > fail at
> > > > > > > > > subscribe()
> > > > > > > > > > > > with the following error:
> > > > > > > > > > > >
> > > > > > > > > > > > ```
> > > > > > > > > > > > 23:49:10.978 [pulsar-io-18-13] WARN
> > > > > > > > > > > > org.apache.pulsar.broker.service.ServerCnx - [/
> > > > > > 172.23.160.1:5921
> > > > > > > ]
> > > > > > > > > Got
> > > > > > > > > > > > exception java.lang.IllegalStateException: Some
> > required
> > > > > fields
> > > > > > > are
> > > > > > > > > > > > missing
> > > > > > > > > > > >         at
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337)
> > > > > > > > > > > >         at
> > > > > > > > > > >
> > > > > > >
> > org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332)
> > > > > > > > > > > >         at
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785)
> > > > > > > > > > > >         at
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397)
> > > > > > > > > > > > ```
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yunze
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu <
> > > > > > > liusinan1...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > I just implemented add an optional field in the
> > subscribe
> > > > > > > request
> > > > > > > > > and
> > > > > > > > > > > > > compatibility seems to be fine. You guys can have a
> > look
> > > > at
> > > > > > my
> > > > > > > PR (
> > > > > > > > > > > > > https://github.com/apache/pulsar/pull/17449).
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid>
> 于2023年1月4日周三
> > > > > > 21:31写道:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > Why can't we upload negative schema types?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I want to avoid the changes to existing methods
> > like
> > > > > > > > > > > > > > Commands#getSchemaType, which converts all
> negative
> > > > > schema
> > > > > > > types
> > > > > > > > > to
> > > > > > > > > > > > > > NONE:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ```java
> > > > > > > > > > > > > > private static Schema.Type
> getSchemaType(SchemaType
> > > > > type) {
> > > > > > > > > > > > > >     if (type.getValue() < 0) {
> > > > > > > > > > > > > >         return Schema.Type.None;
> > > > > > > > > > > > > >     } else {
> > > > > > > > > > > > > >         return
> > Schema.Type.valueOf(type.getValue());
> > > > > > > > > > > > > >     }
> > > > > > > > > > > > > > }
> > > > > > > > > > > > > > ```
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I guess the above code was written because:
> > > > > > > > > > > > > > 1. NONE schema type means it's not uploaded into
> > the
> > > > > > > registry.
> > > > > > > > > (See
> > > > > > > > > > > #3940
> > > > > > > > > > > > > > [1])
> > > > > > > > > > > > > > 2. There is no existing schema that uses NONE as
> > its
> > > > > schema
> > > > > > > type,
> > > > > > > > > > > i.e.
> > > > > > > > > > > > > > NONE schema is used as something special.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > every different language client will code the
> > special
> > > > > > > logic.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > If other clients follow the behavior of the Java
> > > > client,
> > > > > > they
> > > > > > > > > should
> > > > > > > > > > > > > > also convert negative schemas to NONE currently.
> > > > > Therefore,
> > > > > > > > > changes
> > > > > > > > > > > > > > cannot be avoided. No matter if the semantic of
> > > > > > > `setSchemaType`
> > > > > > > > > is
> > > > > > > > > > > > > > changed, they should follow the Java
> > implementation as
> > > > > > well.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This will change the meaning of the schema data
> > field
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The existing definition only defines its meaning
> > to the
> > > > > > AVRO
> > > > > > > and
> > > > > > > > > JSON
> > > > > > > > > > > > > > schema. But from a more general view, the schema
> > data
> > > > > > should
> > > > > > > be
> > > > > > > > > > > > > > something associated with the current schema.
> > Giving it
> > > > > > more
> > > > > > > > > meaning
> > > > > > > > > > > > > > for other schema types is acceptable IMO. For
> > example,
> > > > > the
> > > > > > > schema
> > > > > > > > > > > data
> > > > > > > > > > > > > > field represents the serialized Protobuf
> > descriptor in
> > > > > > > Protobuf
> > > > > > > > > > > Native
> > > > > > > > > > > > > > schema, see `ProtobufNativeSchema#of`:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ```java
> > > > > > > > > > > > > >
> > > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
> > > > > > > > > > > > > > ```
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/3940
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <
> > bog...@apache.org>
> > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It does not affect the public API so it can
> be
> > > > > > > cherry-picked
> > > > > > > > > > > into old
> > > > > > > > > > > > > > > > branches. The main difference with this
> > proposal is
> > > > > > that
> > > > > > > my
> > > > > > > > > > > solution
> > > > > > > > > > > > > > > > carries the identity info (i.e.
> > `AUTO_CONSUME`) in
> > > > > the
> > > > > > > schema
> > > > > > > > > > > data,
> > > > > > > > > > > > > > > > which is a byte array. The negative schema
> > types
> > > > > should
> > > > > > > not
> > > > > > > > > be
> > > > > > > > > > > exposed
> > > > > > > > > > > > > > > > to users. Adding a field to the subscribe
> > request
> > > > > might
> > > > > > > be
> > > > > > > > > okay
> > > > > > > > > > > but it
> > > > > > > > > > > > > > > > could be unnecessary to cover such a corner
> > case.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This will change the meaning of the schema data
> > field
> > > > > and
> > > > > > > > > couple
> > > > > > > > > > > the
> > > > > > > > > > > > > > > schema type and schema data. `schema type =
> > NONE` and
> > > > > > > `schema
> > > > > > > > > data
> > > > > > > > > > > =
> > > > > > > > > > > > > > > "AUTO_CONSUME" ` represent `AUTO_ CONSUME`, I
> > think
> > > > > it's
> > > > > > > > > weird. Why
> > > > > > > > > > > > > > > can't we upload negative schema types?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It does not affect the public API
> > > > > > > > > > > > > > > upload negative schema types only changes the
> > proto,
> > > > if
> > > > > > > using
> > > > > > > > > > > `schema
> > > > > > > > > > > > > > > type = NONE` and `schema data = "AUTO_CONSUME"
> `,
> > > > every
> > > > > > > > > different
> > > > > > > > > > > > > > > language client will code the special logic.
> This
> > > > > special
> > > > > > > > > logic can
> > > > > > > > > > > > > > > easily be ignored.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Bo
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid>
> > 于2023年1月4日周三
> > > > > > > 17:02写道:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I opened a PR to fix this issue:
> > > > > > > > > > > > > > https://github.com/apache/pulsar/pull/19128
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It does not affect the public API so it can
> be
> > > > > > > cherry-picked
> > > > > > > > > > > into old
> > > > > > > > > > > > > > > > branches. The main difference with this
> > proposal is
> > > > > > that
> > > > > > > my
> > > > > > > > > > > solution
> > > > > > > > > > > > > > > > carries the identity info (i.e.
> > `AUTO_CONSUME`) in
> > > > > the
> > > > > > > schema
> > > > > > > > > > > data,
> > > > > > > > > > > > > > > > which is a byte array. The negative schema
> > types
> > > > > should
> > > > > > > not
> > > > > > > > > be
> > > > > > > > > > > exposed
> > > > > > > > > > > > > > > > to users. Adding a field to the subscribe
> > request
> > > > > might
> > > > > > > be
> > > > > > > > > okay
> > > > > > > > > > > but it
> > > > > > > > > > > > > > > > could be unnecessary to cover such a corner
> > case.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It might be controversial if schema data
> > should be
> > > > > used
> > > > > > > in
> > > > > > > > > such
> > > > > > > > > > > a way,
> > > > > > > > > > > > > > > > because the original purpose is to represent
> > the
> > > > AVRO
> > > > > > or
> > > > > > > JSON
> > > > > > > > > > > > > > > > definition. However, this semantic is defined
> > just
> > > > > for
> > > > > > > AVRO
> > > > > > > > > or
> > > > > > > > > > > JSON
> > > > > > > > > > > > > > > > schema. IMO, the data field of other schemas
> is
> > > > never
> > > > > > > used
> > > > > > > > > well.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Another solution is to make use of the name
> > field
> > > > of
> > > > > > > schema,
> > > > > > > > > > > which
> > > > > > > > > > > > > > > > might be more natural. I think we can
> continue
> > the
> > > > > > > > > discussion in
> > > > > > > > > > > my
> > > > > > > > > > > > > > > > PR.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu <
> > > > > > > > > y...@streamnative.io>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Modifying the subscribe request is better
> > than
> > > > > > exposing
> > > > > > > > > > > AUTO_CONSUME
> > > > > > > > > > > > > > > > > schema type IMO. The negative value of a
> > schema
> > > > > type,
> > > > > > > like
> > > > > > > > > > > BYTES,
> > > > > > > > > > > > > > > > > AUTO_PRODUCE, means this schema type should
> > only
> > > > be
> > > > > > > used
> > > > > > > > > > > internally.
> > > > > > > > > > > > > > > > > Adding the negative enum value to the
> Schema
> > > > > > > definition in
> > > > > > > > > > > > > > > > > PulsarApi.proto looks very weird.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > But I'm still wondering if we can avoid the
> > API
> > > > > > > changes. I
> > > > > > > > > > > will look
> > > > > > > > > > > > > > > > > deeper into this issue.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico
> > Olivelli <
> > > > > > > > > > > eolive...@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Il Mar 3 Gen 2023, 14:37 Yunze Xu
> > > > > > > > > > > <y...@streamnative.io.invalid>
> > > > > > > > > > > > > > ha scritto:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Bo,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I got it now. The PIP title sounds
> > ambiguous.
> > > > > > > Using the
> > > > > > > > > > > term
> > > > > > > > > > > > > > "Upload
> > > > > > > > > > > > > > > > > > > xxx SchemaType" sounds like uploading
> the
> > > > > schema
> > > > > > > into
> > > > > > > > > the
> > > > > > > > > > > > > > registry.
> > > > > > > > > > > > > > > > > > > Instead, it should be "carrying schema
> > in the
> > > > > > > request
> > > > > > > > > when
> > > > > > > > > > > > > > subscribing
> > > > > > > > > > > > > > > > > > > with AUTO_CONSUME schema".
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I agree that we should change the naming
> > and we
> > > > > > > should
> > > > > > > > > > > probably
> > > > > > > > > > > > > > not use a
> > > > > > > > > > > > > > > > > > new Schema type but add an optional field
> > in
> > > > the
> > > > > > > > > subscribe
> > > > > > > > > > > request
> > > > > > > > > > > > > > (and do
> > > > > > > > > > > > > > > > > > not send it if the broker is an old
> > version)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Enrico
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 <
> > > > > > > bog...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi, Yunze
> > > > > > > > > > > > > > > > > > > > > What I am concerned about is that
> if
> > the
> > > > > old
> > > > > > > > > clients
> > > > > > > > > > > with
> > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > > > > > > schemas (i.e. schema is neither
> null
> > nor
> > > > > > > > > AUTO_CONSUME)
> > > > > > > > > > > > > > subscribe to
> > > > > > > > > > > > > > > > > > > > > the topic with AUTO_CONSUME schema,
> > what
> > > > > will
> > > > > > > > > happen?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > AUTO_CONSUME schema will not store in
> > > > > > > > > > > > > > `SchemaRegistryServiceImpl`, it
> > > > > > > > > > > > > > > > > > > > only represents one consumer with
> > > > > AUTO_CONSUME
> > > > > > > > > schema to
> > > > > > > > > > > > > > subscribe to
> > > > > > > > > > > > > > > > > > > > a topic. If old clients with other
> > schemas
> > > > > > > subscribe
> > > > > > > > > to
> > > > > > > > > > > this
> > > > > > > > > > > > > > topic,
> > > > > > > > > > > > > > > > > > > > Its behavior will not be changed by
> > this
> > > > PIP.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > What's the schema compatibility
> check
> > > > rule
> > > > > > on a
> > > > > > > > > topic
> > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > AUTO_CONSUME schema?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > it's only the consumer schema
> > compatibility
> > > > > > > check,
> > > > > > > > > not on
> > > > > > > > > > > > > > topic. if a
> > > > > > > > > > > > > > > > > > > > consume with AUTO_CONSUME schema will
> > do
> > > > any
> > > > > > > > > > > compatibility
> > > > > > > > > > > > > > check
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Bo
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Yunze Xu
> <y...@streamnative.io.invalid
> > >
> > > > > > > 于2023年1月3日周二
> > > > > > > > > > > 10:16写道:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > What I am concerned about is that
> if
> > the
> > > > > old
> > > > > > > > > clients
> > > > > > > > > > > with
> > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > > > > > > schemas (i.e. schema is neither
> null
> > nor
> > > > > > > > > AUTO_CONSUME)
> > > > > > > > > > > > > > subscribe to
> > > > > > > > > > > > > > > > > > > > > the topic with AUTO_CONSUME schema,
> > what
> > > > > will
> > > > > > > > > happen?
> > > > > > > > > > > What's
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > schema compatibility check rule on
> a
> > > > topic
> > > > > > with
> > > > > > > > > > > AUTO_CONSUME
> > > > > > > > > > > > > > schema?
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM
> SiNan
> > > > Liu <
> > > > > > > > > > > > > > liusinan1...@gmail.com>
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > 1.Schema.Type and
> > > > > > > > > > > > > > org.apache.pulsar.common.schema.SchemaType value
> > > > > > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > > > > > be the same.
> > > > > > > > > > > > > > > > > > > > > > 2.These changes do not affect
> > produce
> > > > and
> > > > > > are
> > > > > > > > > only
> > > > > > > > > > > affect
> > > > > > > > > > > > > > consumer
> > > > > > > > > > > > > > > > > > > > > > subscribe behavior.
> > > > > > > > > > > > > > > > > > > > > > 3.backward compatibility:
> > > > > > > > > > > > > > > > > > > > > > (1)In
> > > > > > > > > > > > > >
> > > > > org.apache.pulsar.broker.service.ServerCnx#handleSubscribe.
> > > > > > > > > > > > > > > > > > > > > > if (schema != null &&
> > schema.getType()
> > > > !=
> > > > > > > > > > > > > > SchemaType.AUTO_CONSUME) {
> > > > > > > > > > > > > > > > > > > > > > return
> > > > > > > > > topic.addSchemaIfIdleOrCheckCompatible(schema)
> > > > > > > > > > > > > > > > > > > > > > .thenCompose(v ->
> > > > > topic.subscribe(option));
> > > > > > > > > > > > > > > > > > > > > > } else {
> > > > > > > > > > > > > > > > > > > > > > return topic.subscribe(option);
> > > > > > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > > > > > For the older pulsar client, the
> > schema
> > > > > is
> > > > > > > null
> > > > > > > > > if
> > > > > > > > > > > > > > AUTO_CONSUME
> > > > > > > > > > > > > > > > > > > consumer
> > > > > > > > > > > > > > > > > > > > > > subscribe to the Topic.
> > > > > > > > > > > > > > > > > > > > > > For the new pulsar client, if
> > > > > AUTO_CONSUME
> > > > > > > > > consumer
> > > > > > > > > > > > > > subscribe the
> > > > > > > > > > > > > > > > > > > Topic,
> > > > > > > > > > > > > > > > > > > > > > then schema is not null and
> > > > > > schema.getType()
> > > > > > > =
> > > > > > > > > > > > > > > > > > > SchemaType.AUTO_CONSUME.
> > > > > > > > > > > > > > > > > > > > > > Both new and old pulsar clients
> > consume
> > > > > the
> > > > > > > > > topic,
> > > > > > > > > > > will
> > > > > > > > > > > > > > return topic.
> > > > > > > > > > > > > > > > > > > > > > subscribe(option).
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > (2)In
> > > > > > > > > > > > > >
> > > > > org.apache.pulsar.broker.service.persistent.PersistentTopic
> > > > > > > > > > > > > > > > > > > > > >
> #addSchemaIfIdleOrCheckCompatible.
> > > > > > > > > > > > > > > > > > > > > > @Override
> > > > > > > > > > > > > > > > > > > > > > public CompletableFuture<Void>
> > > > > > > > > > > > > > > > > > >
> > addSchemaIfIdleOrCheckCompatible(SchemaData
> > > > > > > > > > > > > > > > > > > > > > schema) {
> > > > > > > > > > > > > > > > > > > > > > return
> > > > > hasSchema().thenCompose((hasSchema)
> > > > > > > -> {
> > > > > > > > > > > > > > > > > > > > > > int
> > > > numActiveConsumersWithoutAutoSchema =
> > > > > > > > > > > > > > > > > > > subscriptions.values().stream()
> > > > > > > > > > > > > > > > > > > > > > .mapToInt(subscription ->
> > > > > > > > > > > > > > subscription.getConsumers().stream()
> > > > > > > > > > > > > > > > > > > > > > .filter(consumer ->
> > > > > > consumer.getSchemaType()
> > > > > > > !=
> > > > > > > > > > > > > > > > > > > SchemaType.AUTO_CONSUME)
> > > > > > > > > > > > > > > > > > > > > > .toList().size())
> > > > > > > > > > > > > > > > > > > > > > .sum();
> > > > > > > > > > > > > > > > > > > > > > if (hasSchema
> > > > > > > > > > > > > > > > > > > > > > || (!producers.isEmpty())
> > > > > > > > > > > > > > > > > > > > > > ||
> > (numActiveConsumersWithoutAutoSchema
> > > > > !=
> > > > > > 0)
> > > > > > > > > > > > > > > > > > > > > > || (ledger.getTotalSize() != 0))
> {
> > > > > > > > > > > > > > > > > > > > > > return
> > > > > > > checkSchemaCompatibleForConsumer(schema);
> > > > > > > > > > > > > > > > > > > > > > } else {
> > > > > > > > > > > > > > > > > > > > > > return
> > > > > > > > > addSchema(schema).thenCompose(schemaVersion ->
> > > > > > > > > > > > > > > > > > > > > >
> > > > CompletableFuture.completedFuture(null));
> > > > > > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > > > > > });
> > > > > > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > > > > > Only in one case will there be a
> > bug.
> > > > > > > > > > > > > > > > > > > > > > First, the old pulsar client
> > consume
> > > > the
> > > > > > > empty
> > > > > > > > > > > topic, the
> > > > > > > > > > > > > > consumer
> > > > > > > > > > > > > > > > > > > schema
> > > > > > > > > > > > > > > > > > > > > > is AUTO_CONSUME, and then whether
> > the
> > > > new
> > > > > > or
> > > > > > > old
> > > > > > > > > > > pulsar
> > > > > > > > > > > > > > client
> > > > > > > > > > > > > > > > > > > consume(i.e.
> > > > > > > > > > > > > > > > > > > > > > schema is AVRO) the topic.
> > > > > > > > > > > > > > > > > > > > > > The broker will return the error
> > > > message
> > > > > as
> > > > > > > > > > > > > > > > > > > IncompatibleSchemaException ("
> > > > > > > > > > > > > > > > > > > > > > Topic does not have a schema to
> > check
> > > > ").
> > > > > > The
> > > > > > > > > bug at
> > > > > > > > > > > > > > issue17354 is
> > > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > > > > fixed in this case.
> > > > > > > > > > > > > > > > > > > > > > All the other cases will be
> normal.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Yunze Xu
> > <y...@streamnative.io.invalid
> > > > >
> > > > > > > > > > > 于2022年12月31日周六
> > > > > > > > > > > > > > 20:23写道:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Defining `AutoConsume` as -3 is
> > > > somehow
> > > > > > > > > strange.
> > > > > > > > > > > Could
> > > > > > > > > > > > > > you clarify
> > > > > > > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > > > > > > > > > backward compatibility is
> > guaranteed?
> > > > > > i.e.
> > > > > > > if
> > > > > > > > > the
> > > > > > > > > > > new
> > > > > > > > > > > > > > Pulsar client
> > > > > > > > > > > > > > > > > > > > > > > uploaded the AUTO_CONSUME
> schema
> > to
> > > > the
> > > > > > > broker,
> > > > > > > > > > > can the
> > > > > > > > > > > > > > old Pulsar
> > > > > > > > > > > > > > > > > > > > > > > clients produce or consume the
> > same
> > > > > topic
> > > > > > > > > anymore?
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32
> PM
> > 思楠刘
> > > > <
> > > > > > > > > > > > > > liusinan1...@gmail.com>
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I made a PIP to discuss:
> > > > > > > > > > > > > > > > > > >
> > > > https://github.com/apache/pulsar/issues/19113.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > Sinan
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>

Reply via email to