Hi Kostas,

Sorry for jumping in on this discussion :)

What you suggest for finite sources, waiting for checkpoints, is pretty
ugly in many cases, especially if you read from a finite source (a file,
for instance) and want to end the job as soon as possible.

Would it make sense to not discard and delete the buckets, but just keep the
files? They could be marked somewhat differently from pending files, but at
least they would still be accessible to users in these cases.

Gyula

Kostas Kloudas <k.klou...@data-artisans.com> wrote (on Wed, 22 Aug 2018
at 10:36):

> Hi Benoit,
>
> Thanks for using the StreamingFileSink. My answers/explanations are
> inlined.
> In most of your observations, you are correct.
>
> On Aug 21, 2018, at 11:45 PM, Benoit MERIAUX <bmeri...@octo.com> wrote:
>
> Hi,
>
> I have some questions about the new StreamingFileSink in 1.6.
>
> My use case is pretty simple.
> I have a Cassandra table with 150 million rows.
> They are partitioned into buckets of 100,000 rows.
>
> My job exports each "bucket" to a file (1 bucket = 1 file), so the
> job is designed like this:
>
> The source gets the bucket list,
> then a flatMap task fetches the rows matching each bucket and emits all
> 100,000 rows from Cassandra to the collector,
> then a StreamingFileSink writes each row into one file per bucket (RowFormat).
>
> Checkpointing is enabled, every 10 s.
> The rollingPolicy is OnCheckpointRollingPolicy, and the bucketAssigner is
> keyed on the bucketId (my bucketId, not the sink's :).
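The "1 bucket = 1 file" routing described above can be sketched as a tiny, self-contained model. This is purely illustrative: `BucketedWriter`, `assignBucket`, and `write` are made-up names standing in for a Flink `BucketAssigner` keyed on the record's own bucket id, not the actual Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the job's bucketing idea: every record carries the
// Cassandra bucket id, and the sink groups records into one output target
// per bucket (1 bucket = 1 file).
class BucketedWriter {
    final Map<String, List<String>> filesByBucket = new HashMap<>();

    // Mirrors a BucketAssigner that returns the record's own bucket id.
    String assignBucket(String bucketId) {
        return "bucket-" + bucketId;
    }

    void write(String bucketId, String line) {
        filesByBucket
            .computeIfAbsent(assignBucket(bucketId), k -> new ArrayList<>())
            .add(line);
    }
}
```

With this routing, all 100,000 rows of a given Cassandra bucket end up in the same in-progress part file, which is why the question below is about how those per-bucket files get finalized.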
>
> My problem is that at the end of the job, I only have in-progress part files
> for each bucket.
>
> I do not understand how I can trigger the finalization of the sink and
> have the bucket part files committed.
>
> So I read the code of the StreamingFileSink and the Bucket classes.
>
> If I have understood correctly, the in-progress bucket files can be closed
> (via the closePartFile method of Bucket) and moved to the "pending" state,
> following the rollingPolicy, where they wait for a checkpoint to be moved
> to the "finished" state.
>
>
> 1) The in-progress files are closed based on the rolling policy and they
> are put in pending state,
> 2) they are staged for commit when the “next” checkpoint arrives after
> they are put to pending state, and
> 3) they are published when that checkpoint is declared successful.
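The three steps above form a small state machine per part file. The sketch below is a simplified, self-contained model of that lifecycle (the names `PartFileLifecycle`, `roll`, `onCheckpoint`, etc. are illustrative, not Flink's actual classes): a file goes in-progress → pending when rolled, is staged by the next checkpoint, and is published only when that checkpoint is reported successful.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the StreamingFileSink part-file lifecycle:
// IN_PROGRESS -> PENDING (on roll) -> FINISHED (on checkpoint success).
class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static class PartFile {
        State state = State.IN_PROGRESS;
        long stagedByCheckpoint = -1; // id of the checkpoint that will stage it
    }

    final List<PartFile> files = new ArrayList<>();
    long nextCheckpointId = 0;

    PartFile newInProgressFile() {
        PartFile f = new PartFile();
        files.add(f);
        return f;
    }

    // Step 1: the rolling policy closes the in-progress file -> pending.
    void roll(PartFile f) {
        if (f.state == State.IN_PROGRESS) {
            f.state = State.PENDING;
            f.stagedByCheckpoint = nextCheckpointId; // the "next" checkpoint
        }
    }

    // Step 2: a checkpoint stages every file that is pending at that point.
    long onCheckpoint() {
        return nextCheckpointId++;
    }

    // Step 3: when that checkpoint succeeds, the staged files are published.
    void notifyCheckpointComplete(long checkpointId) {
        for (PartFile f : files) {
            if (f.state == State.PENDING && f.stagedByCheckpoint <= checkpointId) {
                f.state = State.FINISHED;
            }
        }
    }
}
```

Note that a pending file is not published by the checkpoint itself, only by the success notification for it; this gap is exactly what makes the end-of-job behavior discussed below tricky.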
>
> So the rollingPolicy can roll part files on each record, on each
> bucket check interval, or on each checkpoint.
> In my case, with the OnCheckpointRollingPolicy, it is only on each
> checkpoint. Am I right?
>
>
> The OnCheckpointRollingPolicy rolls the in-progress file when it receives
> a checkpoint, and publishes the file when the checkpoint is complete.
>
> But in general, you can specify your own rolling policy when using row
> formats.
> There, you can specify to roll:
> 1) based on an inactivity interval or interval since the in-progress file
> was created
> 2) based on size of the file
> 3) on every checkpoint
> 4) a combination of the above
>
> I would recommend checking the DefaultRollingPolicy for an example.
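For illustration, the kinds of checks a default-style policy combines (points 1, 2, and 4 above) could be modeled roughly like this. This is a simplified, self-contained sketch, not Flink's `RollingPolicy` interface; the real one receives a part-file state object, and the thresholds here are illustrative:

```java
// Simplified model of DefaultRollingPolicy-style decisions: roll when the
// part file exceeds a maximum size, has been open longer than a rollover
// interval, or has seen no writes for an inactivity interval.
class SimpleRollingPolicy {
    final long maxPartSizeBytes;
    final long rolloverIntervalMs;
    final long inactivityIntervalMs;

    SimpleRollingPolicy(long maxPartSizeBytes,
                        long rolloverIntervalMs,
                        long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    // Checked on every record written: size-based rolling.
    boolean shouldRollOnEvent(long currentPartSizeBytes) {
        return currentPartSizeBytes >= maxPartSizeBytes;
    }

    // Checked periodically on processing time: rollover and inactivity.
    boolean shouldRollOnProcessingTime(long nowMs,
                                       long creationTimeMs,
                                       long lastWriteTimeMs) {
        return nowMs - creationTimeMs >= rolloverIntervalMs
                || nowMs - lastWriteTimeMs >= inactivityIntervalMs;
    }
}
```

An OnCheckpointRollingPolicy, by contrast, skips the time- and size-based checks and rolls only when a checkpoint arrives, which is why Benoit sees part files close only on checkpoints.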
>
> And when a checkpoint is successful, all the pending files are moved to the
> "finished" state and become usable by other jobs.
>
> Then this is where I start to get lost.
>
> Indeed, one thing surprised me in the code of the close method of the
> StreamingFileSink.
> It discards
> <https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java#L293>
> all active buckets since the last successful checkpoint!
> But at the end of a successful job, no checkpoint is triggered
> automatically if the minimal interval since the last checkpoint has not
> expired. So what happens to the data written since the last checkpoint?
> (-> Is this sink only for endless streams?)
>
>
> You are correct here that this is an undesired behavior for jobs with
> finite input.
>
> The reason is that, for legacy reasons, the close() method is called for both
> normal (i.e. successful) and abnormal (i.e. failure) termination of the job.
> Given this, we could not simply declare the pending files as valid, because
> in case of a failure we would have inconsistencies.
> So we went for the conservative approach, which simply discards the files.
>
> How do I get all my files, with all my data, into the "finished" state when
> my job finishes successfully?
> Do I need to trigger a checkpoint manually?
> Is there a better-fitting sink for my use case?
> Should I use another rollingPolicy? Even with the bucket interval, there
> is still a window between the interval and the end of the job during which
> some part files are not closed and committed.
>
>
> What you can do is use the default policy and specify an inactivity
> interval. This will make sure that in-progress files are closed; then you
> wait for some checkpoints to come and succeed before cancelling the job.
> I know, and I agree, that this is not the most elegant solution, but until
> the root problem is fixed, I think it is the best one.
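To make the suggested workaround concrete, here is a rough simulation of the sequence for a finite input: the source stops emitting, the inactivity interval rolls the last in-progress file to pending, and the next successful checkpoint publishes it, after which the job can be cancelled safely. All names (`FiniteJobFlush`, `safeToCancel`, etc.) are illustrative, not Flink API; the single-file model stands in for one file per bucket.

```java
// Rough simulation of the finite-input workaround: stop writing, let the
// inactivity interval roll the file to pending, then wait for one more
// successful checkpoint before cancelling the job.
class FiniteJobFlush {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    State fileState = State.IN_PROGRESS;
    long lastWriteMs;
    final long inactivityMs;

    FiniteJobFlush(long inactivityMs, long startMs) {
        this.inactivityMs = inactivityMs;
        this.lastWriteMs = startMs;
    }

    void write(long nowMs) {
        lastWriteMs = nowMs;
    }

    // Periodic processing-time check: inactivity rolls the file to pending.
    void onProcessingTime(long nowMs) {
        if (fileState == State.IN_PROGRESS && nowMs - lastWriteMs >= inactivityMs) {
            fileState = State.PENDING;
        }
    }

    // A successful checkpoint publishes pending files.
    void onSuccessfulCheckpoint() {
        if (fileState == State.PENDING) {
            fileState = State.FINISHED;
        }
    }

    // Cancelling before this point would discard the in-progress data.
    boolean safeToCancel() {
        return fileState == State.FINISHED;
    }
}
```

The order matters: a checkpoint that fires while the file is still in progress publishes nothing, which is why one has to wait past the inactivity interval *and* for a further successful checkpoint before cancelling.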
>
> I hope this helps,
> Kostas
>
>
> Even in the case of an endless stream, I suggest improving the behavior of
> the close method by calling closePartFile on each active bucket, so that all
> valid data since the last checkpoint can be committed to the pending state,
> waiting for the checkpoint at the end of the job.
> This seems to be what the BucketingSink does
> <https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L437>
> -> I can do a PR for this.
>
>
> I'm open to all suggestions :)
>
> regards,
>
> --
> Benoit *MERIAUX*
> *OCTO Technology*
> Consultant Confirmé - Tribu NAD
> .................................................
> 34, Avenue de l'Opéra
> 75002 Paris
> +33 (0)786 59 12 30
> www.octo.com - blog.octo.com
> *www.usievents.com <http://www.usievents.com/>*
> @USIEvents <https://twitter.com/USIEvents> - @OCTOTechnology
> <https://twitter.com/OCTOTechnology>
> .................................................
>
>
