Hi Mingliang,

Let me answer your second question first:

> Another question is about the alignment buffer. I thought it was only
> used for multiple input stream cases. But for a keyed process function, what
> is actually aligned?

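When a task sends records to multiple downstream tasks (tasks, not
operators!) due to a broadcast or partition/keyBy/shuffle connection, the
task broadcasts each checkpoint barrier to all of its receiving tasks.
Therefore, each task that receives records from multiple tasks will receive
multiple checkpoint barriers. (Checkpoint barriers behave similarly to
watermarks in this regard.)
This also applies to your keyed ProcessFunction: because of the keyBy, each
of its subtasks receives records, and therefore barriers, from every upstream
subtask, even though there is only a single logical input stream.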
In order to provide exactly-once state consistency, a task must buffer the
records from every input connection that has already forwarded a barrier
until barriers from all input connections have been received and the state
checkpoint has been initiated.
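To make the buffering rule concrete, here is a toy, single-threaded Scala model of one task with several input channels (with a keyBy, one channel per upstream subtask). It is only a sketch under simplifying assumptions, not Flink's implementation: the names `Event`, `Record`, `Barrier`, `AlignmentSimulator` and `AlignmentDemo` are hypothetical, the "state" is just the list of processed records, and only one checkpoint is in flight at a time.

```scala
import scala.collection.mutable

// Toy model of checkpoint barrier alignment for a single task with several
// input channels. Hypothetical names; NOT Flink's actual implementation.
sealed trait Event
final case class Record(value: String) extends Event
final case class Barrier(checkpointId: Long) extends Event

final class AlignmentSimulator(numChannels: Int) {
  // Channels that have already delivered the barrier of the current checkpoint.
  private val blocked = mutable.Set.empty[Int]
  // Records held back from blocked channels until alignment completes.
  private val buffered = mutable.Queue.empty[Record]

  // Records that reached the user function, in processing order.
  val processed = mutable.ArrayBuffer.empty[String]
  // (checkpoint id, number of processed records reflected in the snapshot).
  val snapshots = mutable.ArrayBuffer.empty[(Long, Int)]

  def onEvent(channel: Int, event: Event): Unit = event match {
    case r: Record if blocked.contains(channel) =>
      // This channel is already past the barrier: processing the record now
      // would leak post-barrier data into the snapshot, so buffer it.
      buffered.enqueue(r)
    case Record(v) =>
      processed += v
    case Barrier(id) =>
      blocked += channel
      if (blocked.size == numChannels) {
        // Barriers from all inputs have arrived: take the snapshot
        // (a real task would also forward the barrier downstream).
        snapshots += ((id, processed.size))
        blocked.clear()
        // Release the buffered records in arrival order.
        while (buffered.nonEmpty) processed += buffered.dequeue().value
      }
  }
}

object AlignmentDemo {
  def main(args: Array[String]): Unit = {
    val task = new AlignmentSimulator(numChannels = 2)
    task.onEvent(0, Record("a"))
    task.onEvent(0, Barrier(1))   // channel 0 is now blocked
    task.onEvent(0, Record("b"))  // buffered until alignment completes
    task.onEvent(1, Record("c"))  // processed: channel 1 has no barrier yet
    task.onEvent(1, Barrier(1))   // aligned: snapshot covers "a" and "c", then "b" is released
    println(task.snapshots)
    println(task.processed)
  }
}
```

In the demo, record "b" from channel 0 is held back after that channel's barrier while channel 1 keeps flowing, so the snapshot of checkpoint 1 contains exactly the records that precede the barrier on every channel.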

What does this mean for the long checkpoint alignment that you observe?
Checkpoint alignment starts when the first barrier is received and ends
when the last barrier is received.
Hence, it seems as if one TaskManager receives some barrier(s) later than
the other TMs, probably because it is more heavily loaded.
The fact that all affected tasks run on the same TM, together with the
backpressure you mentioned, points in that direction, because a TM
multiplexes the network connections of all of its tasks.
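Because alignment runs from the first to the last barrier, a single late input channel stretches the alignment of the whole task, and the other channels' data is held back for that time. The following Scala sketch expresses this with two hypothetical helper functions (`alignmentDurationMs`, `bufferedBytesEstimate`); the buffered-bytes figure assumes, purely for illustration, that every channel keeps sending at the same constant rate after delivering its barrier.

```scala
// Hypothetical helpers illustrating why one late barrier dominates alignment.
object AlignmentEstimate {
  /** Alignment time: from the first barrier arrival to the last one. */
  def alignmentDurationMs(barrierArrivalMs: Seq[Long]): Long = {
    require(barrierArrivalMs.nonEmpty, "need at least one input channel")
    barrierArrivalMs.max - barrierArrivalMs.min
  }

  /**
   * Rough estimate of data held back during alignment, ASSUMING every channel
   * keeps sending `bytesPerMs` after delivering its barrier until the last
   * barrier arrives.
   */
  def bufferedBytesEstimate(barrierArrivalMs: Seq[Long], bytesPerMs: Long): Long = {
    require(barrierArrivalMs.nonEmpty, "need at least one input channel")
    val last = barrierArrivalMs.max
    barrierArrivalMs.map(t => (last - t) * bytesPerMs).sum
  }

  def main(args: Array[String]): Unit = {
    // Three channels deliver their barriers within 5 ms; one channel
    // (e.g. served by a heavily loaded TM) is 3 seconds late.
    val arrivals = Seq(1000L, 1002L, 1005L, 4000L)
    println(alignmentDurationMs(arrivals))                   // 3000
    println(bufferedBytesEstimate(arrivals, bytesPerMs = 10L)) // 89930
  }
}
```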

Regarding the memory configuration question, I am not sure if there is a
way to override the JVM heap size on YARN. Maybe others can answer this
question.

Best,
Fabian

2018-08-12 18:36 GMT+02:00 祁明良 <m...@xiaohongshu.com>:

> Hi all,
>
>
> I have several questions regarding checkpointing. The background is that I'm
> using a ProcessFunction keyed by user_id that works roughly like the following:
>
> inputStream
>   .keyBy(x => getUserKey(x))
>   .process(...)
>
> It runs on YARN with 40 TMs and 2 slots each. When I look at the checkpoint
> metrics, only a small number of subtasks have a large "alignment
> buffered/duration", and it looks like the 2 slots on the same TM are either
> both high or both low. What could be causing this?
>
>
>    1. Maybe data skew, but the amount of data looks almost the same.
>    2. Or the network?
>    3. The system is under back pressure, but I don't understand why only
>       about 4 out of 80 subtasks behave like this.
>
> Another question is about the alignment buffer. I thought it was only used
> for multiple input stream cases. But for a keyed process function, what is
> actually aligned?
>
> The last question is about tuning RocksDB. I am trying to assign some memory
> to the write buffer and block cache, and the docs say "typically by decreasing
> the JVM heap size of the TaskManagers by the same amount", while the
> TaskManager heap size is described as "On YARN setups, this value is
> automatically configured to the size of the TaskManager's YARN container,
> minus a certain tolerance value." So it looks like I should decrease the
> TaskManager heap, but that value is set by YARN automatically. What should I
> do?
>
> Best,
>
> Mingliang
>
>
> This communication may contain privileged or other confidential
> information of Red. If you have received it in error, please advise the
> sender by reply e-mail and immediately delete the message and any
> attachments without copying or disclosing the contents. Thank you.
>
>
