数据流是:MongoDB --> Flink CDC --> Kafka(canal-json 格式)
我看到 Flink CDC 将 MongoDB oplog 解析后转成的 JSON 字符串如下 [1],而下游需要从 Kafka 消费 canal-json 格式的消息。中间的格式转换需要自己实现吗?
但 MongoDB oplog 不带 schema 信息,也没有 canal 中的 old 字段信息,这部分信息要怎么转换呢?


另外,我用如下 Flink SQL 往 Kafka 发送 canal-json 格式的数据,得到的消息并不完整 [2],不是一条标准的 canal-json 数据 [3]。这是已知的 issue 吗?


-- Source table: change stream of the MongoDB collection mgdb.customers,
-- read through the Flink CDC `mongodb-cdc` connector.
-- NOTE(review): credentials are inlined for this example only; do not commit real passwords.
CREATE TABLE mongo_customers (
    _id         STRING,
    customer_id INT,
    name        STRING,
    address     STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector'  = 'mongodb-cdc',
    'hosts'      = 'localhost:27017',
    'username'   = 'mongouser',
    'password'   = 'mongopw',
    'database'   = 'mgdb',
    'collection' = 'customers'
);

-- Sink table: Kafka topic `customers`, with each change encoded as a canal-json message.
-- The plain `kafka` connector is used instead of `upsert-kafka`. upsert-kafka is configured
-- through 'key.format' / 'value.format' (it has no 'format' option) and only accepts
-- insert-only formats, so it cannot carry a changelog format such as canal-json.
-- The PRIMARY KEY is kept: the kafka connector accepts it for changelog formats.
-- NOTE(review): the message observed in [2] contains only the `data` and `type` fields.
-- Fields such as `old`, `database` or `table` would have to be added by custom code
-- (e.g. a UDF or a custom serialization schema). Confirm this against the Flink canal format docs.
CREATE TABLE kafka_customers (
    _id         STRING,
    customer_id INT,
    name        STRING,
    address     STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format'                       = 'canal-json'
);

-- Continuously forward every change from MongoDB to Kafka.
-- Explicit column lists keep the job from silently breaking if either schema changes.
INSERT INTO kafka_customers (_id, customer_id, name, address)
SELECT
    _id,
    customer_id,
    name,
    address
FROM mongo_customers;


[1]
{
"_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\": true}",
"operationType": "insert",
"fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\": \"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\", \"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\", \"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\", \"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\": \"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\", \"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\": \"666684552350\", \"transMajorCategory\": \"123\", \"consoleActualPayChannel\": \"123\", \"consolePayType\": \"123\", \"consolePreAuthFlag\": \"123\", \"consoleSubsidyFlag\": \"123\", \"consoleDcType\": \"123\", \"consoleIsFq\": \"123\", \"consoleAcctDivFlag\": \"123\", \"actualPayChannel\": \"123\", \"payChannel\": \"123\", \"transType\": \"123\", \"payType\": \"123\", \"dcType\": \"123\", \"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", \"creditType\": \"123\", \"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 123.0, \"actOrdAmt\": 123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, \"refFeeAmt\": 123.0, \"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, \"payCardId\": \"123\", \"feeRecType\": \"123\", \"feeFlag\": \"123\", \"transStat\": \"S\", \"createTime\": {\"$date\": 1632279264987}, \"transFinishTime\": \"123\", \"kafkaTime\": \"123\", \"tableName\": \"123\", \"offset\": \"123\", \"recordVersion\": \"123\", \"sign\": \"123\"}",
"source": {
"ts_ms": 0,
"snapshot": "true"
},
"ns": {
"db": "amp_test",
"coll": "TopTransOrder"
},
"to": null,
"documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
"updateDescription": null,
"clusterTime": null,
"txnNumber": null,
"lsid": null
}




