Hi Yun,

Thanks for the reply! This is very helpful.
For the Sink interface, I checked 
ReducingUpsertSink<https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java>
 implementation. If I use the new Sink interface, for these 
committable/transaction related interfaces and classes, I think I can just 
create some dummy class like this one as a place holder and leave it empty like 
this example, this should effectively achieve AT_LEAST_ONCE, right? Thanks.

Best regards,
Fuyao


From: Yun Gao <yungao...@aliyun.com>
Date: Wednesday, February 16, 2022 at 00:54
To: Fuyao Li <fuyao...@oracle.com>, user <user@flink.apache.org>
Subject: Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,

Very sorry for the late reply.

For the question 1, I think it would not cause data corruption: in Flink the 
checkpoint is achived via
inserting barriers into the stream of normal records, and the snapshot is taken 
in the same thread with
the record processing. Thus the snapshot of the operators would always at the 
boundary of the records.

For the question 3, if the outside system does not support transaction, there 
are might two other ways to
implement the exactly-once semantics:

1. If the record always has a key and the external systems support 
deduplication, then it might be possible to
use AT_LEAST_ONCE sinks and let the external system to deduplicate the records.
2. Another possible method to reduce the requirements on the external systems 
is to use WAL sinks: the record might
be first written into some external systems (like file system) as a kind of 
logs. Once a checkpoint succeed, we could
then write the records before this checkpoint into the external systems. It 
needs note that writting these records into the
external systems must also be retriable: the Flink jobs might still fail during 
writting and after restarted, the writting should
restarted exactly from the next record. This required the external system have 
some method to query the offset of the currently
written records.


For AT_LEASE_ONCE sink 
RichSinkFunction<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/release-1.14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java__;!!ACWV5N9M2RV99hQ!diEOgJTtuymyjqSpWaJPIgIfBq7rmdNshNbUNNERhBh6gxIDkJUHbD-XBWt9Bsk$>
 should also works, but if possible we still recommend to use the new sink API~


Best,
Yun


------------------Original Mail ------------------
Sender:Fuyao Li <fuyao...@oracle.com>
Send Date:Tue Feb 15 08:26:32 2022
Recipients:Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
Subject:Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Yun,

Please ignore my question 2. I think the Sink part is the decisive factor to 
ensure end to end exactly once.

If I want to implement a AT LEAST ONCE sink, which interface should I 
implement? 
MaybeRichSinkFunction<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/release-1.14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java__;!!ACWV5N9M2RV99hQ!diEOgJTtuymyjqSpWaJPIgIfBq7rmdNshNbUNNERhBh6gxIDkJUHbD-XBWt9Bsk$>
 should be enough? Any suggestions on this?

For question 3, maybe I can add some deduplication code at consumer side to 
uptake the AT_LEASE_ONCE sink produced messages. If OSS doesn’t support exactly 
once semantic, it seems impossible for me to handle it at Flink code side.

Thanks,
Fuyao

From:Fuyao Li <fuyao...@oracle.com>
Date: Thursday, February 10, 2022 at 15:48
To: Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
Subject: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hello Yun,

Thanks for the quick response. This is really helpful.

I have confirmed with Oracle Streaming Service (OSS) that they currently don’t 
support EXACTLY_ONCE semantic, only AT_LEAST_ONCE semantic works. They suggest 
to add some deduplicate mechanisms at Sink to mitigate the issue.

Question 1:
So the scenario should looks like this:
When the flink application restarts after it fails, it will start from this 
checkpoint offset. The messages has been processed after the checkpoint before 
the failure will be processed twice here after the restart. Is there any chance 
of data corruption here, for example, breaking the window and sending out 
incomplete records? I am using some session windows based on DataStream event 
time timers.

Question 2:
For the 
KafkaSource<https://urldefense.com/v3/__https:/nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/*kafka-source__;Iw!!ACWV5N9M2RV99hQ!diEOgJTtuymyjqSpWaJPIgIfBq7rmdNshNbUNNERhBh6gxIDkJUHbD-XDo8FpLM$>,
 I noticed that we don’t have a place to configure the semantic? Maybe enabling 
the checkpoint with EXACTLY_ONCE should guarantee the source’s exactly once 
semantic here? Please correct me if I am wrong here.

Question 3:
To guarantee the end-to-end exactly once, I think we must make sure the sink is 
exactly once, right? Since OSS has such limitation,is it possible to achieve 
effective EXACTLY_ONCE semantic through additional logic at Flink side since I 
can’t do too much on OSS side? Or it is technically impossible?
If possible, I think I should implement 
theSink<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/release-1.14/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java__;!!ACWV5N9M2RV99hQ!diEOgJTtuymyjqSpWaJPIgIfBq7rmdNshNbUNNERhBh6gxIDkJUHbD-XFpo5StE$>
 you mentioned.

Thank you very much for the help!
Fuyao


From:Yun Gao <yungao...@aliyun.com>
Date: Wednesday, February 9, 2022 at 23:17
To: Fuyao Li <fuyao...@oracle.com>, user <user@flink.apache.org>
Subject: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,

Logically if a system want to support end-to-end exactly once,
it should support transactions:
1. The transactions hold the records, it get pre-committed on snapshot state
and get committed on checkpont succeed.
2. The transaction should still be able to be aborted after pre-committed.
3. Once pre-committed, the transactions must be able to be committed, even if
the flink jobs fails between pre-committed and committed, after the job 
restarted
these transaction should be able to be committed again.

If the external system meet such conditions, to implement an exactly-once sink,
the option b) should be more recommend. However, these interface is newly added
in the upcoming 1.15 and it might need to be wait for about 1.5 month before 
releasing.

An early version for option b is the org.apache.flink.api.connector.sink.Sink. 
It is much
similar to the option b) and are supported since 1.13. It would still be 
supported in the
next several releases and  it also be able to be migrated to the option b) 
easily.

Best,
Yun


------------------Original Mail ------------------
Sender:Fuyao Li <fuyao...@oracle.com>
Send Date:Thu Feb 10 07:01:51 2022
Recipients:user <user@flink.apache.org>
Subject:Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink 
to achieve EXACTLY_ONCE sink?
Hello Community,


I have two questions regarding Flink custom sink with EXACTLY_ONCE semantic.


  1.  I have a SDK that could publish messages based on HTTP (backed by Oracle 
Streaming Service --- very similar to Kafka).  This will be my Flink 
application’s sink. Is it possible to use this SDK as sink with EXACTLY_ONCE 
semantic? HTTP is stateless here… If possible, what could be added in SDK to 
support EXACTLY_ONCE?
  2.  If it is possible for question 1, then I need to implement a custom sink 
for this. Which option should I use?

     *   Option 
1:TwoPhaseCommitSinkFunction<https://urldefense.com/v3/__https:/nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html__;!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXjIAX36vY$>
     *   Option 
2:StatefulSink<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java__;!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXj5cKfxA0$>
 + 
TwoPhaseCommittingSink<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java__;!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXjA088CIg$>

The legacy FlinkKafkaProducer seems to be using option (a) ---- This will be 
removed from Flink in the future. The 
newKafkaSink<https://urldefense.com/v3/__https:/nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/*kafka-sink__;Iw!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXj02BG7zk$>
 seems to be using option (b). Based on the comment in the code, it seems 
option (a) is recommended, which one should I use? Please suggest if I am 
missing anything, or any other better solutions in my case?


Thanks,
Fuyao





Reply via email to