Dear Flink community, my Flink pipeline listens for inserts into MongoDB GridFS, processes the inserted document, and writes the result into a MongoDB sink again; this works so far.
Now I want to enable checkpointing, but after reading the documentation, training slides, Stack Overflow, the list archives, etc. for a day, I still don't know how to make our source support checkpointing.

My main question is:

1. How can I make our source rewindable so that the change streams are resumed from where the pipeline crashed? (see pipeline code below)

Side questions/notes:

2. Is 1. possible with a reasonable amount of work, or would you recommend putting Kafka between MongoDB and Flink?
   (We are planning to replace MongoDB GridFS with Google Object Storage soon, as our documents, currently a few MB each, will soon grow to GB size.)
3. I noticed that there are existing community MongoDB connectors, but I don't think they are made for our change-stream use case:
   a. [FLINK-6573] PR#20848
   b. github.com/mongo-flink/mongo-flink

CODE:

Watch GridFS inserts on both collections, fs.files and fs.chunks, with:

```
class MongoDBSource extends RichSourceFunction<ChangeStreamDocument<Document>> {

    @Override
    public void run(SourceContext<ChangeStreamDocument<Document>> ctx) {
        collection.watch(singletonList(Aggregates.match(Filters.eq("operationType", "insert"))))
                  .iterator()
                  .forEachRemaining(ctx::collect);
    }
}
```

Join the two change streams and process the events:

```
DataStream<XYZ> stream(StreamExecutionEnvironment env) {
    DataStream<ChangeStreamDocument<Document>> chunks = env.addSource(new MongoDBSource("fs.chunks"));
    DataStream<ChangeStreamDocument<Document>> files = env.addSource(new MongoDBSource("fs.files"));

    return files.join(chunks)
                .where(filesKeySelector).equalTo(chunksFilesKeySelector)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1)))
                .apply(new JoinFunction<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>,
                                        Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>>>() {
                    @Override
                    public Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>> join(
                            ChangeStreamDocument<Document> filesChange, ChangeStreamDocument<Document> chunkChange) {
                        return new Tuple2<>(filesChange, chunkChange);
                    }
                })
                .process(new DocumentsMerger());
}
```

Load the file from GridFS and process it:

```
class DocumentsMerger
        extends ProcessFunction<Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>>, XYZ> {

    @Override
    public void processElement(Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>> changeTuple,
                               Context ctx, Collector<XYZ> out) {
        // Wait for the GridFS `fs.files` entry and all GridFS `fs.chunks` to be available
        gridFSBucket.find(Filters.eq("_id", files_id));
        if (measurementChunkIndex != numOfMeasurementChunks - 1) {
            return;
        }
        GridFSDownloadStream downloadStream = gridFSBucket.openDownloadStream(files_id);
        XYZ result = process(downloadStream);
        out.collect(result);
    }
}
```

I hope this is the right place for this question, but I can also move it to Stack Overflow and reference the mailing list there if you prefer, so it is easier for others to find.

Regards
Armin
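P.S. To make question 1 more concrete, below is a rough, untested sketch of the direction I have in mind: store the change stream's resume token in Flink operator state via CheckpointedFunction and pass it to resumeAfter() on restore. The class name, connection URI, state descriptor name and the assumption that the source runs with parallelism 1 are only placeholders of mine, not code from our pipeline.

```
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.BsonDocument;
import org.bson.Document;

import java.util.Collections;

// Sketch only: a change-stream source that snapshots the MongoDB resume token
// into Flink operator state, so a restart can continue with resumeAfter().
class ResumableMongoDBSource extends RichSourceFunction<ChangeStreamDocument<Document>>
        implements CheckpointedFunction {

    private final String database;
    private final String collectionName;

    private transient ListState<String> resumeTokenState; // token stored as JSON in operator state
    private volatile BsonDocument resumeToken;            // token of the last emitted change event
    private volatile boolean running = true;

    ResumableMongoDBSource(String database, String collectionName) {
        this.database = database;
        this.collectionName = collectionName;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        resumeTokenState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("mongo-resume-token", String.class));
        for (String tokenJson : resumeTokenState.get()) {
            resumeToken = BsonDocument.parse(tokenJson); // restore token from the last checkpoint
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        resumeTokenState.clear();
        if (resumeToken != null) {
            resumeTokenState.add(resumeToken.toJson());
        }
    }

    @Override
    public void run(SourceContext<ChangeStreamDocument<Document>> ctx) {
        MongoCollection<Document> collection = MongoClients.create("mongodb://localhost") // placeholder URI
                .getDatabase(database)
                .getCollection(collectionName);

        ChangeStreamIterable<Document> stream = collection.watch(
                Collections.singletonList(Aggregates.match(Filters.eq("operationType", "insert"))));
        if (resumeToken != null) {
            stream = stream.resumeAfter(resumeToken); // continue after the last checkpointed event
        }

        try (MongoCursor<ChangeStreamDocument<Document>> cursor = stream.iterator()) {
            while (running && cursor.hasNext()) {
                ChangeStreamDocument<Document> change = cursor.next();
                // Emit the element and remember its token under the checkpoint lock, so the
                // snapshotted token always matches what has actually been emitted downstream.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(change);
                    resumeToken = change.getResumeToken();
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

The synchronized block on the checkpoint lock is meant to keep the emitted elements and the stored token consistent with each checkpoint; I am aware that a resume token can expire if the oplog rolls over while the job is down, in which case the position would still be lost.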