Hi Xilang,

Let me try to summarize your requirements.
If I understood you correctly, you are concerned not only about
exactly-once guarantees but also need a consistent view of the data.
The data in all finalized files needs to originate from a prefix of
the stream, i.e., all records written to the finalized files must have a
timestamp smaller than or equal to T, and no record with a timestamp
larger than T may be included.

I think this can be achieved with the current implementation and event-time
processing. If the result of a job is emitted by a timer which is triggered
by watermarks, you will have the prefix property (even taking out-of-order
records into account!).
The reason is that watermarks and checkpoint barriers are shipped like
regular data records. By using timers, all computations are "synchronized"
by the watermarks, which cannot be overtaken by checkpoint barriers.
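
To illustrate the argument, here is a toy model in plain Java. It is not
Flink code and makes several assumptions: the names PrefixSketch, Event,
Kind, run and sampleChannel are made up for this sketch, there is a single
in-order channel, and no record arrives behind its watermark. The operator
buffers records and only emits them when a watermark passes their
timestamp, which is roughly what an event-time timer would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Toy model only: hypothetical names, no Flink dependency.
public class PrefixSketch {
    // Everything travels through one in-order channel.
    public enum Kind { RECORD, WATERMARK, BARRIER }

    public static final class Event {
        public final Kind kind;
        // Timestamp for RECORD and WATERMARK, checkpoint id for BARRIER.
        public final long value;

        public Event(Kind kind, long value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Returns, for each barrier, the timestamps emitted before it arrived.
    public static List<List<Long>> run(List<Event> channel) {
        PriorityQueue<Long> buffered = new PriorityQueue<>();
        List<Long> emitted = new ArrayList<>();
        List<List<Long>> snapshots = new ArrayList<>();
        for (Event e : channel) {
            switch (e.kind) {
                case RECORD:
                    // Out-of-order records wait until a watermark passes them.
                    buffered.add(e.value);
                    break;
                case WATERMARK:
                    // "Timer" fires: emit every record with timestamp <= watermark.
                    while (!buffered.isEmpty() && buffered.peek() <= e.value) {
                        emitted.add(buffered.poll());
                    }
                    break;
                case BARRIER:
                    // The checkpoint sees only what was emitted before the barrier.
                    snapshots.add(new ArrayList<>(emitted));
                    break;
            }
        }
        return snapshots;
    }

    // Sample input: out-of-order records, two watermarks, two barriers.
    public static List<Event> sampleChannel() {
        return Arrays.asList(
            new Event(Kind.RECORD, 3),
            new Event(Kind.RECORD, 1),
            new Event(Kind.RECORD, 7),
            new Event(Kind.WATERMARK, 5),
            new Event(Kind.RECORD, 6),
            new Event(Kind.BARRIER, 1),
            new Event(Kind.WATERMARK, 10),
            new Event(Kind.BARRIER, 2));
    }

    public static void main(String[] args) {
        // Prints [[1, 3], [1, 3, 6, 7]]
        System.out.println(run(sampleChannel()));
    }
}
```

At the first barrier the emitted data is exactly the records with a
timestamp up to watermark 5, even though records 7 and 6 had already been
received. In a real job the emission would sit in a timer callback
registered for event time instead of this loop.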

Best, Fabian

2018-07-02 4:22 GMT+02:00 XilangYan <xilang....@gmail.com>:

> Thank you Minglei,
>
> I should describe my current flow and my requirement more clearly.
> 1. Every record we collect has a send-time.
> 2. When collecting data, we also send a separate counter message, saying
> for example that we have collected 45 messages whose send-time is
> 2018-07-02 10:02:00.
> 3. Data is sent to Kafka (or another messaging system); Flink receives the
> data from Kafka and writes it to HDFS.
> 4. When Flink has finished part of the messages (neither .pending nor
> .inprogress; "finished" means the final state that can be read by other
> systems), we send another counter message, saying for example that we
> have processed 40 messages whose send-time is 2018-07-02 10:02:00.
>
> What I have done in Flink is:
> 1. I added a config option to BucketingSink named rolloverOnCheckpoint.
> 2. I added another sink, CounterSink, which counts messages by send-time.
> 3. In BucketingSink.snapshotState, if rolloverOnCheckpoint is set to true,
> I close the current files and move them to the pending state.
> 4. In CounterSink.snapshotState, I prepare to send the special counter
> message.
> 5. When the checkpoint completes, BucketingSink.notifyCheckpointComplete
> moves the pending files to the finished state, and
> CounterSink.notifyCheckpointComplete sends the special counter message.
>
> So in our counter system, when the processed-message counter equals the
> received-message counter, it means the ETL jobs can continue.
>
> The JIRA you submitted is not exactly what I want; however, it would be
> great if we could figure out a common solution to this requirement,
> although I think it is difficult unless, as you said, we add some
> assumption like watermarks.
> On the other hand, I think the watermark behavior may already be
> achievable by combining inactiveBucketThreshold and batchRolloverInterval.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
