Hi Gagan,

A typical solution to such a problem is to introduce an artificial key (enrichment id + some additional suffix). You can then keyBy on this artificial key and thus spread the workload more evenly. Of course, you need to make sure that records of the second (enrichment) stream are duplicated to all artificial keys derived from their enrichment id, so that every parallel instance has the enrichment data it needs.
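
A minimal sketch of this salting idea in the DataStream API could look like the following. The (id, payload) tuple types, the suffix count, the class/method names and the streams "events" and "enrichment" are just assumptions for illustration, not your actual job:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class SaltedKeyExample {

    // Spreads a hot enrichment id over `numSuffixes` artificial sub-keys.
    // Both inputs are assumed to already be (enrichmentId, payload) pairs.
    public static DataStream<Tuple2<String, String>> saltAndUnion(
            DataStream<Tuple2<String, String>> events,      // high-volume data stream
            DataStream<Tuple2<String, String>> enrichment,  // low-volume enrichment stream
            final int numSuffixes) {

        // High-volume stream: append a random suffix to the enrichment id.
        DataStream<Tuple2<String, String>> saltedEvents = events
                .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(Tuple2<String, String> e) {
                        int suffix = ThreadLocalRandom.current().nextInt(numSuffixes);
                        return Tuple2.of(e.f0 + "#" + suffix, e.f1);
                    }
                });

        // Low-volume stream: duplicate every record to all suffixes so that
        // each salted key still sees the enrichment data it needs.
        DataStream<Tuple2<String, String>> saltedEnrichment = enrichment
                .flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(Tuple2<String, String> r, Collector<Tuple2<String, String>> out) {
                        for (int i = 0; i < numSuffixes; i++) {
                            out.collect(Tuple2.of(r.f0 + "#" + i, r.f1));
                        }
                    }
                });

        return saltedEvents.union(saltedEnrichment);
    }
}

You would then keyBy on the salted id (field 0 of the tuple) and apply your existing stateful process function on the union, just as before.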

Depending on the frequency of the second stream, it might also be worth using a broadcast join that distributes the second stream to all parallel instances. Every instance can then perform the enrichment step locally, and the first stream can simply be distributed in a round-robin fashion instead of being keyed by the skewed id.
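
As a rough sketch, this could be done with the broadcast state pattern (available since Flink 1.5). Again, the tuple types, the state/class names and the simple string concatenation in the enrichment step are just placeholders:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastEnrichmentExample {

    // Broadcast state: enrichment id -> enrichment payload.
    private static final MapStateDescriptor<String, String> ENRICHMENT_STATE =
            new MapStateDescriptor<>("enrichment", Types.STRING, Types.STRING);

    public static DataStream<Tuple2<String, String>> enrich(
            DataStream<Tuple2<String, String>> events,        // high-volume (id, payload) stream
            DataStream<Tuple2<String, String>> enrichment) {  // low-volume (id, payload) stream

        // Replicate the low-volume stream to every parallel instance.
        BroadcastStream<Tuple2<String, String>> broadcast = enrichment.broadcast(ENRICHMENT_STATE);

        return events
                .connect(broadcast)
                .process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>>() {

                    @Override
                    public void processElement(Tuple2<String, String> event,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple2<String, String>> out) throws Exception {
                        // Look up the enrichment data that was broadcast to this instance.
                        // The value can be null if no enrichment record has arrived yet.
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(ENRICHMENT_STATE);
                        String enriched = state.get(event.f0);
                        out.collect(Tuple2.of(event.f0, event.f1 + "|" + enriched));
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> record,
                                                        Context ctx,
                                                        Collector<Tuple2<String, String>> out) throws Exception {
                        // Every parallel instance receives and stores every enrichment record.
                        ctx.getBroadcastState(ENRICHMENT_STATE).put(record.f0, record.f1);
                    }
                });
    }
}

With this approach the high-volume stream does not need to be keyed at all, so the skewed enrichment id no longer determines the partitioning.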

Regards,
Timo

On 07.01.19 at 14:45, Gagan Agrawal wrote:
Flink Version is 1.7.
Thanks Zhijiang for your pointer. Initially I was checking only a few of them. However, I just checked all of them and found a couple with a queue length of 40+, which seems to be due to skewness in the data. Are there any general guidelines on how to handle skewed data? In my case I take the union of 2 streams (one enrichment stream with low volume and another regular data stream with high volume) and then keyBy on the enrichment id (with a custom stateful ProcessFunction). I see that 30% of my data stream records have the same enrichment id and hence go to the same task, which results in skewness. Any pointers on how to handle skewness while doing keyBy would be of great help.

Gagan

On Mon, Jan 7, 2019 at 3:25 PM zhijiang <wangzhijiang...@aliyun.com> wrote:

    Hi Gagan,

    Which Flink version do you use? And have you checked
    buffers.inputQueueLength for all the related parallel instances of B
    (connected with A)? There may be a scenario where only one parallel
    instance of B has a full input queue, which back-pressures A, while
    the input queues of the other parallel instances of B are empty.

    Best,
    Zhijiang

        ------------------------------------------------------------------
        From: Gagan Agrawal <agrawalga...@gmail.com>
        Send Time: Monday, January 7, 2019, 12:06
        To: user <user@flink.apache.org>
        Subject:Buffer stats when Back Pressure is high

        Hi,
        I want to understand whether any of the buffer stats help in
        debugging / validating that a downstream operator is performing
        slowly when back pressure is high. Say I have operators A -> B,
        and A shows high back pressure, which indicates that something
        is wrong or not performing well on B's side and is slowing down
        operator A. However, when I look at buffers.inputQueueLength
        for operator B, it is 0. My understanding is that when B is
        processing slowly, its input buffer will be full of incoming
        messages, which ultimately blocks/slows down the upstream
        operator A. However, that doesn't seem to be happening in my
        case. Can someone throw some light on how the different buffer
        stats (e.g. buffers.inPoolUsage, buffers.inputQueueLength,
        numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) should
        look when the downstream operator is performing slowly?

        Gagan


