[ 
https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499879#comment-17499879
 ] 

Brian Zhou commented on FLINK-25256:
------------------------------------

Yes, this interface is now used in both the legacy reader and our upcoming 
FLIP-27 reader (still in review).

Let me explain the reason first.

In general, Pravega handles checkpoints differently from other common 
messaging systems such as Kafka. It is a self-contained approach that does not 
require extra external management of offsets. Pravega itself has an 
internal synchronizer that synchronizes the read progress of each reader and 
returns a Checkpoint object in the reader output to give a consistent view of 
the read progress.

This is why we need the ExternallyInducedSource interface to integrate with 
Flink: we need to control when we trigger the checkpoint and pass the barrier 
in order to achieve exactly-once semantics on the source side.
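
To illustrate the idea, here is a minimal, self-contained Java sketch. It 
does not use the real Flink or Pravega APIs: CheckpointTrigger, ReaderEvent 
and run are simplified, hypothetical stand-ins, and the trace strings are 
only there to make the ordering visible. The point is that the barrier is 
emitted at the exact position where the checkpoint marker appears in the 
reader output, not at a time chosen by the runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ExternallyInducedSketch {

    /** Simplified stand-in for the trigger callback the runtime hands to the source. */
    interface CheckpointTrigger {
        void triggerCheckpoint(long checkpointId);
    }

    /** Hypothetical reader output: either a data event or a checkpoint marker. */
    static final class ReaderEvent {
        final String payload;     // null for a checkpoint marker
        final long checkpointId;  // only meaningful for a checkpoint marker

        private ReaderEvent(String payload, long checkpointId) {
            this.payload = payload;
            this.checkpointId = checkpointId;
        }

        static ReaderEvent data(String payload) { return new ReaderEvent(payload, -1L); }
        static ReaderEvent checkpoint(long id) { return new ReaderEvent(null, id); }
        boolean isCheckpoint() { return payload == null; }
    }

    /** Forwards data events and fires the trigger where a marker appears in the stream. */
    static List<String> run(List<ReaderEvent> readerOutput) {
        List<String> trace = new ArrayList<>();
        // Assumption: firing the trigger is what injects the barrier downstream.
        CheckpointTrigger trigger = id -> trace.add("barrier:" + id);
        for (ReaderEvent event : readerOutput) {
            if (event.isCheckpoint()) {
                trigger.triggerCheckpoint(event.checkpointId);
            } else {
                trace.add("record:" + event.payload);
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<ReaderEvent> output = Arrays.asList(
                ReaderEvent.data("a"), ReaderEvent.data("b"),
                ReaderEvent.checkpoint(7L), ReaderEvent.data("c"));
        System.out.println(run(output)); // [record:a, record:b, barrier:7, record:c]
    }
}
```

In this model the records read before the marker always precede the barrier, 
which is the consistency property the source needs to preserve.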

We need this issue fixed, as we are just starting to integrate with Flink 1.14.

> Savepoints do not work with ExternallyInducedSources
> ----------------------------------------------------
>
>                 Key: FLINK-25256
>                 URL: https://issues.apache.org/jira/browse/FLINK-25256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0, 1.13.3
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> It is not possible to take a proper savepoint with 
> {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy 
> and FLIP-27 versions). The problem is that we're hardcoding 
> {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators would try to take 
> checkpoints in the checkpoint location whereas the {{CheckpointCoordinator}} 
> will write metadata for those states in the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if 
> we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In 
> such a case the location and format in which the state of a particular task 
> is persisted depend on the order in which barriers arrive. If a barrier from 
> a regular source arrives last, the task takes a savepoint; if the last 
> barrier is from an externally induced source, it takes a checkpoint.
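
The mismatch described in the issue can be modeled with a small, 
self-contained Java sketch. Nothing here is Flink code: SnapshotKind, the 
directory names and the hardcoded flag are hypothetical and only illustrate 
how a hook that ignores the requested snapshot type sends operator state and 
metadata to different locations.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SavepointMismatchSketch {

    enum SnapshotKind { CHECKPOINT, SAVEPOINT }

    /** Hypothetical target directories; the paths are illustrative only. */
    static String locationFor(SnapshotKind kind) {
        return kind == SnapshotKind.SAVEPOINT ? "/savepoints" : "/checkpoints";
    }

    /** Models the trigger hook: when hardcoded, the requested kind is ignored. */
    static SnapshotKind kindSeenByOperators(SnapshotKind requested, boolean hardcoded) {
        return hardcoded ? SnapshotKind.CHECKPOINT : requested;
    }

    /** Returns where metadata and operator state end up for a requested snapshot. */
    static Map<String, String> snapshot(SnapshotKind requested, boolean hardcoded) {
        Map<String, String> result = new LinkedHashMap<>();
        // The coordinator writes metadata according to what was requested.
        result.put("metadata", locationFor(requested));
        // Operators write state according to the options the hook passed on.
        result.put("state", locationFor(kindSeenByOperators(requested, hardcoded)));
        return result;
    }

    public static void main(String[] args) {
        // Hardcoded options: metadata and state diverge.
        System.out.println(snapshot(SnapshotKind.SAVEPOINT, true));
        // Options derived from the request: both land in the same place.
        System.out.println(snapshot(SnapshotKind.SAVEPOINT, false));
    }
}
```

With the hardcoded hook the first call reports metadata under /savepoints and 
state under /checkpoints, which is the inconsistency the description points 
out; passing the requested kind through removes it in this model.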



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
