Hi, Andrew.
I consulted with some Flink developers. Currently the output of the Kafka 
pipeline connector does not contain schema info; they plan to support this 
feature in the next version of Flink CDC (Flink CDC 3.3).
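To illustrate the difference, here is a small sketch in Python. The flat record mirrors the message shape you quoted below; the enveloped record shows the 'schema'/'payload' wrapper that Kafka Connect's JSON converter adds when value.converter.schemas.enable is true (the 'fields' content shown is simplified and illustrative only):

```python
import json

# Record as emitted by the flink-cdc Kafka pipeline connector:
# no top-level 'schema' field (shape taken from the quoted message below).
flat_record = {
    "before": None,
    "after": {"rev_id": 37},
    "op": "c",
}

# When Debezium runs under Kafka Connect with
# value.converter.schemas.enable=true, the JSON converter wraps the record
# in a schema/payload envelope (field layout simplified here):
enveloped_record = {
    "schema": {
        "type": "struct",
        "fields": [{"field": "after", "type": "struct"}],  # illustrative only
    },
    "payload": flat_record,
}

def has_schema_info(raw: str) -> bool:
    """Return True if a debezium-json record carries a top-level 'schema'
    field that a consumer such as Paimon could read data types from."""
    return "schema" in json.loads(raw)

print(has_schema_info(json.dumps(flat_record)))       # False
print(has_schema_info(json.dumps(enveloped_record)))  # True
```

Paimon only finds type information in the second shape; the first falls back to STRING.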

Paimon will use the 'STRING' type to process Kafka messages in debezium-json 
format that carry no 'schema' information. But if you want Paimon to use the 
real data types for such messages today, you can create the Paimon table with 
Flink (or some other way) before executing the CDC action. Note, however, that 
this workaround cannot handle schema evolution: fields added after the table 
is created will still be processed as 'STRING'.
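For example, a minimal Flink SQL sketch of pre-creating the table (the warehouse path, database, table, and column names here are hypothetical; only rev_id is taken from the message quoted below):

```sql
-- Register a Paimon catalog; the warehouse path is an assumption.
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:///tmp/paimon'
);

USE CATALOG paimon_catalog;

-- Pre-create the table with explicit types so the CDC action does not
-- fall back to STRING. Column list is illustrative.
CREATE TABLE my_db.revision (
    rev_id BIGINT,
    rev_timestamp TIMESTAMP(3),
    PRIMARY KEY (rev_id) NOT ENFORCED
);
```

After the table exists, run the kafka_sync_table/kafka_sync_database action as usual; Paimon will write into the pre-declared columns with their declared types.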

Best,
Lei Li

On 2024/10/26 03:32:30 Andrew Otto wrote:
> Hi!
>
> I really like Flink CDC's pipeline connectors
> <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/pipeline-connectors/overview/>!
> So simple!
> I also like Paimon's CDC ingestion action CLI
> <https://paimon.apache.org/docs/master/flink/cdc-ingestion/overview/>.
>
> I like these because I don't need to specify the schemas; they are inferred
> from the source.  I also like the schema evolution support!
>
> Paimon's recent Iceberg Compatibility
> <https://paimon.apache.org/docs/master/migration/iceberg-compatibility/>
> mode looks cool!
>
> I'd like to accomplish the following:
>
>    - MariaDB CDC -> Kafka
>    - Kafka -> Paimon
>    - Query with Spark+Iceberg
>
> I can do MariaDB CDC -> Kafka with the flink-cdc pipeline connector
> <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/pipeline-connectors/kafka/>
> .
> However, I'm stuck on Kafka -> Paimon.
>
> Paimon's Kafka CDC action docs
> <https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/> say:
> > Usually, debezium-json contains ‘schema’ field, from which Paimon will
> retrieve data types. Make sure your debezium json has this field, or Paimon
> will use ‘STRING’ type.
>
> However, the messages generated by flink-cdc's pipeline connector do not
> have a schema field.  They look like:
>
> {
>   "before": null,
>   "after": {
>     "rev_id": 37,
>      ...
>   },
>   "op": "c"
> }
>
> The only reference I can find to a schema field is in the debezium-json
> format documentation
> <https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/formats/debezium/#debezium-json-ddl>
> .
> > users may setup the Debezium Kafka Connect with the Kafka configuration
> 'value.converter.schemas.enable' enabled to include schema in the message
>
> This leads me to believe that the schema field that Paimon's doc is referring
> to is added not by debezium or flink-cdc, but by Kafka Connect when it is
> used with Debezium proper to write CDC messages to Kafka.
>
>    - Does this mean that the CDC messages generated by the flink-cdc kafka
>    pipeline connector are not compatible with Paimon's kafka_sync_database
>    action?
>    - Or, is there a way to cause flink-cdc pipeline connectors to include
>    schema in the message?
>
>
> I could be misunderstanding something. Is Kafka Connect+Debezium used by
> Flink to support debezium-json formatted messages? I tried passing
> properties.value.converter.schemas.enable:
> true to the flink-cdc pipeline kafka sink but that did not work (as
> expected).
>
> Thank you!
>
> -Andrew Otto
>  Wikimedia Foundation
>
> P.S. Context for what we are trying to do is here: T373144 [SPIKE] Learn
> and document how to use Flink-CDC from MediaWiki MariaDB locally
> <https://phabricator.wikimedia.org/T373144>
>
