I have pushed out a PR to revert this change first.

https://github.com/apache/pulsar/pull/20123

Please help review.

Thanks,
Penghui

On Tue, Apr 18, 2023 at 11:36 AM PengHui Li <peng...@apache.org> wrote:

> > Flink uses the schema to store some key-value properties. If we can
> > expose all the operations of the topic-level metadata store, we can
> > truly drop the use of uploading the BYTES schema in the Flink
> > connector.
>
> Starting with 3.0.0, Pulsar will provide the ability to set properties on a topic:
> https://github.com/apache/pulsar/pull/17238.
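Here is a minimal sketch of that alternative. It assumes the Flink catalog stores its table properties as topic properties instead of in a BYTES schema. `TopicPropertyStore`, its in-memory implementation, the `flink.table.` key prefix and the topic name are all hypothetical. The interface is only loosely modeled on the topic-properties operations from the PR above, so check the real `PulsarAdmin` method names and signatures before relying on them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TopicPropertiesCatalogSketch {

    // Hypothetical abstraction over topic-level properties; NOT the real PulsarAdmin API.
    interface TopicPropertyStore {
        void updateProperties(String topic, Map<String, String> properties);

        Map<String, String> getProperties(String topic);
    }

    // In-memory stand-in so the sketch runs without a Pulsar cluster.
    static final class InMemoryTopicPropertyStore implements TopicPropertyStore {
        private final Map<String, Map<String, String>> topics = new ConcurrentHashMap<>();

        @Override
        public void updateProperties(String topic, Map<String, String> properties) {
            // Merge the new keys into the topic's existing properties.
            topics.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).putAll(properties);
        }

        @Override
        public Map<String, String> getProperties(String topic) {
            // Return a copy so callers cannot mutate the stored map.
            return new HashMap<>(topics.getOrDefault(topic, Map.of()));
        }
    }

    // Hypothetical prefix that keeps table metadata apart from unrelated topic properties.
    static final String PREFIX = "flink.table.";

    // Persist Flink table properties (e.g. the output of table.toProperties()) on the placeholder topic.
    static void saveTable(TopicPropertyStore store, String topic, Map<String, String> tableProperties) {
        Map<String, String> prefixed = new HashMap<>();
        tableProperties.forEach((key, value) -> prefixed.put(PREFIX + key, value));
        store.updateProperties(topic, prefixed);
    }

    // Read the table properties back, ignoring any property without the prefix.
    static Map<String, String> loadTable(TopicPropertyStore store, String topic) {
        Map<String, String> table = new HashMap<>();
        store.getProperties(topic).forEach((key, value) -> {
            if (key.startsWith(PREFIX)) {
                table.put(key.substring(PREFIX.length()), value);
            }
        });
        return table;
    }

    public static void main(String[] args) {
        TopicPropertyStore store = new InMemoryTopicPropertyStore();
        String topic = "persistent://public/default/my_table"; // hypothetical placeholder topic
        saveTable(store, topic, Map.of("connector", "pulsar", "format", "json"));
        store.updateProperties(topic, Map.of("owner", "team-a")); // unrelated property
        System.out.println(loadTable(store, topic)); // only the two table properties
    }
}
```

With this design, the table metadata never goes through the schema registry. A change in how the broker treats BYTES schema uploads would therefore not affect it.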
>
> Thanks,
> Penghui
>
> On Mon, Apr 17, 2023 at 11:44 PM Yufan Sheng <syh...@gmail.com> wrote:
>
>> > I support reverting the PR first and then looking for a long-term
>> > solution.
>>
>> Flink uses the schema to store some key-value properties. If we can
>> expose all the operations of the topic-level metadata store, we can
>> truly drop the use of uploading the BYTES schema in the Flink
>> connector.
>>
>> On Mon, Apr 17, 2023 at 12:12 PM PengHui Li <peng...@apache.org> wrote:
>> >
>> > I'm sorry, I gave the wrong description of the changes in the PR.
>> > The PR changed the server side, so once the Pulsar server is upgraded,
>> > it's hard to ask users to also upgrade the Flink connector.
>> >
>> > I support reverting the PR first and then looking for a long-term
>> > solution.
>> >
>> > Best,
>> > Penghui
>> >
>> > On Mon, Apr 17, 2023 at 10:16 AM PengHui Li <peng...@apache.org> wrote:
>> >
>> > > > Did we consider making a call to upload a Bytes schema a no-op?
>> > >
>> > > The PR fixed a bug: you are not able to get the uploaded schema back
>> > > as expected. Please take a look at the details in the GitHub issue.
>> > >
>> > > What is the challenge for the Flink connector now?
>> > > The changes only take effect on the client side, so the issue will
>> > > only happen if users use a new connector. Upgrading the Pulsar server
>> > > should not have any impact, right? Wouldn't it be better to fix the
>> > > Flink connector? IMO, the Flink connector should not use the admin API
>> > > to upload a BYTES schema. It's a redundant operation: Pulsar does
>> > > nothing with it.
>> > >
>> > > What do you think about a long-term solution?
>> > >
>> > > Regards
>> > > - Penghui
>> > >
>> > > On Sat, Apr 15, 2023 at 12:52 AM Michael Marshall <mmarsh...@apache.org> wrote:
>> > >
>> > >> I think the primary point is that unless there is a strict need, we
>> > >> shouldn't introduce breaking changes to the implementation. Why did we
>> > >> choose to forbid users from uploading a Bytes schema? Did we consider
>> > >> making a call to upload a Bytes schema a no-op?
>> > >>
>> > >> Thanks,
>> > >> Michael
>> > >>
>> > >> On Fri, Apr 14, 2023 at 10:46 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>> > >> >
>> > >> > 1. I don't know much about Flink, but what I see here is that you need to
>> > >> > save a `ResolvedCatalogTable`, which I see contains a `CatalogTable`, so
>> > >> > it is used to record the table's metadata.
>> > >> > **In org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**
>> > >> > ```java
>> > >> >     @Override
>> > >> >     public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
>> > >> >             throws PulsarAdminException {
>> > >> >         // only allow creating table in explicit database, the topic is used
>> > >> >         // to save table information
>> > >> >         if (!isExplicitDatabase(tablePath.getDatabaseName())) {
>> > >> >             throw new CatalogException(
>> > >> >                     String.format(
>> > >> >                             "Can't create explicit table under pulsar tenant/namespace: %s because it's a native database",
>> > >> >                             tablePath.getDatabaseName()));
>> > >> >         }
>> > >> >
>> > >> >         String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
>> > >> >         pulsarAdminTool.createTopic(mappedTopic, 1);
>> > >> >
>> > >> >         // use pulsar schema to store explicit table information
>> > >> >         try {
>> > >> >             SchemaInfo schemaInfo = TableSchemaHelper.generateSchemaInfo(table.toProperties());
>> > >> >             pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
>> > >> >         } catch (Exception e) {
>> > >> >             // delete topic if table info cannot be persisted
>> > >> >             try {
>> > >> >                 pulsarAdminTool.deleteTopic(mappedTopic);
>> > >> >             } catch (PulsarAdminException ex) {
>> > >> >                 // do nothing
>> > >> >             }
>> > >> >             e.printStackTrace();
>> > >> >             throw new CatalogException("Can't store table metadata");
>> > >> >         }
>> > >> >     }
>> > >> > ```
>> > >> >
>> > >> > 2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
>> > >> > Why must SchemaType.BYTES be used? Is it OK to use SchemaType.JSON?
>> > >> > ```java
>> > >> >     public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
>> > >> >             throws JsonProcessingException {
>> > >> >         ObjectMapper mapper = new ObjectMapper();
>> > >> >         // serialize the table properties to a JSON string
>> > >> >         String json = mapper.writeValueAsString(properties);
>> > >> >         return SchemaInfoImpl.builder()
>> > >> >                 .name("flink_table_schema")
>> > >> >                 //.type(SchemaType.BYTES)
>> > >> >                 //.schema(mapper.writeValueAsBytes(properties))
>> > >> >                 // SchemaType.JSON
>> > >> >                 .type(SchemaType.JSON)
>> > >> >                 // encode explicitly as UTF-8 instead of the platform default charset
>> > >> >                 .schema(json.getBytes(StandardCharsets.UTF_8))
>> > >> >                 .build();
>> > >> >     }
>> > >> > ```
>> > >> >
>> > >> > 3. Sorry, I'm a newbie. Can an experienced developer help with this?
>> > >> > Thanks!
>> > >> >
>> > >> >
>> > >> > Thanks,
>> > >> > sinan
>> > >> >
>> > >> >
>> > >> > Enrico Olivelli <eolive...@gmail.com> wrote on Fri, Apr 14, 2023 at 22:50:
>> > >> >
>> > >> > > On Fri, Apr 14, 2023 at 16:48 Christophe Bornet <bornet.ch...@gmail.com> wrote:
>> > >> > > >
>> > >> > > > The change was not reverted for 3.0.0-RC, so in the current state the
>> > >> > > > Flink Pulsar catalog won't work with Pulsar 3.0.
>> > >> > > > To give more details, Flink Pulsar stores Flink Table metadata in the
>> > >> > > > metadata of BYTES schema topics (those topics don't have messages;
>> > >> > > > only the schema metadata of these topics is used).
>> > >> > >
>> > >> > > I think that we should not introduce this breaking change; it would be
>> > >> > > a big pain for users in the Flink ecosystem.
>> > >> > >
>> > >> > > Enrico
>> > >> > >
>> > >> > > >
>> > >> > > > On Wed, Mar 29, 2023 at 09:54, Yufan Sheng <syh...@gmail.com> wrote:
>> > >> > > > >
>> > >> > > > > Hi SiNan,
>> > >> > > > >
>> > >> > > > > In the Flink world, we don't always rely on the schema information
>> > >> > > > > provided by Pulsar or other connector systems. A Flink application has
>> > >> > > > > its own (de)serialization schema logic, which treats the messages only
>> > >> > > > > in a binary format like a byte array.
>> > >> > > > >
>> > >> > > > > In flink-connector-pulsar, we only use the schema when users want to
>> > >> > > > > do some evolution check. Otherwise, we only send messages with the
>> > >> > > > > BYTES schema.
>> > >> > > > >
>> > >> > > > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>> > >> > > > > >
>> > >> > > > > > Hi Yufan,
>> > >> > > > > > Can you describe the usage scenario of the BYTES schema in
>> > >> > > > > > flink-connector-pulsar?
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > Thanks,
>> > >> > > > > > sinan
>> > >> > > > > >
>> > >> > > > > > Yufan Sheng <syh...@gmail.com> wrote on Tue, Mar 28, 2023 at 9:53 AM:
>> > >> > > > > >
>> > >> > > > > > > As the flink-connector-pulsar developer, I don't want to disable the
>> > >> > > > > > > BYTES schema upload. In my opinion, using the BYTES schema means the
>> > >> > > > > > > users want to bypass the schema check and handle schema validation
>> > >> > > > > > > themselves.
>> > >> > > > > > >
>> > >> > > > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>> > >> > > > > > > >
>> > >> > > > > > > > Hi, everyone.
>> > >> > > > > > > > When a user uploads a BYTES schema, we can warn the user and skip
>> > >> > > > > > > > uploading it.
>> > >> > > > > > > > We can also check whether the topic has a schema other than BYTES:
>> > >> > > > > > > > 1. If yes, warn the user that it is not necessary to upload a BYTES
>> > >> > > > > > > >    schema; they can still subscribe to the topic using the BYTES schema.
>> > >> > > > > > > > 2. If there is no schema, warn the user that the topic does not have
>> > >> > > > > > > >    a schema. The default is the BYTES schema, so there is no need to
>> > >> > > > > > > >    upload it.
>> > >> > > > > > > > This is better than simply throwing an exception to reject the BYTES
>> > >> > > > > > > > schema upload.
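Here is a minimal sketch of the warn-and-skip proposal, which is close to the no-op idea raised later in the thread. It is not Pulsar's broker code. `SchemaType` is a local stand-in enum rather than Pulsar's class, and `Outcome` and `handleUpload` are hypothetical names chosen for illustration.

```java
import java.util.Optional;
import java.util.logging.Logger;

public class BytesSchemaUploadSketch {

    // Local stand-in for a few Pulsar schema types (hypothetical; not Pulsar's SchemaType class).
    enum SchemaType { BYTES, STRING, JSON, AVRO }

    // Possible results of handling an upload request in this sketch.
    enum Outcome { STORED, SKIPPED_TOPIC_HAS_SCHEMA, SKIPPED_DEFAULT_IS_BYTES }

    private static final Logger LOG = Logger.getLogger(BytesSchemaUploadSketch.class.getName());

    /**
     * Warn and skip a BYTES upload instead of rejecting it with an exception.
     *
     * @param uploaded the schema type the client is uploading
     * @param existing the topic's current schema type, if it has one
     */
    static Outcome handleUpload(SchemaType uploaded, Optional<SchemaType> existing) {
        if (uploaded != SchemaType.BYTES) {
            // Non-BYTES uploads keep the normal path (compatibility checks are not modeled here).
            return Outcome.STORED;
        }
        if (existing.isPresent() && existing.get() != SchemaType.BYTES) {
            // Case 1: the topic already has a real schema; consumers can still subscribe with BYTES.
            LOG.warning("Topic already has a " + existing.get()
                    + " schema; uploading a BYTES schema is unnecessary and was skipped");
            return Outcome.SKIPPED_TOPIC_HAS_SCHEMA;
        }
        // Case 2: no schema (or only BYTES); BYTES is already the default.
        LOG.warning("Topic has no schema and BYTES is the default; the upload was skipped");
        return Outcome.SKIPPED_DEFAULT_IS_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(handleUpload(SchemaType.BYTES, Optional.of(SchemaType.AVRO))); // SKIPPED_TOPIC_HAS_SCHEMA
        System.out.println(handleUpload(SchemaType.BYTES, Optional.empty()));             // SKIPPED_DEFAULT_IS_BYTES
        System.out.println(handleUpload(SchemaType.JSON, Optional.empty()));              // STORED
    }
}
```

Skipping the upload also discards the schema payload. That payload is where the Flink catalog keeps its table metadata (the commented-out `.schema(mapper.writeValueAsBytes(properties))` line in the thread). This behavior alone would therefore not keep the Flink catalog working.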
>> > >> > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > > > Thanks,
>> > >> > > > > > > > sinan
>> > >> > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > > > Christophe Bornet <bornet.ch...@gmail.com> wrote on Tue, Mar 28, 2023 at 1:15 AM:
>> > >> > > > > > > >
>> > >> > > > > > > > > This change broke the Flink SQL Pulsar connector:
>> > >> > > > > > > > > https://github.com/streamnative/flink/issues/270
>> > >> > > > > > > > > So I propose to revert it.
>> > >> > > > > > > > >
>> > >> > > > > > > > > On Fri, Dec 9, 2022 at 11:57, labuladong <labulad...@foxmail.com> wrote:
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Hi pulsar community,
>> > >> > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > I'd like to discuss the behavior of schema uploading. For more
>> > >> > > > > > > > > > context, see https://github.com/apache/pulsar/issues/18825
>> > >> > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > I think that forbidding users from uploading a `BYTES` schema is the
>> > >> > > > > > > > > > recommended way to solve this issue. But this may change the existing
>> > >> > > > > > > > > > behavior, so do you have any suggestions about this issue?
>> > >> > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Thanks,
>> > >> > > > > > > > > > Donglai
>> > >> > > > > > > > >
>> > >> > > > > > >
>> > >> > >
>> > >>
>> > >
>>
>
