Hi, 这是个已知问题,目前 debezium 同步不同数据库时,并没有保证数据格式一模一样,比如同步 PG 的 UPDATE 消息的时候,before 和 after 字段就不是全的。 这个问题会在后面的版本中解决。
Best, Jark On Wed, 22 Jul 2020 at 21:07, Leonard Xu <xbjt...@gmail.com> wrote: > Hello, > > 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。 > 看起来是你的数据问题,一条 update 的changelog, before 为null, > 这是不合理的,没有before的数据,是无法处理after的数据的。 > 如果确认是脏数据,可以开启ignore-parse-errors跳过[1] > > 祝好 > Leonard > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors > < > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors > > > > { > "payload": { > "before": null, > "after": { > "id": 2, > "name": "liushimin", > "age": "24", > "sex": "man", > "phone": "155555555" > }, > "source": { > "version": "1.2.0.Final", > "connector": "postgresql", > "name": "postgres", > "ts_ms": 1595409754151, > "snapshot": "false", > "db": "postgres", > "schema": "public", > "table": "person", > "txId": 569, > "lsn": 23632344, > "xmin": null > }, > "op": "u", > "ts_ms": 1595409754270, > "transaction": null > } > } > > > 在 2020年7月22日,17:34,amen...@163.com 写道: > > > > > 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"
version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}} > >