Dear Flink community,

my Flink pipeline listens for inserts into MongoDB GridFS, processes each
inserted document, and writes the result back to MongoDB through a sink.
This works so far.

Now I want to enable checkpointing, but after a day of reading the
documentation, training slides, Stack Overflow, the mailing list archives,
etc., I still don't know how to make our source support checkpointing.

My main question is:
1. How can I make our source rewindable, so that the change streams resume
from where they left off when the pipeline crashed? (See the pipeline code
below.)

Side questions/notes:
2. Is 1. possible with a reasonable amount of work, or would you recommend
adding Kafka between Flink and MongoDB?
*    (We're planning to replace MongoDB GridFS with Google Object Storage
soon, as our documents, currently a few MB each, will grow to GB size.)*
3. I noticed that there are existing community MongoDB connectors, but I
don't think they're made for our change stream use case:
   a. [FLINK-6573] PR#20848
   b. github/mongo-flink/mongo-flink


CODE:

Watch GridFS inserts on both collections, fs.files and fs.chunks, with:
```
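class MongoDBSource extends RichSourceFunction<ChangeStreamDocument<Document>> {
  void run(SourceContext<ChangeStreamDocument<Document>> ctx) {
    // Watch insert events only and forward each one downstream
    collection.watch(List.of(Aggregates.match(Filters.eq("operationType", "insert"))))
        .iterator()
        .forEachRemaining(ctx::collect);
  }
}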
```
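
To make concrete what "rewindable" means in question 1, here is a small
library-free Java sketch of the behaviour being asked for: the source
remembers how far it has read, a checkpoint persists that position, and a
restarted source continues from it. `ResumableSourceSketch` and its methods
are hypothetical stand-ins, not Flink or MongoDB APIs, and the list index
merely plays the role of the change stream's resume token. In a real source
the position would presumably live in Flink operator state and be handed back
to MongoDB when the change stream is reopened, which this sketch does not
attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Library-free model of a source that can resume from a checkpointed position. */
class ResumableSourceSketch {

    // Stands in for the MongoDB change stream (assumption: events are replayable in order).
    private final List<String> changeLog;

    // Stands in for the resume token: index of the next event to emit.
    private int nextPosition = 0;

    ResumableSourceSketch(List<String> changeLog) {
        this.changeLog = changeLog;
    }

    /** Emits at most maxEvents events, starting at the current position. */
    void run(int maxEvents, Consumer<String> out) {
        int emitted = 0;
        while (nextPosition < changeLog.size() && emitted < maxEvents) {
            out.accept(changeLog.get(nextPosition));
            nextPosition++; // advance the position together with the emit
            emitted++;
        }
    }

    /** What a checkpoint would persist. */
    int snapshotState() {
        return nextPosition;
    }

    /** What a restore after a crash would feed back in. */
    void restoreState(int position) {
        nextPosition = position;
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");
        List<String> seen = new ArrayList<>();

        ResumableSourceSketch first = new ResumableSourceSketch(log);
        first.run(2, seen::add);                 // emits a and b, then "crashes"
        int checkpoint = first.snapshotState();

        ResumableSourceSketch restarted = new ResumableSourceSketch(log);
        restarted.restoreState(checkpoint);
        restarted.run(10, seen::add);            // continues with c only

        System.out.println(seen);                // prints [a, b, c]
    }
}
```

The point of the sketch is that the position and the emitted events have to
advance together; otherwise a restore either skips events or replays them.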

Join the two change streams and process the events:
```
DataStream<XYZ> stream(...) {
  // One change stream source per GridFS collection
  chunks = env.addSource(new MongoDBSource(fs.chunks))
  files = env.addSource(new MongoDBSource(fs.files))

  return files.join(chunks)
    .where(filesKeySelector)
    .equalTo(chunksFilesKeySelector)
    .window(withGap(1s))
    .apply(new JoinFunction() {
      join(filesChange, chunkChange) { return new Tuple2<>(filesChange, chunkChange) }
    })
    .process(new DocumentsMerger())
}
```

Load the file from GridFS and process it:
```
class DocumentsMerger extends
ProcessFunction<Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>>,
XYZ> {
  processElement(changeTuple, ctx, Collector<XYZ> out) {

    // Wait for GridFS `fs.files` and all GridFS `fs.chunks` to be available,
    // i.e. continue only when this event carries the last chunk
    gridFSBucket.find(files_id)
    if (measurementChunkIndex != numOfMeasurementChunks - 1) return

    // Download the complete file and process it
    dataStream = gridFSBucket.openDownloadStream(files_id)
    result = dataStream.process()
    out.collect(result)
  }
}
```
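
The "last chunk" check in `processElement` can be illustrated with a small
standalone Java sketch. It assumes the standard GridFS layout, in which the
`fs.files` document carries `length` and `chunkSize` and each `fs.chunks`
document carries a zero-based index `n`. The class and method names
(`GridFsChunkMath`, `expectedChunkCount`, `isLastChunk`) are hypothetical
helpers and not part of the pipeline above.

```java
/** Helper arithmetic for deciding whether a chunk is the last one of a GridFS file. */
class GridFsChunkMath {

    /** Number of chunks needed to store lengthBytes with the given chunk size. */
    static long expectedChunkCount(long lengthBytes, int chunkSizeBytes) {
        if (lengthBytes < 0 || chunkSizeBytes <= 0) {
            throw new IllegalArgumentException("length must be >= 0 and chunk size > 0");
        }
        // Ceiling division: a partially filled final chunk still counts as a chunk.
        return (lengthBytes + chunkSizeBytes - 1) / chunkSizeBytes;
    }

    /** True if the zero-based chunkIndex is the final chunk of the file. */
    static boolean isLastChunk(int chunkIndex, long lengthBytes, int chunkSizeBytes) {
        return chunkIndex == expectedChunkCount(lengthBytes, chunkSizeBytes) - 1;
    }

    public static void main(String[] args) {
        int chunkSize = 255 * 1024; // 261120 bytes, assumed here as the chunk size

        System.out.println(expectedChunkCount(1_000_000L, chunkSize)); // prints 4
        System.out.println(isLastChunk(3, 1_000_000L, chunkSize));     // prints true
        System.out.println(isLastChunk(2, 1_000_000L, chunkSize));     // prints false
    }
}
```

Note that this only tells whether an event carries the highest chunk index; it
does not by itself guarantee that all earlier chunks have already been seen.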


I hope this is the right place for this question. If you prefer, I can also
move it to Stack Overflow and reference the mailing list there, so that
others can find it more easily.

Regards
Armin
