Hi Sachin,
I recommend using org.bson.Document to convert the MongoDB Extended JSON in the
fullDocument field into Java types (for example, {"$numberLong": "0"} becomes a
java.lang.Long), and then mapping the fields to your own classes.
--------------------------------------------------------------------------------------------------------
.deserializer(new DebeziumDeserializationSchema<Document>() {
    @Override
    public void deserialize(SourceRecord record, Collector<Document> out) {
        Optional.ofNullable(record)
                .map(SourceRecord::value)
                .map(Struct.class::cast)
                // "fullDocument" carries the changed document as an Extended JSON string
                .map(struct -> struct.getString("fullDocument"))
                // Document.parse decodes wrappers such as {"$numberLong": "0"} into Java types
                .map(Document::parse)
                // mapping to other class types
                .ifPresent(out::collect);
    }

    @Override
    public TypeInformation<Document> getProducedType() {
        return Types.GENERIC(Document.class);
    }
})
--------------------------------------------------------------------------------------------------------
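As a sketch of the "further field mapping" step, the standalone program below
parses an Extended JSON string with Document.parse and copies the fields into a
plain Java object by hand. MyRecord and its fields are hypothetical stand-ins
for your class T, and the example assumes the org.bson library (shipped with the
MongoDB Java driver) is on the classpath.

```java
import org.bson.Document;

/**
 * Sketch: parse MongoDB Extended JSON with org.bson.Document, then map fields
 * onto a plain Java object by hand. MyRecord is a hypothetical target type.
 */
public class ExtendedJsonMappingExample {

    /** Hypothetical POJO standing in for the target class T. */
    public static class MyRecord {
        public long field;
        public String name;
    }

    /** Copies fields from the parsed Document into the hypothetical POJO. */
    public static MyRecord toMyRecord(Document doc) {
        MyRecord r = new MyRecord();
        // {"$numberLong": "..."} is decoded to java.lang.Long; reading it as Number
        // also tolerates values stored as 32-bit ints, which decode to Integer.
        Number n = doc.get("field", Number.class);
        r.field = (n == null) ? 0L : n.longValue();
        r.name = doc.getString("name");
        return r;
    }

    public static void main(String[] args) {
        // The shape reported in the question: a long wrapped in an Extended JSON object.
        String json = "{\"field\": {\"$numberLong\": \"42\"}, \"name\": \"abc\"}";
        Document doc = Document.parse(json);
        System.out.println(doc.get("field").getClass().getSimpleName()); // Long
        MyRecord r = toMyRecord(doc);
        System.out.println(r.field + " " + r.name); // 42 abc
    }
}
```

Inside the deserializer, the same mapping could take the place of the
"// mapping to other class types" comment, for example
.map(Document::parse).map(ExtendedJsonMappingExample::toMyRecord), with the
schema's type parameter and getProducedType() switched to MyRecord.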
Best,
Jiabao
On 2024/08/19 07:03:07 Sachin Mittal wrote:
> Hi,
> I have configured my connector in the following way:
>
> MongoDBSource.<T>builder()
> ...
> .deserializer(new MongoDeserializationSchema<T>(clazz))
> .build();
>
>
> My class MongoDeserializationSchema is defined like:
>
> public class MongoDeserializationSchema<T> implements
>         DebeziumDeserializationSchema<T> {
>     ...
>     private final Class<T> clazz;
>     private transient JsonConverter jsonConverter;
>
>     public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }
>
>     public void deserialize(SourceRecord record, Collector<T> collector) {
>         if (this.jsonConverter == null) {
>             this.initializeJsonConverter();
>         }
>         try {
>             byte[] bytes = jsonConverter.fromConnectData(record.topic(),
>                     record.valueSchema(), record.value());
>             T data = null;
>             ... // deserialize to data from bytes
>             if (data != null) {
>                 collector.collect(data);
>             }
>         }
>
>         ....
>     }
>
>     public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }
>
>     private void initializeJsonConverter() {
>         this.jsonConverter = new JsonConverter();
>         HashMap<String, Object> configs = new HashMap<>(2);
>         configs.put("converter.type", ConverterType.VALUE.getName());
>         configs.put("schemas.enable", false);
>         this.jsonConverter.configure(configs);
>     }
> }
>
>
> So I am using org.apache.kafka.connect.json.JsonConverter to
> deserialize the SourceRecord to my data type T.
>
> This works fine, but when a source record contains a long
> number, it formats number fields like:
>
> {"field": {"$numberLong": "0"}}
>
> This breaks deserialization, as the POJO expects "field": 0
>
> I have read somewhere that one needs to specify:
>
> "output.json.formatter":
> "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
>
> so that the source record formats the full document as regular JSON,
> but I am not sure how or where to specify this in my
> configuration.
>
> Can anyone help me with this?
>
>
> Thanks
>
> Sachin
>