Treating the BYTES schema differently from other schemas (i.e. forbidding
uploads of it) will be confusing to users.

If the BYTES schema is the default schema for a new topic, then how about
saving a BYTES schema in the registry instead of saving nothing?
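Here is a minimal Java sketch of this suggestion. It assumes a hypothetical in-memory registry; `StoreBytesRegistry`, `SketchSchemaEntry` and `SketchSchemaType` are illustrative names, not Pulsar classes. An uploaded BYTES schema is stored like any other type, so a later lookup returns it rather than "no schema".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Pulsar's SchemaType (only the values needed here).
enum SketchSchemaType { BYTES, JSON, AVRO }

// Hypothetical, simplified registry entry: a schema type plus the key/value
// data a caller such as the Flink catalog stores in it (modeled as a map).
final class SketchSchemaEntry {
    final SketchSchemaType type;
    final Map<String, String> properties;

    SketchSchemaEntry(SketchSchemaType type, Map<String, String> properties) {
        this.type = type;
        this.properties = new HashMap<>(properties);
    }
}

// Hypothetical registry in which BYTES gets no special treatment.
final class StoreBytesRegistry {
    private final Map<String, SketchSchemaEntry> byTopic = new HashMap<>();

    void upload(String topic, SketchSchemaEntry entry) {
        // BYTES is persisted like any other type, so a later lookup returns
        // what was uploaded instead of "no schema".
        byTopic.put(topic, entry);
    }

    Optional<SketchSchemaEntry> get(String topic) {
        return Optional.ofNullable(byTopic.get(topic));
    }
}
```

Compared with skipping the write, this keeps one rule for every schema type. The cost is persisting an entry that carries no structural information about the messages.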



On Mon, Apr 17, 2023 at 8:42 PM PengHui Li <peng...@apache.org> wrote:

> I have opened a PR to revert this change first.
>
> https://github.com/apache/pulsar/pull/20123
>
> Please help review.
>
> Thanks,
> Penghui
>
> On Tue, Apr 18, 2023 at 11:36 AM PengHui Li <peng...@apache.org> wrote:
>
> > > Flink uses the schema to store some KV-based properties. If we can
> > > expose all the operations of the topic-level metadata store, we can
> > > truly drop the upload of the BYTES schema in the Flink connector.
> >
> > Starting with 3.0.0, Pulsar will provide the ability to set properties
> > for a topic: https://github.com/apache/pulsar/pull/17238.
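Here is a rough Java sketch of how the Flink catalog could keep table metadata in topic properties instead of a BYTES schema. `TopicPropertiesStore`, its in-memory implementation, and `PropertiesCatalogSketch` are hypothetical stand-ins. The actual admin calls added by PR #17238 should be checked against the Pulsar 3.0 admin API rather than taken from this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over topic-level properties; the real Pulsar
// admin method names are deliberately not assumed here.
interface TopicPropertiesStore {
    void createTopic(String topic);
    void putProperties(String topic, Map<String, String> props);
    Map<String, String> getProperties(String topic);
    void deleteTopic(String topic);
}

// In-memory stand-in used only to make the sketch runnable.
class InMemoryTopicPropertiesStore implements TopicPropertiesStore {
    private final Map<String, Map<String, String>> topics = new HashMap<>();

    public void createTopic(String topic) {
        topics.putIfAbsent(topic, new HashMap<>());
    }

    public void putProperties(String topic, Map<String, String> props) {
        Map<String, String> existing = topics.get(topic);
        if (existing == null) {
            throw new IllegalStateException("No such topic: " + topic);
        }
        existing.putAll(props);
    }

    public Map<String, String> getProperties(String topic) {
        Map<String, String> existing = topics.get(topic);
        return existing == null ? new HashMap<>() : new HashMap<>(existing);
    }

    public void deleteTopic(String topic) {
        topics.remove(topic);
    }

    boolean exists(String topic) {
        return topics.containsKey(topic);
    }
}

// Hypothetical catalog helper: table metadata goes into topic properties,
// so no schema (BYTES or otherwise) has to be uploaded.
final class PropertiesCatalogSketch {
    static void createTable(TopicPropertiesStore store, String topic, Map<String, String> tableProps) {
        store.createTopic(topic);
        try {
            store.putProperties(topic, tableProps);
        } catch (RuntimeException e) {
            // Like the existing catalog code, do not leave a half-created table behind.
            store.deleteTopic(topic);
            throw e;
        }
    }
}
```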
> >
> > Thanks,
> > Penghui
> >
> > On Mon, Apr 17, 2023 at 11:44 PM Yufan Sheng <syh...@gmail.com> wrote:
> >
> >> > I support reverting the PR first and then looking for a long-term
> >> > solution.
> >>
> >> Flink uses the schema to store some KV-based properties. If we can
> >> expose all the operations of the topic-level metadata store, we can
> >> truly drop the upload of the BYTES schema in the Flink connector.
> >>
> >> On Mon, Apr 17, 2023 at 12:12 PM PengHui Li <peng...@apache.org> wrote:
> >> >
> >> > I'm sorry, I gave the wrong description of the changes in the PR.
> >> > The PR changed the server side, so when the Pulsar server is upgraded,
> >> > it's hard to require users to upgrade the Flink connector as well.
> >> >
> >> > I support reverting the PR first and then looking for a long-term
> >> > solution.
> >> >
> >> > Best,
> >> > Penghui
> >> >
> >> > On Mon, Apr 17, 2023 at 10:16 AM PengHui Li <peng...@apache.org> wrote:
> >> >
> >> > > > Did we consider making a call to upload a Bytes schema a no-op?
> >> > >
> >> > > It was a bug that the PR fixed:
> >> > > you were not able to get the uploaded schema back as expected.
> >> > > Please take a look at the details in the GitHub issue.
> >> > >
> >> > > What is the challenge for the Flink connector now?
> >> > > The changes only take effect on the client side,
> >> > > so the issue will only happen if users run a new connector.
> >> > > Upgrading the Pulsar server should not have any impact, right?
> >> > > Wouldn't it be better to fix the Flink connector?
> >> > > IMO, the Flink connector should not use the admin API
> >> > > to upload a BYTES schema. It's a redundant operation;
> >> > > Pulsar does nothing with it.
> >> > >
> >> > > What do you think about a long-term solution?
> >> > >
> >> > > Regards
> >> > > - Penghui
> >> > >
> >> > > On Sat, Apr 15, 2023 at 12:52 AM Michael Marshall <mmarsh...@apache.org> wrote:
> >> > >
> >> > >> I think the primary point is that unless there is a strict need, we
> >> > >> shouldn't introduce breaking changes to the implementation. Why did we
> >> > >> choose to forbid users from uploading a Bytes schema? Did we consider
> >> > >> making a call to upload a Bytes schema a no-op?
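To make the two options concrete, here is a small, hypothetical Java model. It is not broker code; `PolicyRegistrySketch` and `BytesUploadPolicy` are illustrative names. It contrasts rejecting a BYTES upload with treating it as a no-op.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: two possible server-side reactions to a BYTES upload.
enum BytesUploadPolicy { REJECT, NO_OP }

// Hypothetical registry model; schema types are plain strings for brevity.
final class PolicyRegistrySketch {
    private final Map<String, String> schemaTypeByTopic = new HashMap<>();
    private final BytesUploadPolicy policy;

    PolicyRegistrySketch(BytesUploadPolicy policy) {
        this.policy = policy;
    }

    /** Returns true if a schema was stored, false if the call was accepted but ignored. */
    boolean upload(String topic, String schemaType) {
        if ("BYTES".equals(schemaType)) {
            if (policy == BytesUploadPolicy.REJECT) {
                // Breaking for callers that upload BYTES today, e.g. the Flink catalog.
                throw new IllegalArgumentException("Uploading a BYTES schema is not allowed");
            }
            // NO_OP: the call succeeds, but nothing is stored.
            return false;
        }
        schemaTypeByTopic.put(topic, schemaType);
        return true;
    }

    String get(String topic) {
        return schemaTypeByTopic.get(topic);
    }
}
```

With `NO_OP`, existing callers such as the Flink catalog keep working. However, a later lookup still returns nothing, which is the behavior Penghui's reply in this thread calls the bug the PR fixed.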
> >> > >>
> >> > >> Thanks,
> >> > >> Michael
> >> > >>
> >> > >> On Fri, Apr 14, 2023 at 10:46 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> >> > >> >
> >> > >> > 1. I don't know much about Flink, but what I see here is that you need
> >> > >> > to save a `ResolvedCatalogTable`, which contains a `CatalogTable`, so it
> >> > >> > is used to record the metadata of the table.
> >> > >> > **In org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**
> >> > >> > ```java
> >> > >> >     @Override
> >> > >> >     public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
> >> > >> >             throws PulsarAdminException {
> >> > >> >         // Only allow creating tables in an explicit database; the topic is
> >> > >> >         // used to save the table information.
> >> > >> >         if (!isExplicitDatabase(tablePath.getDatabaseName())) {
> >> > >> >             throw new CatalogException(
> >> > >> >                     String.format(
> >> > >> >                             "Can't create explicit table under pulsar tenant/namespace: %s because it's a native database",
> >> > >> >                             tablePath.getDatabaseName()));
> >> > >> >         }
> >> > >> >
> >> > >> >         String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
> >> > >> >         pulsarAdminTool.createTopic(mappedTopic, 1);
> >> > >> >
> >> > >> >         // Use the Pulsar schema to store the explicit table information.
> >> > >> >         try {
> >> > >> >             SchemaInfo schemaInfo =
> >> > >> >                     TableSchemaHelper.generateSchemaInfo(table.toProperties());
> >> > >> >             pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
> >> > >> >         } catch (Exception e) {
> >> > >> >             // Delete the topic if the table info cannot be persisted.
> >> > >> >             try {
> >> > >> >                 pulsarAdminTool.deleteTopic(mappedTopic);
> >> > >> >             } catch (PulsarAdminException ex) {
> >> > >> >                 // Best-effort cleanup: attach this failure to the original error.
> >> > >> >                 e.addSuppressed(ex);
> >> > >> >             }
> >> > >> >             // Keep the original failure as the cause instead of printing it.
> >> > >> >             throw new CatalogException("Can't store table metadata", e);
> >> > >> >         }
> >> > >> >     }
> >> > >> > ```
> >> > >> >
> >> > >> > 2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
> >> > >> > Why must `SchemaType.BYTES` be used? Is it OK to use `SchemaType.JSON`?
> >> > >> > ```java
> >> > >> >     public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
> >> > >> >             throws JsonProcessingException {
> >> > >> >         ObjectMapper mapper = new ObjectMapper();
> >> > >> >         // Serialize the table properties as JSON.
> >> > >> >         String json = mapper.writeValueAsString(properties);
> >> > >> >         return SchemaInfoImpl.builder()
> >> > >> >                 .name("flink_table_schema")
> >> > >> >                 // BYTES variant:
> >> > >> >                 //.type(SchemaType.BYTES)
> >> > >> >                 //.schema(mapper.writeValueAsBytes(properties))
> >> > >> >                 // SchemaType.JSON variant (explicit charset, not the platform default):
> >> > >> >                 .type(SchemaType.JSON)
> >> > >> >                 .schema(json.getBytes(StandardCharsets.UTF_8))
> >> > >> >                 .build();
> >> > >> >     }
> >> > >> > ```
> >> > >> >
> >> > >> > 3. Sorry, I'm a newbie. Can an experienced developer help with this?
> >> > >> > Thanks!
> >> > >> >
> >> > >> >
> >> > >> > Thanks,
> >> > >> > sinan
> >> > >> >
> >> > >> >
> >> > >> > On Fri, Apr 14, 2023 at 10:50 PM Enrico Olivelli <eolive...@gmail.com> wrote:
> >> > >> >
> >> > >> > > On Fri, Apr 14, 2023 at 4:48 PM Christophe Bornet
> >> > >> > > <bornet.ch...@gmail.com> wrote:
> >> > >> > > >
> >> > >> > > > The change was not reverted for 3.0.0-RC, so in the current state
> >> > >> > > > the Flink Pulsar catalog won't work with Pulsar 3.0.
> >> > >> > > > To give more details, Flink Pulsar stores Flink Table metadata in
> >> > >> > > > the metadata of BYTES schema topics (those topics don't have
> >> > >> > > > messages; only the schema metadata of these topics is used).
> >> > >> > >
> >> > >> > > I think that we should not introduce this breaking change; it will
> >> > >> > > be a big pain for users in the Flink ecosystem.
> >> > >> > >
> >> > >> > > Enrico
> >> > >> > >
> >> > >> > > >
> >> > >> > > > On Wed, Mar 29, 2023 at 9:54 AM Yufan Sheng <syh...@gmail.com> wrote:
> >> > >> > > > >
> >> > >> > > > > Hi SiNan,
> >> > >> > > > >
> >> > >> > > > > In the Flink world, we don't always rely on the schema information
> >> > >> > > > > provided by Pulsar or other connector systems. A Flink application
> >> > >> > > > > has its own (de)serialization schema logic, which treats messages
> >> > >> > > > > only as binary data, like a byte array.
> >> > >> > > > >
> >> > >> > > > > In flink-connector-pulsar, we only use the schema when users want
> >> > >> > > > > to do a schema evolution check. Otherwise, we only send messages
> >> > >> > > > > with the BYTES schema.
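Here is a minimal Java sketch of that split. It uses hypothetical names (`RecordSerializer`, `PulsarSchemaMode`, `SinkSchemaSketch`) rather than the connector's real classes. Flink owns serialization, and a Pulsar-side schema is only chosen when an evolution check is requested.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Flink-side serializer: the application owns the
// wire format, and Pulsar only ever sees the resulting bytes.
interface RecordSerializer<T> {
    byte[] serialize(T value);
}

// Hypothetical: which schema the sink would register with Pulsar.
enum PulsarSchemaMode { BYTES, USER_SCHEMA }

final class SinkSchemaSketch {
    // Example serializer; any format (JSON, Avro, Protobuf...) could be used instead.
    static final RecordSerializer<String> UTF8 = s -> s.getBytes(StandardCharsets.UTF_8);

    // A Pulsar-side schema is only needed when the user asks for an
    // evolution check; otherwise BYTES is enough.
    static PulsarSchemaMode schemaModeFor(boolean evolutionCheckEnabled) {
        return evolutionCheckEnabled ? PulsarSchemaMode.USER_SCHEMA : PulsarSchemaMode.BYTES;
    }

    // The payload handed to Pulsar is whatever the Flink serializer produced.
    static <T> byte[] toPayload(RecordSerializer<T> serializer, T value) {
        return serializer.serialize(value);
    }
}
```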
> >> > >> > > > >
> >> > >> > > > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> >> > >> > > > > >
> >> > >> > > > > > Hi Yufan,
> >> > >> > > > > > Can you describe the usage scenario of the BYTES schema in
> >> > >> > > > > > flink-connector-pulsar?
> >> > >> > > > > >
> >> > >> > > > > >
> >> > >> > > > > > Thanks,
> >> > >> > > > > > sinan
> >> > >> > > > > >
> >> > >> > > > > > On Tue, Mar 28, 2023 at 9:53 AM Yufan Sheng <syh...@gmail.com> wrote:
> >> > >> > > > > >
> >> > >> > > > > > > As the flink-connector-pulsar developer, I don't want to disable
> >> > >> > > > > > > the BYTES schema upload. In my opinion, using the BYTES schema
> >> > >> > > > > > > means users want to bypass the schema check and handle schema
> >> > >> > > > > > > validation themselves.
> >> > >> > > > > > >
> >> > >> > > > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> >> > >> > > > > > > >
> >> > >> > > > > > > > Hi, everyone.
> >> > >> > > > > > > > When a user uploads a BYTES schema, we can warn the user and skip
> >> > >> > > > > > > > the upload, rather than simply throwing an exception that rejects it.
> >> > >> > > > > > > > We can also check whether the topic has a schema other than BYTES:
> >> > >> > > > > > > > 1. If yes, warn the user that it is not necessary to upload a BYTES
> >> > >> > > > > > > > schema; they can still subscribe to the topic using the BYTES schema.
> >> > >> > > > > > > > 2. If there is no schema, warn the user that the topic does not have
> >> > >> > > > > > > > a schema; the default is the BYTES schema, so there is no need to
> >> > >> > > > > > > > upload it.
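The proposal above can be sketched as a hypothetical in-memory model. `WarnAndSkipRegistry` is an illustrative name, not Pulsar code. Warnings are only collected in a list here; how they would reach the user is left open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "warn and skip" proposal; not Pulsar broker code.
final class WarnAndSkipRegistry {
    private final Map<String, String> schemaTypeByTopic = new HashMap<>();
    // This sketch only collects warnings; delivery to the user is out of scope.
    final List<String> warnings = new ArrayList<>();

    void upload(String topic, String schemaType) {
        if (!"BYTES".equals(schemaType)) {
            // Non-BYTES schemas are stored as usual.
            schemaTypeByTopic.put(topic, schemaType);
            return;
        }
        String existing = schemaTypeByTopic.get(topic);
        if (existing != null) {
            // Case 1: the topic already has a non-BYTES schema.
            warnings.add("Topic " + topic + " already has schema type " + existing
                    + "; uploading BYTES is unnecessary, consumers can still subscribe with BYTES.");
        } else {
            // Case 2: no schema yet, and BYTES is already the default.
            warnings.add("Topic " + topic
                    + " has no schema; BYTES is the default, so there is no need to upload it.");
        }
        // Either way the BYTES upload is skipped instead of rejected with an exception.
    }

    String get(String topic) {
        return schemaTypeByTopic.get(topic);
    }
}
```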
> >> > >> > > > > > > >
> >> > >> > > > > > > >
> >> > >> > > > > > > > Thanks,
> >> > >> > > > > > > > sinan
> >> > >> > > > > > > >
> >> > >> > > > > > > >
> >> > >> > > > > > > > On Tue, Mar 28, 2023 at 1:15 AM Christophe Bornet <bornet.ch...@gmail.com> wrote:
> >> > >> > > > > > > >
> >> > >> > > > > > > > > This change broke the Flink SQL Pulsar connector:
> >> > >> > > > > > > > > https://github.com/streamnative/flink/issues/270
> >> > >> > > > > > > > > So I propose to revert it.
> >> > >> > > > > > > > >
> >> > >> > > > > > > > > On Fri, Dec 9, 2022 at 11:57 AM labuladong <labulad...@foxmail.com> wrote:
> >> > >> > > > > > > > > >
> >> > >> > > > > > > > > > Hi Pulsar community,
> >> > >> > > > > > > > > >
> >> > >> > > > > > > > > > I'd like to discuss the behavior of schema uploading; for more
> >> > >> > > > > > > > > > context, see https://github.com/apache/pulsar/issues/18825
> >> > >> > > > > > > > > >
> >> > >> > > > > > > > > > I think that forbidding users from uploading the `BYTES` schema is
> >> > >> > > > > > > > > > the recommended way to solve this issue. But this may change the
> >> > >> > > > > > > > > > existing behavior, so do you have any suggestions about this issue?
> >> > >> > > > > > > > > >
> >> > >> > > > > > > > > >
> >> > >> > > > > > > > > > Thanks,
> >> > >> > > > > > > > > > Donglai
> >> > >> > > > > > > > >
> >> > >> > > > > > >
> >> > >> > >
> >> > >>
> >> > >
> >>
> >
>


-- 
Best Regards,
Neng
