lalalaYu commented on PR #2815: URL: https://github.com/apache/paimon/pull/2815#issuecomment-2226795538
I tried implementing DebeziumDeserializationSchema. In the overridden deserialize method I read the key fields and, following the Canal format, embed them in the payload as pkNames. With that in place, Paimon's kafka_sync_database can get the primary keys from Debezium-json when creating tables.

```java
@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
    if (this.jsonConverter == null) {
        this.initializeJsonConverter();
    }
    byte[] bytes = this.jsonConverter.fromConnectData(
            record.topic(), record.valueSchema(), record.value());
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    out.collect(initJsonString(
            record.key(), new String(bytes, java.nio.charset.StandardCharsets.UTF_8)));
}

/**
 * Following how JsonDebeziumDeserializationSchema handles the key, extracts the key's
 * field names and adds them to the value JSON as pkNames.
 * Written for CDC data with schemas enabled, where the value carries a "payload" object.
 *
 * @param key   the record key, expected to be a Struct
 * @param value the record value, which becomes part of the returned JSON string
 * @return the value JSON string with pkNames added to its payload
 */
private String initJsonString(Object key, String value) throws Exception {
    JSONObject element = JSON.parseObject(value);
    if (key != null) {
        Struct struct = (Struct) key;
        JSONArray jsonArr = new JSONArray();
        struct.schema().fields().forEach(field -> jsonArr.add(field.name()));
        element.getJSONObject("payload").put("pkNames", jsonArr);
    } else {
        // For now a null key is an error; with more experience this could
        // fall back to a default value instead.
        throw new Exception("Conversion error-YSJ: key is null that is required and has no default value");
    }
    return element.toJSONString();
}
```
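To make the shape of the transformation easier to see without a Kafka Connect or JSON library on the classpath, here is a minimal sketch that uses plain `java.util` maps as stand-ins for the key Struct and the parsed value JSON. The class `PkNamesSketch` and the method `embedPkNames` are hypothetical names made up for this illustration, and the extra check for a missing `payload` is an assumption on my side, not part of the code in the comment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain maps stand in for the Connect Struct key
// and for the parsed Debezium JSON value (schema + payload envelope).
final class PkNamesSketch {

    /**
     * Copies the key's field names into envelope.payload.pkNames, in field order.
     * Mutates and returns the given envelope, like the in-place put in the comment's code.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> embedPkNames(Map<String, Object> key, Map<String, Object> envelope) {
        if (key == null) {
            // Same policy as the snippet in the comment: a missing key is an error.
            throw new IllegalArgumentException("key is null; cannot derive pkNames");
        }
        Map<String, Object> payload = (Map<String, Object>) envelope.get("payload");
        if (payload == null) {
            // Assumption: without schemas enabled there is no "payload" wrapper to write into.
            throw new IllegalArgumentException("no payload object; are schemas enabled?");
        }
        List<String> pkNames = new ArrayList<>(key.keySet());
        payload.put("pkNames", pkNames);
        return envelope;
    }

    public static void main(String[] args) {
        // A single-column key; LinkedHashMap preserves field order for composite keys.
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("id", 42);

        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("op", "c");
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("payload", payload);

        System.out.println(embedPkNames(key, envelope));
    }
}
```

For a composite key the field order of the key is preserved in `pkNames`, which is presumably what a downstream consumer needs in order to declare the primary key columns in the same order.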