Fwd: Flink Checkpoint & Offset Commit

2024-03-07 Thread Jacob Rollings
Hello,

I am implementing proof-of-concept Flink real-time streaming solutions.

I came across the following lines in the documentation for the
out-of-the-box Flink Kafka connector:


https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/

Consumer Offset Committing

"Kafka source commits the current consuming offset when checkpoints
are completed, for ensuring the consistency between Flink’s checkpoint
state and committed offsets on Kafka brokers."


How is Flink able to control the callbacks from checkpointing? Is there a
way to override this in my own implementations? Depending on the business
model, I have multiple upstream sources to connect to that are not Kafka.
Given the criticality of the system and the publisher dependencies, we
cannot switch those to Kafka, so I was hoping to do the same thing the
Kafka connector is doing.


Cheers,

JR


Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread xia rui
Hi Jacob.

Flink uses a "notification" mechanism to let an operator react to the
completion of a checkpoint. After gathering all checkpoint-done messages
from the TaskManagers (TMs), the JobManager (JM) sends a "notify checkpoint
completed" RPC to all TMs. Operators handle this notification, and that is
where the checkpoint-success callbacks are invoked; for example, Kafka
sources commit the current consuming offset at that point. I think this doc (
https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
may be helpful.

You can override `notifyCheckpointComplete()` to customize the behavior
of handling checkpoint completion.
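
For reference, here is a minimal sketch (not the connector's actual code) of
that pattern in a regular DataStream function. It implements
`CheckpointedFunction` and `CheckpointListener`, records the current position
in `snapshotState()`, and only publishes it once `notifyCheckpointComplete()`
fires. The commented-out `OffsetStoreClient` and the single numeric offset are
made-up placeholders for whatever your upstream system provides.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackingMapper extends RichMapFunction<String, String>
        implements CheckpointedFunction, CheckpointListener {

    private transient ListState<Long> checkpointedOffset;           // Flink-managed state
    private long currentOffset;                                     // position in the upstream system
    private final Map<Long, Long> pendingCommits = new HashMap<>(); // checkpointId -> offset

    @Override
    public String map(String value) {
        currentOffset++; // stand-in for "advanced by one record in the upstream source"
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Runs when the checkpoint barrier reaches this task: persist the offset in
        // Flink state and remember it for the commit that happens on completion.
        checkpointedOffset.update(Collections.singletonList(currentOffset));
        pendingCommits.put(context.getCheckpointId(), currentOffset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedOffset = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("offset", Long.class));
        for (Long restored : checkpointedOffset.get()) {
            currentOffset = restored; // restore the position after a failure
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Invoked via the JM -> TM "notify checkpoint completed" RPC described above;
        // only now is it safe to publish the offset to the external system.
        Long offset = pendingCommits.remove(checkpointId);
        if (offset != null) {
            // OffsetStoreClient is a made-up stand-in for your system's commit API.
            // OffsetStoreClient.commit(offset);
            System.out.println("Committing offset " + offset + " for checkpoint " + checkpointId);
        }
    }
}
```

The two-step pattern (snapshot first, commit only on the completion
notification) is what keeps the external system's offsets from running ahead
of Flink's own checkpoint state.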

Best regards,
Rui Xia



Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread Yanfei Lei
Hi Jacob,

> Depending on the business model, I have multiple upstream sources to
> connect to that are not Kafka. Given the criticality of the system and the
> publisher dependencies, we cannot switch those to Kafka.

It sounds like you want to implement some custom connectors; [1] and [2]
may be helpful for implementing a custom Flink Table API connector.

Specifically in terms of “Flink Checkpoint & Offset Commit”, the
custom source needs to implement the `SourceReader` interface, and you
can override `snapshotState()` and `notifyCheckpointComplete()` in your
implementation (see the sketch after the links below).
[3] is the related code of the Kafka connector under the DataStream API,
and [4] is the related code of the Kafka connector under the Table API & SQL.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] 
https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
[3] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L98-L177
[4] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java#L354
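
To make the pattern in [3] concrete, here is a heavily simplified,
hypothetical `SourceReader` sketch for a non-Kafka partitioned source. The
split type `MySplit`, the partition/offset model, and the commit call are
assumptions standing in for your actual system, and the reading logic in
`pollNext()` is deliberately stubbed out; what it keeps from the Kafka reader
is the bookkeeping of freezing offsets per checkpoint id in `snapshotState()`
and committing (then discarding) them in `notifyCheckpointComplete()`.

```java
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

public class MySourceReader implements SourceReader<String, MySourceReader.MySplit> {

    /** Hypothetical split: one partition of the upstream system plus a read position. */
    public static class MySplit implements SourceSplit {
        final String partition;
        long currentOffset;

        MySplit(String partition, long currentOffset) {
            this.partition = partition;
            this.currentOffset = currentOffset;
        }

        @Override
        public String splitId() {
            return partition;
        }
    }

    private final Map<String, MySplit> assignedSplits = new HashMap<>();
    // Offsets frozen at each snapshotState() call, keyed by checkpoint id.
    private final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    @Override
    public void start() {}

    @Override
    public InputStatus pollNext(ReaderOutput<String> output) {
        // Real reading logic omitted: fetch records from the external system,
        // emit them via output.collect(...), and advance split.currentOffset.
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public List<MySplit> snapshotState(long checkpointId) {
        // Part of the checkpoint itself: remember which offsets belong to it.
        Map<String, Long> frozen = new HashMap<>();
        assignedSplits.values().forEach(s -> frozen.put(s.partition, s.currentOffset));
        offsetsToCommit.put(checkpointId, frozen);
        return new ArrayList<>(assignedSplits.values());
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Called once the JM has confirmed the checkpoint: commit and then
        // forget this checkpoint's offsets and anything older.
        Map<String, Long> offsets = offsetsToCommit.get(checkpointId);
        if (offsets != null) {
            offsets.forEach((partition, offset) ->
                    System.out.printf("committing %s -> %d to the external system%n", partition, offset));
            offsetsToCommit.headMap(checkpointId + 1).clear();
        }
    }

    @Override
    public void addSplits(List<MySplit> splits) {
        splits.forEach(s -> assignedSplits.put(s.splitId(), s));
    }

    @Override
    public void notifyNoMoreSplits() {}

    @Override
    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {}
}
```

In a real reader you would typically build on
`SingleThreadMultiplexSourceReaderBase`, as the Kafka reader in [3] does,
rather than implement the interface from scratch.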




-- 
Best,
Yanfei