Hello Kartik,

For your case, if events ingested per second = 300/60 = 5 and the payload size is 2 KB, the ingestion size per second is 5 * 2 KB = 10 KB. The network buffer (memory segment) size is 32 KB by default. You can also decrease the value to 16 KB.
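If you want to experiment with a smaller segment size, here is a minimal sketch (an assumption on my side: this only takes effect for local/MiniCluster execution; on a real cluster the value is normally set in flink-conf.yaml, and 16kb is just an example value):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // taskmanager.memory.segment-size defaults to 32kb; try 16kb for small payloads
    conf.setString("taskmanager.memory.segment-size", "16kb");

    // Picked up only by local execution; on a cluster, put the key in flink-conf.yaml
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);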
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#taskmanager-memory-segment-size

You can also read:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/network_mem_tuning/

-A

On Mon, Apr 1, 2024 at 6:05 AM Kartik Kushwaha <kushwaha.kartik...@gmail.com> wrote:

> Thank you. I will check and get back on both the suggestions made by
> Asimansu and Xuyang.
>
> I am using Flink 1.17.0
>
> Regards,
> Kartik
>
> On Mon, Apr 1, 2024, 5:13 AM Asimansu Bera <asimansu.b...@gmail.com> wrote:
>
>> Hello Kartik,
>>
>> You may check the execution-buffer-timeout-interval parameter. This
>> value is an important one for your case. I experienced a similar issue
>> in the past.
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-buffer-timeout-interval
>>
>> For your case (I am not sure which Flink version you are on), the
>> default value is 100ms - meaning every operator waits up to 100ms to
>> fill the network buffer before flushing it to the downstream operators
>> for data exchange.
>>
>> You can change this parameter to 0 (or a few ms) so that data is
>> flushed for every record (or every few records). Then share your
>> findings.
>>
>> -A
>>
>> On Sun, Mar 31, 2024 at 9:47 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi, Kartik.
>>> On the Flink UI, is there any operator that shows a relatively high
>>> *busy* percentage? Could you also try using a flame graph to provide
>>> more information? [1]
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>> At 2024-03-30 18:07:23, "Kartik Kushwaha" <kushwaha.karti...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I have a streaming event processing job that looks like this.
>>>
>>> *Source - ProcessFn (3 in total) - Sink*
>>>
>>> I am seeing a delay of 50ms to 250ms between each operator (maybe
>>> buffering or serde delays), leading to slow end-to-end processing.
>>> What could be the reason for such high latency?
>>>
>>> Some more details:
>>> - The source operator receives continuous events at a rate of 200 to
>>> 300 events per minute through controlled tests.
>>> - Using DataStream<POJO> between the operators. It has simple types,
>>> and the input payload received from the source is carried as byte[]
>>> fields. Right now the payload size is a few KB.
>>> - The same events are also processed by another Flink job that looks
>>> like *source - processFn(mongoWriter) - sink*. There the end-to-end
>>> processing is less than 5ms. A similar Stream<pojo> is carried.
>>> - The original (problematic) pipeline has extraction, validation, and
>>> transformation ProcessFns, but each of these steps completes within a
>>> couple of ms. I am measuring the processing time inside these process
>>> functions with *endTime - startTime* logic in the Java code, and the
>>> average time an event spends inside the operators is just 1ms.
>>> - There is no backpressure shown in the Flink UI on these operators.
>>> - Input events flow continuously from the source at a very high rate
>>> without any delays, so waiting on the buffer can be ruled out.
>>>
>>> Regards,
>>> Kartik
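For the buffer-timeout suggestion quoted above, a minimal sketch of the programmatic equivalent (the 0 ms value is just an example; flushing on every record trades some throughput for latency):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // 0 flushes the network buffer after every record; a small value such as
    // 5 flushes every few ms; the default is 100 ms.
    env.setBufferTimeout(0);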