Hi Jacob,

> I have multiple upstream sources to connect to depending on the business 
> model which are not Kafka. Based on criticality of the system and publisher 
> dependencies, we cannot switch to Kafka for these.

Sounds like you want to implement some custom connectors; [1] and [2] may
be helpful for implementing a custom Flink Table API connector.

Specifically, in terms of “Flink Checkpoint & Offset Commit”, the
custom source needs to implement the `SourceReader` interface, and you
can override `snapshotState()` and `notifyCheckpointComplete()` with
your own implementations.
[3] is the related code of the Kafka connector under the DataStream API,
and [4] is the related code of the Kafka connector under the Table API & SQL.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] 
https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
[3] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L98-L177
[4] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java#L354
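The pattern the Kafka reader in [3] uses can be sketched roughly as below. Note this is a simplified stand-in, not the real `SourceReader` interface (which takes a Flink `ReaderOutput`, split types, etc.): `snapshotState()` records the current offset keyed by checkpoint ID, and `notifyCheckpointComplete()` commits the offset for that checkpoint once the JM confirms it.

```java
import java.util.TreeMap;

/**
 * Simplified sketch of the offset-commit pattern used by Flink's
 * KafkaSourceReader. The class and method names mirror the real
 * callbacks, but the surrounding types are stand-ins.
 */
public class OffsetCommittingReader {
    // Offset of the last record handed downstream.
    private long currentOffset = -1L;
    // Offsets captured per checkpoint, awaiting checkpoint completion.
    private final TreeMap<Long, Long> offsetsToCommit = new TreeMap<>();
    // Offset most recently committed to the external system.
    private long committedOffset = -1L;

    // Stand-in for reading a record from the upstream source.
    public void poll(long offset) {
        currentOffset = offset;
    }

    // Called when a checkpoint barrier reaches this reader:
    // remember which offset belongs to this checkpoint.
    public void snapshotState(long checkpointId) {
        offsetsToCommit.put(checkpointId, currentOffset);
    }

    // Called after the JM has confirmed the checkpoint completed:
    // commit the offset for this checkpoint and drop older entries.
    public void notifyCheckpointComplete(long checkpointId) {
        Long offset = offsetsToCommit.get(checkpointId);
        if (offset != null) {
            committedOffset = offset; // commit to your external system here
            offsetsToCommit.headMap(checkpointId, true).clear();
        }
    }

    public long getCommittedOffset() {
        return committedOffset;
    }
}
```

The key point is that the commit happens only in `notifyCheckpointComplete()`, never in `snapshotState()`, so the externally committed offset never runs ahead of a checkpoint that Flink has actually confirmed.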

xia rui <xiarui0...@gmail.com> 于2024年3月8日周五 10:12写道:
>
> Hi Jacob.
>
> Flink uses a "notification" mechanism to let an operator react to the 
> completion of a checkpoint. After gathering all checkpoint-done messages from 
> the TMs, the JM sends a "notify checkpoint completed" RPC to all TMs. 
> Operators handle this notification by invoking their checkpoint-success 
> callbacks; for example, Kafka sources commit the current consuming offset. I 
> think this doc 
> (https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
>  may be helpful.
>
> You can override `notifyCheckpointComplete()` to customize the behavior of 
> handling checkpoint completion.
>
> Best regards Rui Xia
>
> On Fri, Mar 8, 2024 at 3:03 AM Jacob Rollings <jacobrolling...@gmail.com> 
> wrote:
>>
>>
>> Hello,
>>
>> I am implementing proof-of-concept Flink real-time streaming solutions.
>>
>> I came across the lines below in the out-of-the-box Flink Kafka connector documentation.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
>> Consumer Offset Committing #
>>
>> Kafka source commits the current consuming offset when checkpoints are 
>> completed, for ensuring the consistency between Flink’s checkpoint state and 
>> committed offsets on Kafka brokers.
>>
>>
>> How is Flink able to control the callbacks from checkpointing? Is there a 
>> way to override this with my own implementation? I have multiple upstream 
>> sources to connect to, depending on the business model, which are not Kafka. 
>> Based on the criticality of the system and publisher dependencies, we cannot 
>> switch to Kafka for these. So I was hoping to do the same thing the Kafka 
>> connector is doing.
>>
>>
>> Cheers,
>>
>> JR



-- 
Best,
Yanfei
