I think the primary point is that unless there is a strict need, we
shouldn't introduce breaking changes to the implementation. Why did we
choose to forbid users from uploading a Bytes schema? Did we consider
making a call to upload a Bytes schema a no-op?
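
To make the no-op idea concrete, here is a rough sketch. It assumes a
hypothetical in-memory registry: `BytesSchemaNoOpSketch`, `uploadSchema`,
`storedSchema` and `UploadResult` are illustrative names, not Pulsar APIs,
and the real broker code path may look quite different.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a schema registry; none of these names are Pulsar APIs.
class BytesSchemaNoOpSketch {
    enum SchemaType { BYTES, JSON, AVRO }

    enum UploadResult { STORED, SKIPPED_BYTES }

    private final Map<String, SchemaType> schemasByTopic = new HashMap<>();

    // Treat a BYTES upload as a no-op instead of throwing, so existing callers do not fail.
    UploadResult uploadSchema(String topic, SchemaType type) {
        if (type == SchemaType.BYTES) {
            // Warn and store nothing; the topic's current schema (if any) is left untouched.
            System.err.println("WARN: skipping BYTES schema upload for topic " + topic);
            return UploadResult.SKIPPED_BYTES;
        }
        schemasByTopic.put(topic, type);
        return UploadResult.STORED;
    }

    // Returns the stored schema type, or null when the topic has no schema.
    SchemaType storedSchema(String topic) {
        return schemasByTopic.get(topic);
    }

    public static void main(String[] args) {
        BytesSchemaNoOpSketch registry = new BytesSchemaNoOpSketch();
        System.out.println(registry.uploadSchema("topic-a", SchemaType.BYTES));
        System.out.println(registry.uploadSchema("topic-b", SchemaType.JSON));
    }
}
```

One caveat with this sketch: a pure no-op stores nothing, so on its own it
would not cover a caller that relies on reading the BYTES schema metadata
back later, as the Flink catalog case described below does.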

Thanks,
Michael

On Fri, Apr 14, 2023 at 10:46 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>
> 1. I don't know much about Flink, but from what I can see, you need to
> save a `ResolvedCatalogTable`, which contains a `CatalogTable`, so it is
> used to record the table's metadata.
> **In
> org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**
> ```java
>     @Override
>     public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
>             throws PulsarAdminException {
>         // only allow creating table in explicit database, the topic is used to save table
>         // information
>         if (!isExplicitDatabase(tablePath.getDatabaseName())) {
>             throw new CatalogException(
>                     String.format(
>                             "Can't create explicit table under pulsar tenant/namespace: %s because it's a native database",
>                             tablePath.getDatabaseName()));
>         }
>
>         String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
>         pulsarAdminTool.createTopic(mappedTopic, 1);
>
>         // use pulsar schema to store explicit table information
>         try {
>             SchemaInfo schemaInfo =
>                     TableSchemaHelper.generateSchemaInfo(table.toProperties());
>             pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
>         } catch (Exception e) {
>             // delete topic if table info cannot be persisted
>             try {
>                 pulsarAdminTool.deleteTopic(mappedTopic);
>             } catch (PulsarAdminException ex) {
>                 // do nothing
>             }
>             e.printStackTrace();
>             throw new CatalogException("Can't store table metadata");
>         }
>     }
> ```
>
> 2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
> Why must SchemaType.BYTES be used? Is it OK to use SchemaType.JSON?
> ```java
>     public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
>             throws JsonProcessingException {
>         ObjectMapper mapper = new ObjectMapper();
>         // json
>         String json = mapper.writeValueAsString(properties);
>         return SchemaInfoImpl.builder()
>                 .name("flink_table_schema")
>                 //.type(SchemaType.BYTES)
>                 //.schema(mapper.writeValueAsBytes(properties))
>                 // SchemaType.JSON
>                 .type(SchemaType.JSON)
>                 .schema(json.getBytes())
>                 .build();
>     }
> ```
>
> 3. Sorry, I'm a newbie. Can an experienced developer help with this? Thanks!
>
>
> Thanks,
> sinan
>
>
> Enrico Olivelli <eolive...@gmail.com> wrote on Fri, Apr 14, 2023 at 22:50:
>
> > On Fri, Apr 14, 2023 at 16:48, Christophe Bornet
> > <bornet.ch...@gmail.com> wrote:
> > >
> > > The change was not reverted for 3.0.0-RC so in the current state the
> > > Flink Pulsar catalog won't work with Pulsar 3.0.
> > > To give more details, Flink Pulsar stores Flink Table metadata in the
> > > metadata of BYTES schema topics (those topics don't have messages,
> > > only the schema metadata of these topics is used).
> >
> > I think that we should not introduce this breaking change; it will be
> > a big pain for the users in the Flink ecosystem.
> >
> > Enrico
> >
> > >
> > > On Wed, Mar 29, 2023 at 09:54, Yufan Sheng <syh...@gmail.com> wrote:
> > > >
> > > > Hi SiNan,
> > > >
> > > > In the Flink world, we don't always rely on the schema information
> > > > provided by Pulsar or other connector systems. A Flink application has
> > > > its own (de)serialization schema logic, which treats messages purely
> > > > as binary data, such as a byte array.
> > > >
> > > > In flink-connector-pulsar, we only use the schema when the users want
> > > > to do some evolution check. Otherwise, we will only send messages in
> > > > BYTES schema.
> > > >
> > > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > >
> > > > > Hi Yufan.
> > > > > Can you briefly describe the usage scenario for the bytes schema in
> > > > > flink-connector-pulsar?
> > > > >
> > > > >
> > > > > Thanks,
> > > > > sinan
> > > > >
> > > > > > Yufan Sheng <syh...@gmail.com> wrote on Tue, Mar 28, 2023 at 9:53 AM:
> > > > >
> > > > > > > As the flink-connector-pulsar developer, I don't want to disable
> > > > > > > the BYTES schema upload. In my opinion, using BYTES schema means
> > > > > > > the users want to bypass the schema check and handle the schema
> > > > > > > validation by themselves.
> > > > > >
> > > > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi, everyone.
> > > > > > > > When a user uploads a bytes schema, we can warn the user and skip
> > > > > > > > the upload, after also checking whether the topic has a schema
> > > > > > > > other than bytes:
> > > > > > > > 1. If it does, warn the user that it is not necessary to upload a
> > > > > > > > bytes schema; a topic can be subscribed to using the bytes schema.
> > > > > > > > 2. If there is no schema, warn the user that the topic does not
> > > > > > > > have a schema; the default is the bytes schema, so there is no
> > > > > > > > need to upload it.
> > > > > > > > This is better than simply throwing an exception that rejects the
> > > > > > > > bytes schema upload.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > sinan
> > > > > > >
> > > > > > >
> > > > > > > > Christophe Bornet <bornet.ch...@gmail.com> wrote on Tue, Mar 28, 2023 at 1:15 AM:
> > > > > > >
> > > > > > > > This change broke the Flink SQL Pulsar connector:
> > > > > > > > https://github.com/streamnative/flink/issues/270
> > > > > > > > So I propose to revert it.
> > > > > > > >
> > > > > > > > > On Fri, Dec 9, 2022 at 11:57, labuladong <labulad...@foxmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi pulsar community,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > I'd like to discuss the behavior of schema uploading. For more
> > > > > > > > > > context, see https://github.com/apache/pulsar/issues/18825
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > I think that forbidding users from uploading a `BYTES` schema is a
> > > > > > > > > > recommended way to solve this issue. But this may change the
> > > > > > > > > > existing behavior, so do you have any suggestions about this issue?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Donglai
> > > > > > > >
> > > > > >
> >
