Hi Yun,

My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists
files and forwards them to B as FileInputSplits. B parses those files and
shuffles the data records to C as keyed streams.
C is the slowest operator in the graph; A is the fastest.
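
As a point of reference, here is a minimal sketch (plain Python, not Flink
code) of the ordering one would expect on the A -> B edge, assuming that
rebalance distributes records round-robin from the single sender and that
each channel is FIFO. The names rebalance and splits are made up for
illustration only.

```python
from itertools import cycle


def rebalance(records, parallelism):
    """Distribute records round-robin over `parallelism` FIFO queues."""
    queues = [[] for _ in range(parallelism)]
    for target, record in zip(cycle(range(parallelism)), records):
        queues[target].append(record)
    return queues


# Hypothetical split names, emitted by the single sender in dt order.
splits = [
    f"dt=2017-12-{day:02d}/part-{part}"
    for day in (12, 13, 14)
    for part in range(4)
]
queues = rebalance(splits, parallelism=3)

# Each queue is a subsequence of the sender's output, so it stays sorted.
per_queue_sorted = [queue == sorted(queue) for queue in queues]
```

Under those assumptions every receiving queue keeps the sender's relative
order, which is the behaviour the B logs further down do not match.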

I relied on the slf4j/logback logs to reach that conclusion. There's one
log entry for each context.collect() call in A, and there's one log entry
whenever B opens a new FileInputSplit (B is Flink's
ContinuousFileReaderOperator).
My logback configuration is:
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>

The logs I got from A showed the messages in order (by *dt* in my case).
However, the logs I got from B showed that the messages' order was lost
(please refer to the logs below). I assume that each logback %thread
corresponds to exactly one B_i.
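
The check described above can be sketched as a small script. This is only an
illustration, assuming the log layout produced by the pattern above and
assuming that the dt=YYYY-MM-DD path segment is the ordering key; the
function name out_of_order_by_thread is hypothetical.

```python
import re
from collections import defaultdict

# Pulls the thread name and the dt partition out of an "Opening file" entry.
LINE_RE = re.compile(
    r"\[(?P<thread>[^\]]+)\].*Opening file /dt=(?P<dt>\d{4}-\d{2}-\d{2})/"
)


def out_of_order_by_thread(lines):
    """Return {thread: [(latest_dt_seen, dt), ...]} for every entry whose dt
    is earlier than the latest dt already seen on the same thread."""
    latest = {}
    regressions = defaultdict(list)
    for line in lines:
        match = LINE_RE.search(line)
        if match is None:
            continue  # not an "Opening file" entry
        thread, dt = match.group("thread"), match.group("dt")
        previous = latest.get(thread)
        # ISO dates compare correctly as plain strings.
        if previous is not None and dt < previous:
            regressions[thread].append((previous, dt))
        else:
            latest[thread] = dt
    return dict(regressions)


# Three consecutive entries taken from the B logs in this message.
sample = [
    "2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file "
    "/dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6",
    "2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file "
    "/dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139",
    "2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file "
    "/dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f",
]
result = out_of_order_by_thread(sample)
```

For this sample, the only regression reported is the 2017-12-15 split opened
on Thread-34 after a 2018-01-01 split.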

Thanks and regards,
Averell

2019-10-30 05:30:43.548 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00001-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00001-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00001-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00001-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83

On Fri, Nov 1, 2019 at 1:32 PM Yun Gao <yungao...@aliyun.com> wrote:

>      Hi Averell,
>
>            If I understood right, the job graph is A (parallelism = 1) -->
> B (parallelism > 1), so I think the order of the records sent into subtask
> B_i should be the same as the order in which A sends them out. Therefore,
> could you also provide more details on the topology? Are there only these
> two operators? And could you also describe how the message order is checked
> in B_i?
>
>    Best,
>    Yun
>
> ------------------------------------------------------------------
> From:Averell <lvhu...@gmail.com>
> Send Time:2019 Oct. 31 (Thu.) 12:55
> To:user <user@flink.apache.org>
> Subject:Preserving (best effort) messages order between operators
>
> Hi,
>
> I have a source function with parallelism = 1, sending out records ordered
> by event-time. These records are then re-balanced to the next operator which
> has parallelism > 1. I observed that within each subtask of the 2nd
> operator, the order of the messages is not maintained. Is this behaviour
> expected? If it is, is there any way to avoid that? Or at least reduce that?
> I have high back-pressure on that 2nd operator as the one after that is
> slow. There is also high back-pressure on the 1st operator, which makes my
> problem more severe (the mentioned out-of-order is high). If I could
> throttle the 1st operator when back-pressure is high, then I could mitigate
> the mentioned problem. But I could not find any guide on doing that.
>
> Could you please help?
>
> Thanks.
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/