Hi,
I have configured my connector in the following way:
MongoDBSource.<T>builder()
    ...
    .deserializer(new MongoDeserializationSchema<T>(clazz))
    .build();
My class MongoDeserializationSchema is defined as follows:
public class MongoDeserializationSchema<T> implements
        DebeziumDeserializationSchema<T> {
    ...
    private final Class<T> clazz;
    // transient: created lazily on first use instead of being serialized with the schema
    private transient JsonConverter jsonConverter;

    public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }

    public void deserialize(SourceRecord record, Collector<T> collector) {
        if (this.jsonConverter == null) {
            this.initializeJsonConverter();
        }
        try {
            // Convert the Connect record value to JSON bytes
            byte[] bytes =
                jsonConverter.fromConnectData(record.topic(),
                    record.valueSchema(), record.value());
            T data = null;
            ... // deserialize to data from bytes
            if (data != null) {
                collector.collect(data);
            }
        }
        ....
    }

    public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }

    private void initializeJsonConverter() {
        this.jsonConverter = new JsonConverter();
        // Diamond operator instead of the raw HashMap type
        HashMap<String, Object> configs = new HashMap<>(2);
        configs.put("converter.type", ConverterType.VALUE.getName());
        configs.put("schemas.enable", false);
        this.jsonConverter.configure(configs);
    }
}
So I am using org.apache.kafka.connect.json.JsonConverter to
deserialize the SourceRecord into my data type T.
This works fine, but when a source record contains a long
number, the number field is formatted like:
{"field": {"$numberLong": "0"}}
This breaks deserialization, because the POJO expects "field": 0
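As a stopgap, I could strip the wrapper from the JSON string before mapping it to the POJO. Below is a minimal sketch, assuming the wrapper always has exactly the shape shown above. The class NumberLongUnwrapper and its unwrap method are hypothetical names of mine, not part of any connector API. A plain regex like this would also rewrite a string value that happens to contain the same text, so it is not a substitute for a proper fix:

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink CDC or Kafka Connect): rewrites
// {"$numberLong": "123"} wrappers into plain JSON numbers.
final class NumberLongUnwrapper {

    // Matches {"$numberLong": "<optional minus><digits>"} with optional whitespace.
    private static final Pattern NUMBER_LONG = Pattern.compile(
            "\\{\\s*\"\\$numberLong\"\\s*:\\s*\"(-?\\d+)\"\\s*\\}");

    private NumberLongUnwrapper() {
    }

    // Replaces every matched wrapper with the bare digits captured in group 1.
    static String unwrap(String json) {
        return NUMBER_LONG.matcher(json).replaceAll("$1");
    }

    public static void main(String[] args) {
        String in = "{\"field\": {\"$numberLong\": \"0\"}}";
        System.out.println(unwrap(in)); // prints {"field": 0}
    }
}
```

In deserialize, this would be applied to new String(bytes, StandardCharsets.UTF_8) before converting to T. It only handles $numberLong, so other extended JSON wrappers (for example $date) would still need separate handling.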
Somewhere I have read that one needs to specify:
"output.json.formatter":
"com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
so that the source record formats the full document as regular JSON,
but I am not sure how or where I can specify this in my
configuration.
Can anyone help me with this?
Thanks
Sachin