On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hi,
>
>
>
> I think I should go into more details to explain my use case.
>
> I have one non parallel source (parallelism = 1) that list binary files in
> a HDFS directory. DataSet emitted by the source is a data set of file
> names, not file content. These filenames are rebalanced, and sent to
> workers (parallelism = 15) that will use a flatmapper that open the file,
> read it, decode it, and send records (forward mode) to the sinks (with a
> few 1-to-1 mapping in-between). So the flatmap operation is a
> time-consuming one as the files are more than 200Mb large each; the
> flatmapper will emit millions of record to the sink given one source record
> (filename).
>
>
>
> The rebalancing, occurring at the file name level, does not use much I/O
> and I cannot use one-to-one mode at that point if I want some parallelims
> since I have only one source.
>
>
>
> I did not put file decoding directly in the sources because I have no good
> way to distribute files to sources without a controller (input directory is
> unique, filenames are random and cannot be “attributed” to one particular
> source instance easily).
>

Crazy idea: If you know the task number and the number of tasks, you can
hash the filename using a shared algorithm (e.g. md5 or sha1 or crc32) and
then just check modulo number of tasks == task number.

That would let you run the list files in parallel without sharing state.
which would allow file decoding directly in the sources


> Alternatively, I could have used a dispatcher daemon separated from the
> streaming app that distribute files to various directories, each directory
> being associated with a flink source instance, and put the file reading &
> decoding directly in the source, but that seemed more complex to code and
> exploit than the filename source. Would it have been better from the
> checkpointing perspective?
>
>
>
> About the ungraceful source sleep(), is there a way, programmatically, to
> know the “load” of the app, or to determine if checkpointing takes too much
> time, so that I can do it only on purpose?
>
>
>
> Thanks,
>
> Arnaud
>
>
>
> *De :* zhijiang <wangzhijiang...@aliyun.com>
> *Envoyé :* vendredi 1 mars 2019 04:59
> *À :* user <user@flink.apache.org>; LINZ, Arnaud <al...@bouyguestelecom.fr
> >
> *Objet :* Re: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Hi Arnaud,
>
>
>
> Thanks for the further feedbacks!
>
>
>
> For option1: 40min still does not makes sense, which indicates it might
> take more time to finish checkpoint in your case. I also experienced some
> scenarios of catching up data to take several hours to finish one
> checkpoint. If the current checkpoint expires because of timeout, the next
> new triggered checkpoint might still be failed for timeout. So it seems
> better to wait the current checkpoint until finishes, not expires it,
> unless we can not bear this long time for some reasons such as wondering
> failover to restore more data during this time.
>
>
>
> For option2: The default network setting should be make sense. The lower
> values might cause performance regression and the higher values would
> increase the inflighing buffers and checkpoint delay more seriously.
>
>
>
> For option3: If the resource is limited, it is still not working on your
> side.
>
>
>
> It is an option and might work in your case for sleeping some time in
> source as you mentioned, although it seems not a graceful way.
>
>
>
> I think there are no data skew in your case to cause backpressure, because
> you used the rebalance mode as mentioned. Another option might use the
> forward mode which would be better than rebalance mode if possible in your
> case. Because the source and downstream task is one-to-one in forward mode,
> so the total flighting buffers are 2+2+8 for one single downstream task
> before barrier. If in rebalance mode, the total flighting buffer would be
> (a*2+a*2+8) for one single downstream task (`a` is the parallelism of
> source vertex), because it is all-to-all connection. The barrier alignment
> takes more time in rebalance mode than forward mode.
>
>
>
> Best,
>
> Zhijiang
>
> ------------------------------------------------------------------
>
> From:LINZ, Arnaud <al...@bouyguestelecom.fr>
>
> Send Time:2019年3月1日(星期五) 00:46
>
> To:zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
>
> Subject:RE: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Update :
>
> Option  1 does not work. It still fails at the end of the timeout, no
> matter its value.
>
> Should I implement a “bandwidth” management system by using artificial
> Thread.sleep in the source depending on the back pressure ?
>
>
>
> *De :* LINZ, Arnaud
> *Envoyé :* jeudi 28 février 2019 15:47
> *À :* 'zhijiang' <wangzhijiang...@aliyun.com>; user <user@flink.apache.org
> >
> *Objet :* RE: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Hi Zhihiang,
>
>
>
> Thanks for your feedback.
>
>    - I’ll try option 1 ; time out is 4min for now, I’ll switch it to
>    40min and will let you know. Setting it higher than 40 min does not make
>    much sense since after 40 min the pending output is already quite large.
>    - Option 3 won’t work ; I already take too many ressources, and as my
>    source is more or less a hdfs directory listing, it will always be far
>    faster than any mapper that reads the file and emits records based on its
>    content or sink that store the transformed data, unless I put “sleeps” in
>    it (but is this really a good idea?)
>    - Option 2: taskmanager.network.memory.buffers-per-channel and
>    taskmanager.network.memory.buffers-per-gate are currently unset in my
>    configuration (so to their default of 2 and 8), but for this streaming app
>    I have very few exchanges between nodes (just a rebalance after the source
>    that emit file names, everything else is local to the node). Should I
>    adjust their values nonetheless ? To higher or lower values ?
>
> Best,
>
> Arnaud
>
> *De :* zhijiang <wangzhijiang...@aliyun.com>
> *Envoyé :* jeudi 28 février 2019 10:58
> *À :* user <user@flink.apache.org>; LINZ, Arnaud <al...@bouyguestelecom.fr
> >
> *Objet :* Re: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Hi Arnaud,
>
>
>
> I think there are two key points. First the checkpoint barrier might be
> emitted delay from source under high backpressure for synchronizing lock.
>
> Second the barrier has to be queued in flighting data buffers, so the
> downstream task has to process all the buffers before barriers to trigger
> checkpoint and this would take some time under back pressure.
>
>
>
> There has three ways to work around:
>
> 1. Increase the checkpoint timeout avoid expire in short time.
>
> 2. Decrease the setting of network buffers to decrease the amount of
> flighting buffers before barrier, you can check the config of
>  "taskmanager.network.memory.buffers-per-channel" and
> "taskmanager.network.memory.buffers-per-gate".
>
> 3. Adjust the parallelism such as increasing it for sink vertex in order
> to process source data faster, to avoid backpressure in some extent.
>
>
>
> You could check which way is suitable for your scenario and may have a try.
>
>
>
> Best,
>
> Zhijiang
>
> ------------------------------------------------------------------
>
> From:LINZ, Arnaud <al...@bouyguestelecom.fr>
>
> Send Time:2019年2月28日(星期四) 17:28
>
> To:user <user@flink.apache.org>
>
> Subject:Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Hello,
>
>
>
> I have a simple streaming app that get data from a source and store it to
> HDFS using a sink similar to the bucketing file sink. Checkpointing mode is
> “exactly once”.
>
> Everything is fine on a “normal” course as the sink is faster than the
> source; but when we stop the application for a while and then restart it,
> we have a catch-up burst to get all the messages emitted in the meanwhile.
>
> During this burst, the source is faster than the sink, and all checkpoints
> fail (time out) until the source has been totally caught up. This is
> annoying because the sink does not “commit” the data before a successful
> checkpoint is made, and so the app release all the “catch up” data as a
> atomic block that can be huge if the streaming app was stopped for a while,
> adding an unwanted stress to all the following hive treatments that use the
> data provided in micro batches and to the Hadoop cluster.
>
>
>
> How should I handle the situation? Is there something special to do to get
> checkpoints even during heavy load?
>
>
>
> The problem does not seem to be new, but I was unable to find any
> practical solution in the documentation.
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
>
>
>
>
>
> ------------------------------
>
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>
>
>
>
>

Reply via email to