????
---- ???????????? ---- | ?????? | ????????<402987...@qq.com.INVALID> | | ???? | 2024??06??26?? 16:38 | | ?????? | user-zh<user-zh@flink.apache.org> | | ?????? | | | ???? | ?????????? | ???? ------------------ ???????? ------------------ ??????: "user-zh" <15171440...@163.com>; ????????: 2024??6??26??(??????) ????4:36 ??????: "user-zh"<user-zh@flink.apache.org>; ????: ?????????? ?? 2024-06-26 15:07:45??"15868861416" <15868861...@163.com> ?????? >????????????ID??PRICE??????????NUMBER??????????????flink-sql????number??????????iceberg??decimal???????????? > > >| | >???? >| >| >15868861...@163.com >| > > >---- ???????????? ---- >| ?????? | Yanquan Lv<decq12y...@gmail.com> | >| ???????? | 2024??06??26?? 14:46 | >| ?????? | <user-zh@flink.apache.org> | >| ???? | Re: ??????cdc????oracle???????????? | >?????????? ID ?? PRINCE ?????????? decimal ????decimal ???????????????????? BASE64 ???? > >???????????????????????????????????????? > >Map<String, Object> customConverterConfigs = new HashMap<>(); >customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, >"numeric"); >JsonDebeziumDeserializationSchema schema = new >JsonDebeziumDeserializationSchema(includeSchema, >customConverterConfigs); > > > > >ha.fen...@aisino.com <ha.fen...@aisino.com> ??2024??6??25?????? 17:26?????? > >?????????? >"ID" "NAME" "ADDTIME" "PRICE" >1 "aa" 2024-6-25 14:21:33 12.22 > >???????? 15868861416 >?????????? 2024-06-25 17:19 >???????? user-zh@flink.apache.org >?????? ??????cdc????oracle???????????? >????????oracle????DEBEZIUM.PRODUCTS???????????????????????????????? > > > > >| | >???? >| >| >15868861...@163.com >| > > >---- ???????????? ---- >| ?????? | ha.fen...@aisino.com<ha.fen...@aisino.com> | >| ???????? | 2024??06??25?? 15:54 | >| ?????? | user-zh<user-zh@flink.apache.org> | >| ???? | cdc????oracle???????????? | >?????????????? >JdbcIncrementalSource<String> oracleChangeEventSource = >new OracleSourceBuilder() >.hostname("host") >.port(1521) >.databaseList("ORCLCDB") >.schemaList("DEBEZIUM") >.tableList("DEBEZIUM.PRODUCTS") >.username("username") >.password("password") >.deserializer(new JsonDebeziumDeserializationSchema()) >.includeSchemaChanges(true) // output the schema changes as well >.startupOptions(StartupOptions.initial()) >.debeziumProperties(debeziumProperties) >.splitSize(2) >.build(); >???????????? > >{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null} > >??????????PRICE????????????ID?????????????????????????? >