Hi Jacob,

> I have multiple upstream sources to connect to depending on the business
> model which are not Kafka. Based on criticality of the system and publisher
> dependencies, we cannot switch to Kafka for these.
Sounds like you want to implement some custom connectors. [1][2] may be
helpful for implementing a custom Flink Table API connector. Specifically,
in terms of "Flink Checkpoint & Offset Commit", the custom source needs to
implement the `SourceReader` interface, and you can override
`snapshotState()` and `notifyCheckpointComplete()` with your own logic.
[3] is the related code of the Kafka connector under the DataStream API,
and [4] is the related code of the Kafka connector under the Table API & SQL.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
[3] https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L98-L177
[4] https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java#L354

xia rui <xiarui0...@gmail.com> wrote on Fri, Mar 8, 2024 at 10:12:

> Hi Jacob.
>
> Flink uses a "notification" to let an operator run a callback on the
> completion of a checkpoint. After gathering all checkpoint-done messages
> from the TMs, the JM sends a "notify checkpoint completed" RPC to all TMs.
> Operators handle this notification, and that is where checkpoint-success
> callbacks are invoked. For example, the Kafka source commits the current
> consuming offset. I think this doc
> (https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
> may be helpful.
>
> You can override `notifyCheckpointComplete()` to customize the handling
> of checkpoint completion.
>
> Best regards,
> Rui Xia
>
> On Fri, Mar 8, 2024 at 3:03 AM Jacob Rollings <jacobrolling...@gmail.com>
> wrote:
>>
>> Hello,
>>
>> I am implementing proof-of-concept Flink real-time streaming solutions.
>>
>> I came across the below lines in the out-of-the-box Flink Kafka
>> connector documentation:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
>>
>> Consumer Offset Committing #
>>
>> Kafka source commits the current consuming offset when checkpoints are
>> completed, for ensuring the consistency between Flink's checkpoint state
>> and committed offsets on Kafka brokers.
>>
>> How is Flink able to control the callbacks from checkpointing? Is there
>> a way to override this in my implementation? I have multiple upstream
>> sources to connect to depending on the business model which are not
>> Kafka. Based on criticality of the system and publisher dependencies, we
>> cannot switch to Kafka for these. So I was hoping to do the same thing
>> the Kafka connector is doing.
>>
>> Cheers,
>> JR

-- 
Best,
Yanfei
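For what it's worth, the `snapshotState()` / `notifyCheckpointComplete()` pattern discussed above can be sketched without any Flink dependencies. This is a simplified, self-contained illustration of the idea behind Flink's KafkaSourceReader (which keeps an `offsetsToCommit` map keyed by checkpoint id): the snapshot stages the current offsets under the checkpoint id, and the completion notification commits everything staged up to and including that id. The class and method names here mirror the Flink callbacks but this is a standalone sketch, not the real `SourceReader` interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Standalone sketch of the stage-then-commit pattern used by Flink's
// KafkaSourceReader: offsets are staged per checkpoint in snapshotState()
// and only committed once the checkpoint is known to have succeeded.
public class OffsetCommittingReader {
    // Offsets staged per checkpoint, keyed by checkpoint id (ascending).
    private final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();
    // Offsets the reader is currently consuming (partition -> offset).
    private final Map<String, Long> currentOffsets = new HashMap<>();
    // Offsets actually committed back to the external system.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Called as records are consumed.
    public void recordProgress(String partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    // Analogous to SourceReader#snapshotState: stage a copy of the current
    // offsets under this checkpoint id; do NOT commit yet.
    public void snapshotState(long checkpointId) {
        offsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    // Analogous to notifyCheckpointComplete: the JM has confirmed the
    // checkpoint, so commit all offsets staged for this id and earlier,
    // then prune them from the staging map.
    public void notifyCheckpointComplete(long checkpointId) {
        SortedMap<Long, Map<String, Long>> ready = offsetsToCommit.headMap(checkpointId + 1);
        ready.values().forEach(committedOffsets::putAll);
        ready.clear(); // headMap is a view, so this prunes offsetsToCommit
    }

    public Map<String, Long> committed() {
        return committedOffsets;
    }

    public static void main(String[] args) {
        OffsetCommittingReader r = new OffsetCommittingReader();
        r.recordProgress("p0", 42L);
        r.snapshotState(1L);
        r.recordProgress("p0", 99L); // progress after the barrier is not committed yet
        r.notifyCheckpointComplete(1L);
        System.out.println(r.committed().get("p0")); // prints 42
    }
}
```

The point of staging by checkpoint id is that progress made after a barrier must not be committed until a later checkpoint confirms it; a non-Kafka source that commits acknowledgements to its own broker can follow the same two-phase shape.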