I have pushed out a PR to revert this change first:
https://github.com/apache/pulsar/pull/20123
Please help review.

Thanks,
Penghui

On Tue, Apr 18, 2023 at 11:36 AM PengHui Li <peng...@apache.org> wrote:

> > Flink uses the schema to store some kv based properties. If we can
> > expose all the operations of the topic-level metadata store, we can
> > truly drop the use of uploading the BYTES schema in the Flink
> > connector.
>
> After 3.0.0, Pulsar will provide the ability to set properties for a topic:
> https://github.com/apache/pulsar/pull/17238
>
> Thanks,
> Penghui
>
> On Mon, Apr 17, 2023 at 11:44 PM Yufan Sheng <syh...@gmail.com> wrote:
>
>> > I support reverting the PR first and then looking for a long-term
>> > solution.
>>
>> Flink uses the schema to store some kv based properties. If we can
>> expose all the operations of the topic-level metadata store, we can
>> truly drop the use of uploading the BYTES schema in the Flink
>> connector.
>>
>> On Mon, Apr 17, 2023 at 12:12 PM PengHui Li <peng...@apache.org> wrote:
>> >
>> > I'm sorry. I provided the wrong description of the changes from the PR.
>> > The PR has changed the server side, so it's hard to ask users to
>> > upgrade the Flink connector when the Pulsar server is upgraded.
>> >
>> > I support reverting the PR first and then looking for a long-term
>> > solution.
>> >
>> > Best,
>> > Penghui
>> >
>> > On Mon, Apr 17, 2023 at 10:16 AM PengHui Li <peng...@apache.org> wrote:
>> >
>> > > > Did we consider making a call to upload a Bytes schema a no-op?
>> > >
>> > > It was a BUG that the PR fixed.
>> > > You will not be able to get the uploaded schema as expected.
>> > > Please take a look at the details from the GitHub issue.
>> > >
>> > > What is the challenge for the Flink connector now?
>> > > The changes only take effect on the client side.
>> > > So, the issue will only happen if they use a new connector.
>> > > Upgrading the Pulsar server will not have any impact?
>> > > Is it better to fix the Flink connector?
>> > > IMO, the Flink connector should not use admin-api
>> > > to upload a BYTES schema. It's a redundant operation.
>> > > Pulsar will do nothing.
>> > >
>> > > What do you think about a long-term solution?
>> > >
>> > > Regards
>> > > - Penghui
>> > >
>> > > On Sat, Apr 15, 2023 at 12:52 AM Michael Marshall <mmarsh...@apache.org> wrote:
>> > >
>> > >> I think the primary point is that unless there is a strict need, we
>> > >> shouldn't introduce breaking changes to the implementation. Why did we
>> > >> choose to forbid users from uploading a Bytes schema? Did we consider
>> > >> making a call to upload a Bytes schema a no-op?
>> > >>
>> > >> Thanks,
>> > >> Michael
>> > >>
>> > >> On Fri, Apr 14, 2023 at 10:46 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>> > >> >
>> > >> > 1. I don't know much about flink, but what I see here is that you need to
>> > >> > save a `ResolvedCatalogTable`, which I see has `CatalogTable`, so it is
>> > >> > used to record the metadata information of the table.
>> > >> > **In org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**
>> > >> > ```java
>> > >> > @Override
>> > >> > public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
>> > >> >         throws PulsarAdminException {
>> > >> >     // only allow creating table in explict database, the topic is used to save table
>> > >> >     // information
>> > >> >     if (!isExplicitDatabase(tablePath.getDatabaseName())) {
>> > >> >         throw new CatalogException(
>> > >> >                 String.format(
>> > >> >                         "Can't create explict table under pulsar tenant/namespace: %s because it's a native database",
>> > >> >                         tablePath.getDatabaseName()));
>> > >> >     }
>> > >> >
>> > >> >     String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
>> > >> >     pulsarAdminTool.createTopic(mappedTopic, 1);
>> > >> >
>> > >> >     // use pulsar schema to store explicit table information
>> > >> >     try {
>> > >> >         SchemaInfo schemaInfo =
>> > >> >                 TableSchemaHelper.generateSchemaInfo(table.toProperties());
>> > >> >         pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
>> > >> >     } catch (Exception e) {
>> > >> >         // delete topic if table info cannot be persisted
>> > >> >         try {
>> > >> >             pulsarAdminTool.deleteTopic(mappedTopic);
>> > >> >         } catch (PulsarAdminException ex) {
>> > >> >             // do nothing
>> > >> >         }
>> > >> >         e.printStackTrace();
>> > >> >         throw new CatalogException("Can't store table metadata");
>> > >> >     }
>> > >> > }
>> > >> > ```
>> > >> >
>> > >> > 2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
>> > >> > Why must SchemaType.BYTES be used? Is it OK to use SchemaType.JSON?
>> > >> > ```java
>> > >> > public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
>> > >> >         throws JsonProcessingException {
>> > >> >     ObjectMapper mapper = new ObjectMapper();
>> > >> >     // json
>> > >> >     String json = mapper.writeValueAsString(properties);
>> > >> >     return SchemaInfoImpl.builder()
>> > >> >             .name("flink_table_schema")
>> > >> >             //.type(SchemaType.BYTES)
>> > >> >             //.schema(mapper.writeValueAsBytes(properties))
>> > >> >             // SchemaType.JSON
>> > >> >             .type(SchemaType.JSON)
>> > >> >             .schema(json.getBytes())
>> > >> >             .build();
>> > >> > }
>> > >> > ```
>> > >> >
>> > >> > 3. Sorry, I'm a newbie. Can an experienced developer help with this? Thanks!
>> > >> >
>> > >> >
>> > >> > Thanks,
>> > >> > sinan
>> > >> >
>> > >> >
>> > >> > Enrico Olivelli <eolive...@gmail.com> wrote on Fri, Apr 14, 2023 at 22:50:
>> > >> >
>> > >> > > On Fri, Apr 14, 2023 at 16:48, Christophe Bornet
>> > >> > > <bornet.ch...@gmail.com> wrote:
>> > >> > > >
>> > >> > > > The change was not reverted for 3.0.0-RC so in the current state the
>> > >> > > > Flink Pulsar catalog won't work with Pulsar 3.0.
>> > >> > > > To give more details, Flink Pulsar stores Flink Table metadata in the
>> > >> > > > metadata of BYTES schema topics (those topics don't have messages,
>> > >> > > > only the schema metadata of these topics is used).
>> > >> > >
>> > >> > > I think that we should not introduce this breaking change, it will be
>> > >> > > a big pain for the users in the Flink ecosystem
>> > >> > >
>> > >> > > Enrico
>> > >> > >
>> > >> > > >
>> > >> > > > On Wed, Mar 29, 2023 at 09:54, Yufan Sheng <syh...@gmail.com> wrote:
>> > >> > > > >
>> > >> > > > > Hi SiNan,
>> > >> > > > >
>> > >> > > > > In the flink world, we don't always rely on the schema information
>> > >> > > > > provided by Pulsar or other connector systems.
>> > >> > > > > A Flink application has
>> > >> > > > > its own (de)serialization schema logic, which treats the messages only
>> > >> > > > > in a binary format like a byte array.
>> > >> > > > >
>> > >> > > > > In flink-connector-pulsar, we only use the schema when the users want
>> > >> > > > > to do some evolution check. Otherwise, we will only send messages in
>> > >> > > > > BYTES schema.
>> > >> > > > >
>> > >> > > > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>> > >> > > > > >
>> > >> > > > > > Hi yufan.
>> > >> > > > > > Can you describe a bit the usage scenario of byte schema in
>> > >> > > > > > flink-connector-pulsar?
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > Thanks,
>> > >> > > > > > sinan
>> > >> > > > > >
>> > >> > > > > > Yufan Sheng <syh...@gmail.com> wrote on Tue, Mar 28, 2023 at 9:53 AM:
>> > >> > > > > >
>> > >> > > > > > > As the flink-connector-pulsar developer, I don't want to disable the
>> > >> > > > > > > BYTES schema upload. In my opinion, using BYTES schema means the users
>> > >> > > > > > > want to bypass the schema check and handle the schema validation by
>> > >> > > > > > > themselves.
>> > >> > > > > > >
>> > >> > > > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>> > >> > > > > > > >
>> > >> > > > > > > > Hi, everyone.
>> > >> > > > > > > > When a user uploads a bytes schema, we can warn the user and skip
>> > >> > > > > > > > uploading it.
>> > >> > > > > > > > Also check to see if the topic has a schema other than bytes.
>> > >> > > > > > > > 1. If yes, warn the user that it is not necessary to upload a bytes schema.
>> > >> > > > > > > > You can subscribe to a topic using bytes schema.
>> > >> > > > > > > > 2. If there is no schema, warn the user that the topic does not have a
>> > >> > > > > > > > schema. The default is bytes schema, and there is no need to upload it.
>> > >> > > > > > > > This is better than simply throwing an exception that rejects the bytes
>> > >> > > > > > > > schema upload.
>> > >> > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > > > Thanks,
>> > >> > > > > > > > sinan
>> > >> > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > > > Christophe Bornet <bornet.ch...@gmail.com> wrote on Tue, Mar 28, 2023 at 1:15 AM:
>> > >> > > > > > > >
>> > >> > > > > > > > > This change broke the Flink SQL Pulsar connector:
>> > >> > > > > > > > > https://github.com/streamnative/flink/issues/270
>> > >> > > > > > > > > So I propose to revert it.
>> > >> > > > > > > > >
>> > >> > > > > > > > > On Fri, Dec 9, 2022 at 11:57, labuladong <labulad...@foxmail.com> wrote:
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Hi pulsar community,
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > I'd like to discuss the behavior of schema uploading, for more context
>> > >> > > > > > > > > > see https://github.com/apache/pulsar/issues/18825
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > I think that forbidding users to upload `BYTES` schema is a recommended
>> > >> > > > > > > > > > way to solve this issue. But this may change the existing behavior, so do
>> > >> > > > > > > > > > you have any suggestion about this issue?
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Thanks,
>> > >> > > > > > > > > > Donglai
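
The "warn and skip" behavior suggested in this thread (treating an upload of a BYTES schema as a no-op rather than rejecting it with an exception) could look roughly like the sketch below. It is a minimal, self-contained illustration and not Pulsar's implementation: `BytesSchemaUploadSketch`, `SchemaKind`, `UploadResult`, and the in-memory map are hypothetical stand-ins for the broker-side schema handling, and the warning text only paraphrases the two cases described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: none of these types are Pulsar classes.
class BytesSchemaUploadSketch {

    // Assumed, simplified stand-in for a schema type.
    enum SchemaKind { BYTES, JSON, AVRO }

    // Outcome of an upload attempt.
    enum UploadResult { STORED, SKIPPED_TOPIC_HAS_SCHEMA, SKIPPED_NO_SCHEMA }

    // In-memory stand-in for the schema storage, keyed by topic name.
    private final Map<String, SchemaKind> schemaByTopic = new HashMap<>();

    UploadResult upload(String topic, SchemaKind kind) {
        if (kind != SchemaKind.BYTES) {
            // Any non-BYTES schema is stored as usual.
            schemaByTopic.put(topic, kind);
            return UploadResult.STORED;
        }
        // BYTES upload: warn and skip instead of throwing.
        if (schemaByTopic.containsKey(topic)) {
            System.err.println("WARN: topic " + topic + " already has a schema; "
                    + "uploading BYTES is unnecessary, subscribe with a bytes schema instead");
            return UploadResult.SKIPPED_TOPIC_HAS_SCHEMA;
        }
        System.err.println("WARN: topic " + topic + " has no schema; "
                + "bytes is the default, so there is nothing to upload");
        return UploadResult.SKIPPED_NO_SCHEMA;
    }

    // Returns the stored schema kind for a topic, if any.
    Optional<SchemaKind> schemaOf(String topic) {
        return Optional.ofNullable(schemaByTopic.get(topic));
    }

    public static void main(String[] args) {
        BytesSchemaUploadSketch registry = new BytesSchemaUploadSketch();
        System.out.println(registry.upload("orders", SchemaKind.BYTES));
        System.out.println(registry.upload("orders", SchemaKind.JSON));
        System.out.println(registry.upload("orders", SchemaKind.BYTES));
        System.out.println(registry.schemaOf("orders"));
    }
}
```

Note that under this sketch a caller that uploads a BYTES schema and later tries to read it back still finds nothing stored, so a no-op alone would not cover the Flink catalog use case described in the thread, where table metadata is kept in the schema of a BYTES topic.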