[2]
{"data":[{"_id":"614a9b3769736f5fcc492613","id":null,"reqSeqId":"123","ordId":"1440510124339011584","outTransId":"123","merOrdId":"123","hfSeqId":"123","partyOrderId":"123","bankSeqId":"123","orgOrdId":"123","orgTermOrdId":"123","orgHuifuSeqId":"123","transDate":"20210913","productId":"app3","serviceId":"6767679","topAgentId":"123","belongAgentId":"123","chainsId":"123","huifuId":"66668455531","transMajorCategory":"123","consoleActualPayChannel":"123","consolePayType":"123","consolePreAuthFlag":"123","consoleSubsidyFlag":"123","consoleDcType":"123","consoleAcctDivFlag":"123","actualPayChannel":"123","payChannel":"123","transType":"123","payType":"123","dcType":"123","isAcctDiv":"123","isDelayAcct":"123","creditType":"123","devsId":"123","ordAmt":123.32,"feeAmt":123,"actOrdAmt":123,"actualRefAmt":123,"refAmt":123,"refFeeAmt":123,"subsidyAmt":123,"subsidyRefAmt":123,"payCardId":"123","feeRecType":"123","feeFlag":"123","transStat":"S","transFinishTime":"123","tableName":"123","offset":"123","recordVersion":"123","sign":"123","synModifyTime":null,"merName":null,"bankName":null,"bankRespDesc":null,"bagentId":null,"bankId":null,"cashRespDesc":null,"reqDate":null,"accSplitBunch":null,"acctId":null,"fqFeeAmt":null,"payCardIdEnc":null,"goodsDesc":null,"remark":null,"synTtlDate":null,"outOrdId":null,"devType":null,"feeHuifuId":null,"feeAcctId":null,"orgTransDate":null,"orgOrdAmt":null,"orgCreateTime":null,"userType":null,"userId":null,"userIdExt":null,"settleAmt":null,"refCnt":null,"consoleCountSum":null,"topConsolePayType":null,"orgMerOrdId":null,"feeAllowanceFlag":null,"correctStat":null,"addedOrgFeeAmt":null,"discountFeeAmt":null,"acctFinishTime":null,"pospSeqId":null,"outOrderId":null,"cashTransId":null,"orgPayType":null,"orgPayChannel":null,"branch1HuifuId":null,"branch2HuifuId":null,"branch3HuifuId":null,"branch4HuifuId":null,"branch5HuifuId":null,"branchHuifuId":null,"level":null,"branchChannelId":null,"orgFeeAmt":null,"orgConsoleIsFq":null,"orgCreditType":null,"fqMerDiscountFlag":null,"payScene":null,"labels":null,"orgTransType":null,"orgFeeRecType":null,"orgFeeFlag":null,"orgDiscountFeeAmt":null,"merOperId":null,"operType":null,"batchId":null,"authNo":null,"refNum":null,"bankMerId":null,"bankMerName":null,"posMerId":null,"posMerName":null,"acqrInstId":null,"doubleExempt":null,"pnrDevId":null,"posTermId":null,"realPayType":null,"channelFinishTime":null,"transRefundBankId":null,"transRefundBankName":null,"orgRealPayType":null,"orgDevsId":null,"merPriv":null,"transRefundOutOrdId":null,"orgHfSeqId":null,"synMode":null,"cloudPay":null,"terminalReqDate":null,"terminalPayChannel":null,"huifuFstOrg":null,"huifuSecOrg":null,"huifuThdOrg":null,"huifuForOrg":null,"huifuSales":null,"partnerBd":null,"organizationId":null,"upperOrgId":null,"merOrg":null,"partnerInnerFstOrg":null,"partnerInnerSecOrg":null,"partnerInnerThdOrg":null,"partnerFstOrg":null,"partnerSecOrg":null,"partnerThdOrg":null,"collectMerFstOrg":null,"collectMerSecOrg":null,"collectMerThdOrg":null,"collectMerForOrg":null,"collectMerFivOrg":null,"collectMerSixOrg":null,"fullPath":null}],"type":"INSERT"}




[3]
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format





在 2022-08-22 22:57:04,"Xuyang" <xyzhong...@163.com> 写道:
>Hi,请问你的需求是“debezium 数据 -> Flink -> canal”吗?
>如果是这样的话,可以用 UDF [1] 来尝试一下。
>
>[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
>在 2022-08-21 10:49:29,"casel.chen" <casel_c...@126.com> 写道:
>>Flink CDC 如何将捕获到的变更记录转成 canal-json 格式输出到下游 Kafka?
>>Flink CDC 获取的是 debezium 格式的记录(用的是 JsonDebeziumDeserializationSchema),要如何转换成 canal-json 格式输出呢?有没有例子或关键代码展示?谢谢!

Reply via email to