Hi Yun,

Please ignore my question 2. I think the sink is the decisive factor in 
ensuring end-to-end exactly-once.

If I want to implement an AT_LEAST_ONCE sink, which interface should I 
implement? Would 
RichSinkFunction<https://github.com/apache/flink/blob/release-1.14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java>
 be enough? Any suggestions on this?
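
For reference, my current understanding is that the usual at-least-once pattern is a RichSinkFunction that also implements CheckpointedFunction: buffer records in invoke() and flush them in snapshotState(), so a failure can only replay records, never drop them. Below is a minimal plain-Java sketch of that contract (not actual Flink code; the class and method names mirror the Flink callbacks but are illustrative, and the delivered list stands in for OSS):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of an at-least-once sink: records are buffered and
// flushed on every checkpoint. If the job fails before a checkpoint,
// Flink replays from the last checkpoint, so records may be delivered
// twice but are never lost.
class BufferingAtLeastOnceSink {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>(); // stands in for OSS

    // corresponds to SinkFunction#invoke
    void invoke(String record) {
        buffer.add(record);
    }

    // corresponds to CheckpointedFunction#snapshotState:
    // flush everything buffered before the checkpoint completes
    void snapshotState() {
        for (String r : buffer) {
            delivered.add(r); // would be an HTTP publish to OSS here
        }
        buffer.clear();
    }

    List<String> delivered() {
        return delivered;
    }
}
```

On restore, Flink re-reads from the last checkpointed source offset, so anything buffered but not yet flushed is simply reprocessed — duplicates at the sink, hence AT_LEAST_ONCE rather than EXACTLY_ONCE.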

For question 3, maybe I can add some deduplication code on the consumer side to 
handle the messages produced by the AT_LEAST_ONCE sink. If OSS doesn’t support 
exactly-once semantics, it seems impossible to handle this on the Flink side alone.
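
The consumer-side dedup I have in mind would attach a unique ID to each message at the producer and have the consumer skip IDs it has already seen ("effectively once" on top of an at-least-once channel). A minimal sketch — the ID scheme and the in-memory seen-set are illustrative; in practice the set would need to be bounded or persisted:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Idempotent consumer: drops any message whose ID was already processed.
class DedupConsumer {
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> processed = new ArrayList<>();

    // returns true if the message was new and got processed
    boolean accept(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // duplicate from the at-least-once sink: skip it
        }
        processed.add(payload);
        return true;
    }

    List<String> processed() {
        return processed;
    }
}
```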

Thanks,
Fuyao

From: Fuyao Li <fuyao...@oracle.com>
Date: Thursday, February 10, 2022 at 15:48
To: Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
Subject: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hello Yun,

Thanks for the quick response. This is really helpful.

I have confirmed with Oracle Streaming Service (OSS) that it currently doesn’t 
support EXACTLY_ONCE semantics; only AT_LEAST_ONCE works. They suggest adding a 
deduplication mechanism at the sink to mitigate the issue.

Question 1:
So the scenario should look like this:
When the Flink application restarts after a failure, it will start from the 
last checkpoint offset. Messages processed after that checkpoint but before the 
failure will be processed again after the restart. Is there any chance of data 
corruption here, for example breaking a window and sending out incomplete 
records? I am using session windows based on DataStream event-time timers.

Question 2:
For the 
KafkaSource<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-source>,
I noticed that there doesn’t seem to be a place to configure the semantic. 
Maybe enabling checkpointing in EXACTLY_ONCE mode already guarantees the 
source’s exactly-once semantics here? Please correct me if I am wrong.

Question 3:
To guarantee end-to-end exactly-once, I think we must make sure the sink is 
exactly-once, right? Since OSS has such a limitation, is it possible to achieve 
effectively EXACTLY_ONCE semantics through additional logic on the Flink side, 
given that I can’t do much on the OSS side? Or is it technically impossible?
If possible, I think I should implement the 
Sink<https://github.com/apache/flink/blob/release-1.14/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java>
 you mentioned.

Thank you very much for the help!
Fuyao


From: Yun Gao <yungao...@aliyun.com>
Date: Wednesday, February 9, 2022 at 23:17
To: Fuyao Li <fuyao...@oracle.com>, user <user@flink.apache.org>
Subject: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,

Logically, if a system wants to support end-to-end exactly once,
it should support transactions:
1. The transaction holds the records; it gets pre-committed on snapshot state
and committed once the checkpoint succeeds.
2. The transaction should still be abortable after being pre-committed.
3. Once pre-committed, the transaction must be able to be committed, even if
the Flink job fails between pre-commit and commit; after the job restarts,
these transactions should be able to be committed again.
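
As a rough illustration, the three conditions above can be sketched as a tiny state machine in plain Java (the names are mine, not a Flink API; the key property is that a pre-committed transaction can still be committed, repeatedly if needed, after a restart):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the transaction lifecycle a two-phase-commit sink relies on.
class TwoPhaseTransaction {
    enum State { OPEN, PRE_COMMITTED, COMMITTED, ABORTED }

    private State state = State.OPEN;
    private final List<String> records = new ArrayList<>();

    void add(String record) {
        if (state != State.OPEN) throw new IllegalStateException("closed");
        records.add(record);
    }

    // condition 1: called on snapshot state; records become durable
    // but are not yet visible to readers
    void preCommit() {
        if (state != State.OPEN) throw new IllegalStateException();
        state = State.PRE_COMMITTED;
    }

    // condition 3: called when the checkpoint succeeds; must also work
    // after a job restart, i.e. commit must be retryable/idempotent
    List<String> commit() {
        if (state != State.PRE_COMMITTED && state != State.COMMITTED)
            throw new IllegalStateException();
        state = State.COMMITTED;
        return records; // records become visible to readers
    }

    // condition 2: abort must be possible even after pre-commit
    void abort() {
        if (state == State.COMMITTED) throw new IllegalStateException();
        state = State.ABORTED;
        records.clear();
    }

    State state() { return state; }
}
```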

If the external system meets these conditions, option b) is the more 
recommended way to implement an exactly-once sink. However, these interfaces 
are newly added in the upcoming 1.15, which might take about another 1.5 months 
to be released.

An early version of option b) is org.apache.flink.api.connector.sink.Sink. It 
is very similar to option b) and has been supported since 1.13. It will still 
be supported in the next several releases, and it can also be migrated to 
option b) easily.

Best,
Yun


------------------Original Mail ------------------
Sender: Fuyao Li <fuyao...@oracle.com>
Send Date: Thu Feb 10 07:01:51 2022
Recipients: user <user@flink.apache.org>
Subject:Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink 
to achieve EXACTLY_ONCE sink?
Hello Community,


I have two questions regarding Flink custom sink with EXACTLY_ONCE semantic.


  1.  I have an SDK that can publish messages over HTTP (backed by Oracle 
Streaming Service --- very similar to Kafka). This will be my Flink 
application’s sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE 
semantics? HTTP is stateless here… If possible, what could be added to the SDK 
to support EXACTLY_ONCE?
  2.  If it is possible per question 1, then I need to implement a custom sink 
for this. Which option should I use?

     *   Option 1: 
TwoPhaseCommitSinkFunction<https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
     *   Option 2: 
StatefulSink<https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java>
 + 
TwoPhaseCommittingSink<https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java>

The legacy FlinkKafkaProducer seems to use option (a) ---- this will be removed 
from Flink in the future. The new 
KafkaSink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sink>
 seems to use option (b). Based on the comment in the code, it seems option (a) 
is recommended. Which one should I use? Please let me know if I am missing 
anything, or if there are better solutions in my case.


Thanks,
Fuyao




