Hi Gagan,
a typical solution to such a problem is to introduce an artifical key
(enrichment id + some additional suffix), you can then keyBy on this
artificial key and thus spread the workload more evenly. Of course you
need to make sure that records of the second stream are duplicated to
all operators with the same artificial key.
Depending on the frequency of the second stream, it might also worth to
use a broadcast join that distributes the second stream to all operators
such that all operators can perform the enrichment step in a round robin
fashion.
Regards,
Timo
Am 07.01.19 um 14:45 schrieb Gagan Agrawal:
Flink Version is 1.7.
Thanks Zhijiang for your pointer. Initially I was checking only for
few. However I just checked for all and found couple of them having
queue length of 40+ which seems to be due to skewness in data. Is
there any general guide lines on how to handle skewed data? In my case
I am taking union and then keyBy (with custom stateful Process
function) on enrichment id of 2 streams (1 enrichment stream with low
volume and another regular data stream with high volume). I see that
30% of my data stream records have same enrichment Id and hence go to
same tasks which results in skewness. Any pointers on how to handle
skewness while doing keyBy would be of great help.
Gagan
On Mon, Jan 7, 2019 at 3:25 PM zhijiang <wangzhijiang...@aliyun.com
<mailto:wangzhijiang...@aliyun.com>> wrote:
Hi Gagan,
What flink version do you use? And have you checked the
buffers.inputQueueLength for all the related parallelism
(connected with A) of B? It may exist the scenario that only one
parallelim B is full of inqueue buffers which back pressure A, and
the input queue for other parallelism B is empty.
Best,
Zhijiang
------------------------------------------------------------------
From:Gagan Agrawal <agrawalga...@gmail.com
<mailto:agrawalga...@gmail.com>>
Send Time:2019年1月7日(星期一) 12:06
To:user <user@flink.apache.org <mailto:user@flink.apache.org>>
Subject:Buffer stats when Back Pressure is high
Hi,
I want to understand does any of buffer stats help in
debugging / validating that downstream operator is performing
slow when Back Pressure is high? Say I have A -> B operators
and A shows High Back Pressure which indicates something wrong
or not performing well on B side which is slowing down
operator A. However when I look at buffers.inputQueueLength
for operator B, it's 0. My understanding is that when B is
processing slow, it's input buffer will be full of incoming
messages which ultimately blocks/slows down upstream operator
A. However it doesn't seem to be happening in my case. Can
someone throw some light on how should different stats around
buffers (e.g buffers.inPoolUsage, buffers.inputQueueLength,
numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) look
like when downstream operator is performing slow?
Gagan