1. I don't know much about Flink, but from what I can see, the catalog needs to
save a `ResolvedCatalogTable`, which contains a `CatalogTable`; that is what
records the table's metadata.
**In `org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable`:**
```java
@Override
public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
        throws PulsarAdminException {
    // Tables may only be created in an explicit database; the placeholder
    // topic is used to store the table information.
    if (!isExplicitDatabase(tablePath.getDatabaseName())) {
        throw new CatalogException(
                String.format(
                        "Can't create explicit table under pulsar tenant/namespace: %s "
                                + "because it's a native database",
                        tablePath.getDatabaseName()));
    }
    String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
    pulsarAdminTool.createTopic(mappedTopic, 1);
    // Use the Pulsar schema to store the explicit table information.
    try {
        SchemaInfo schemaInfo =
                TableSchemaHelper.generateSchemaInfo(table.toProperties());
        pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
    } catch (Exception e) {
        // Best effort: delete the topic if the table info cannot be persisted.
        try {
            pulsarAdminTool.deleteTopic(mappedTopic);
        } catch (PulsarAdminException ex) {
            e.addSuppressed(ex);
        }
        // Chain the original cause instead of printing the stack trace.
        throw new CatalogException("Can't store table metadata", e);
    }
}
```
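The create, upload, roll-back flow in `createTable` can be modeled without Pulsar as the minimal sketch below. `FakeAdmin` and `PlaceholderTableStore` are hypothetical stand-ins for `pulsarAdminTool` and the catalog; they only track whether a topic exists and whether a schema payload was stored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the admin calls used by createTable.
// It is NOT the pulsarAdminTool API.
class FakeAdmin {
    final Set<String> topics = new HashSet<>();
    final Map<String, byte[]> schemas = new HashMap<>();
    boolean failSchemaUpload = false; // simulate a rejected schema upload

    void createTopic(String topic) { topics.add(topic); }

    void uploadSchema(String topic, byte[] schema) {
        if (failSchemaUpload) {
            throw new IllegalStateException("schema rejected");
        }
        schemas.put(topic, schema);
    }

    void deleteTopic(String topic) { topics.remove(topic); }
}

class PlaceholderTableStore {
    // Create the placeholder topic, store the metadata as its schema payload,
    // and delete the topic again if the metadata cannot be persisted.
    static void createTable(FakeAdmin admin, String topic, byte[] metadata) {
        admin.createTopic(topic);
        try {
            admin.uploadSchema(topic, metadata);
        } catch (RuntimeException e) {
            try {
                admin.deleteTopic(topic); // best-effort cleanup
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure); // keep the cleanup error visible
            }
            throw new IllegalStateException("Can't store table metadata", e);
        }
    }
}
```

Rolling back the topic keeps a failed upload from leaving an orphan placeholder topic without metadata.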
2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
why must `SchemaType.BYTES` be used? Would it be OK to use `SchemaType.JSON` instead?
```java
public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
        throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    // Serialize the table properties to JSON.
    String json = mapper.writeValueAsString(properties);
    return SchemaInfoImpl.builder()
            .name("flink_table_schema")
            // BYTES variant (commented out):
            // .type(SchemaType.BYTES)
            // .schema(mapper.writeValueAsBytes(properties))
            // Proposed SchemaType.JSON variant:
            .type(SchemaType.JSON)
            // Encode explicitly as UTF-8 rather than the platform default charset.
            .schema(json.getBytes(StandardCharsets.UTF_8))
            .build();
}
```
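As Christophe describes in the quoted reply, the connector uses the BYTES schema payload as an opaque container for table metadata, so what matters is that the encoding round-trips. A minimal sketch of such a round trip, assuming `java.util.Properties` as a stdlib stand-in for the Jackson `ObjectMapper` used above; `TableMetadataCodec` is a hypothetical name, and the sketch does not model any broker-side validation a JSON-typed schema may be subject to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class TableMetadataCodec {
    // Encode a table's property map into an opaque byte payload.
    static byte[] encode(Map<String, String> tableProperties) {
        Properties props = new Properties();
        props.putAll(tableProperties);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            props.store(out, null); // key=value lines plus a date comment
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decode the payload back into the original property map.
    static Map<String, String> decode(byte[] payload) {
        Properties props = new Properties();
        try {
            props.load(new ByteArrayInputStream(payload));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }
}
```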
3. Sorry, I'm a newbie. Can an experienced developer help with this? Thanks!
Thanks,
sinan
On Fri, Apr 14, 2023 at 22:50, Enrico Olivelli <[email protected]> wrote:
> On Fri, Apr 14, 2023 at 16:48, Christophe Bornet <[email protected]> wrote:
> >
> > The change was not reverted for 3.0.0-RC, so in its current state the
> > Flink Pulsar catalog won't work with Pulsar 3.0.
> > To give more details, Flink Pulsar stores Flink Table metadata in the
> > metadata of BYTES schema topics (those topics don't have messages,
> > only the schema metadata of these topics is used).
>
> I think we should not introduce this breaking change; it would be
> a big pain for users in the Flink ecosystem.
>
> Enrico
>
> >
> > On Wed, Mar 29, 2023 at 09:54, Yufan Sheng <[email protected]> wrote:
> > >
> > > Hi SiNan,
> > >
> > > In the Flink world, we don't always rely on the schema information
> > > provided by Pulsar or other connector systems. A Flink application has
> > > its own (de)serialization schema logic, which treats messages only
> > > as binary data, such as a byte array.
> > >
> > > In flink-connector-pulsar, we only use the schema when users want
> > > to perform a schema evolution check. Otherwise, we only send messages
> > > with the BYTES schema.
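To illustrate that the application, not the broker, decides how a raw payload is interpreted, here is a minimal sketch assuming a hypothetical `BytesDeserializer` interface (it is not Flink's `DeserializationSchema` API): the same kind of `byte[]` read from a BYTES-schema topic is decoded by logic the application chooses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface: the application turns a raw payload into a value.
interface BytesDeserializer<T> {
    T deserialize(byte[] payload);
}

class Deserializers {
    // Interpret the payload as UTF-8 text.
    static final BytesDeserializer<String> UTF8_STRING =
            payload -> new String(payload, StandardCharsets.UTF_8);

    // Interpret the payload as a big-endian 4-byte integer (ByteBuffer's default order).
    static final BytesDeserializer<Integer> INT32 =
            payload -> ByteBuffer.wrap(payload).getInt();
}
```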
> > >
> > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <[email protected]> wrote:
> > > >
> > > > Hi Yufan,
> > > > Can you describe the usage scenario for the BYTES schema in
> > > > flink-connector-pulsar?
> > > >
> > > >
> > > > Thanks,
> > > > sinan
> > > >
> > > > On Tue, Mar 28, 2023 at 9:53 AM, Yufan Sheng <[email protected]> wrote:
> > > >
> > > > > As the flink-connector-pulsar developer, I don't want to disable the
> > > > > BYTES schema upload. In my opinion, using the BYTES schema means users
> > > > > want to bypass the schema check and handle schema validation
> > > > > themselves.
> > > > >
> > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <[email protected]> wrote:
> > > > > >
> > > > > > Hi, everyone.
> > > > > > When a user uploads a BYTES schema, we could warn the user and skip
> > > > > > the upload. We could also check whether the topic already has a schema
> > > > > > other than BYTES:
> > > > > > 1. If it does, warn the user that uploading a BYTES schema is
> > > > > > unnecessary; they can subscribe to the topic using the BYTES schema.
> > > > > > 2. If the topic has no schema, warn the user that the topic has no
> > > > > > schema; the default is BYTES, so there is no need to upload it.
> > > > > > This would be better than simply throwing an exception that rejects
> > > > > > the BYTES schema upload.
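The warn-and-skip behavior proposed above can be sketched as a small decision function. All names here (`ExistingSchema`, `BytesUploadDecision`, `BytesSchemaPolicy`) are hypothetical and not part of the Pulsar broker; the sketch models only the decision, not the schema registry. Note that under this policy a BYTES payload is never stored, which is what the Flink catalog discussed in this thread relies on.

```java
// Hypothetical classification of what the topic already has registered.
enum ExistingSchema { NONE, BYTES, OTHER }

final class BytesUploadDecision {
    final boolean store;  // whether the uploaded schema should be persisted
    final String warning; // message to show the user, or null

    BytesUploadDecision(boolean store, String warning) {
        this.store = store;
        this.warning = warning;
    }
}

final class BytesSchemaPolicy {
    // Warn and skip a BYTES upload instead of rejecting it with an exception.
    static BytesUploadDecision onUpload(boolean uploadedIsBytes, ExistingSchema existing) {
        if (!uploadedIsBytes) {
            return new BytesUploadDecision(true, null); // non-BYTES: normal path
        }
        switch (existing) {
            case OTHER:
                return new BytesUploadDecision(false,
                        "Topic already has a non-BYTES schema; uploading BYTES is unnecessary, "
                                + "you can subscribe with the BYTES schema.");
            case NONE:
                return new BytesUploadDecision(false,
                        "Topic has no schema; the default is BYTES, so there is no need to upload it.");
            default:
                return new BytesUploadDecision(false, "BYTES schema upload skipped.");
        }
    }
}
```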
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > sinan
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 28, 2023 at 1:15 AM, Christophe Bornet <[email protected]> wrote:
> > > > > >
> > > > > > > This change broke the Flink SQL Pulsar connector:
> > > > > > > https://github.com/streamnative/flink/issues/270
> > > > > > > So I propose to revert it.
> > > > > > >
> > > > > > > On Fri, Dec 9, 2022 at 11:57, labuladong <[email protected]> wrote:
> > > > > > > >
> > > > > > > > Hi pulsar community,
> > > > > > > >
> > > > > > > >
> > > > > > > > I'd like to discuss the behavior of schema uploading. For more
> > > > > > > > context, see https://github.com/apache/pulsar/issues/18825
> > > > > > > >
> > > > > > > >
> > > > > > > > I think forbidding users from uploading a `BYTES` schema is the
> > > > > > > > recommended way to solve this issue. But this may change the existing
> > > > > > > > behavior, so do you have any suggestions about this issue?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Donglai
> > > > > > >
> > > > >
>