Thanks for your quick response. I am using a custom implementation of BoundedOutOfOrdernessTimestampExtractor, and I have also tweaked it so that the initial watermark is not a negative value.
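In simplified form, my extractor looks like the sketch below. This is illustrative only: MyEvent and its getEventTime() method stand in for my actual record type, and the logic just mirrors BoundedOutOfOrdernessTimestampExtractor with the watermark clamped at zero.

```java
import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Periodic watermark assigner that behaves like
// BoundedOutOfOrdernessTimestampExtractor, but starts the max-seen
// timestamp at 0 so the initial watermark is never negative.
public class NonNegativeBoundedOutOfOrdernessExtractor
        implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp = 0L; // instead of Long.MIN_VALUE

    public NonNegativeBoundedOutOfOrdernessExtractor(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getEventTime();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Clamp at zero so the watermark is never negative, even before
        // the first record arrives.
        return new Watermark(Math.max(0L, currentMaxTimestamp - maxOutOfOrdernessMillis));
    }
}
```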
One more observation: when the job's parallelism is around 120, it works well even with the idle stream, and the Flink UI shows the watermark. But when I increase the parallelism above 180, then with the idle stream it doesn't write any files to S3. The moment I remove the idle stream, it works fine at any parallelism. I have also observed that when the parallelism is above 180, the Flink UI never shows the watermark, although everything works fine without the idle stream.

Following Rafi's pointer [1], I have sketched an idle-aware assigner that I plan to test next; see the bottom of this mail.

Regards,
Ravi

On Sun 4 Aug, 2019, 09:53 Rafi Aroch, <rafi.ar...@gmail.com> wrote:

> Hi Ravi,
>
> This sounds related to an issue where the watermark is not advancing. This
> may happen when you have an idle source. An idle source would report a
> Long.MIN_VALUE, therefore the overall min watermark across all consumer
> subtasks will never proceed.
>
> First, verify this is indeed the case by looking at the watermarks
> reported. You can try to assign custom watermark emitter logic as seen
> here [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> Thanks,
> Rafi
>
>
> On Sat, Aug 3, 2019 at 8:23 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am designing a streaming pipeline using Flink 1.8.1, which consumes
>> messages from Kinesis and applies some business logic on a per-key basis
>> using KeyedProcessFunction, with checkpointing (HeapStateBackend). It
>> consumes around 7 GB of messages per minute from multiple Kinesis
>> streams. I am using only one Kinesis source, which is configured with
>> multiple streams.
>>
>> The pipeline processes data and writes output to S3 as expected, but I
>> am experiencing a very weird issue: when one of the streams is completely
>> empty, it doesn't flush any files to S3, even though it keeps consuming
>> data from the rest of the streams. When I remove only this empty stream
>> and submit the job again, everything works fine and output is written
>> to S3.
>>
>> Regards,
>> Ravi
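P.S. Following up on [1], below is the kind of idle-aware assigner I plan to test next. It is a sketch only (MyEvent and getEventTime() are again placeholders, and the timeout values are arbitrary): once a subtask has seen no records for idleTimeoutMillis, the watermark advances from the wall clock instead of stalling, so an empty stream no longer holds back the job-wide minimum watermark. The trade-off is that records arriving on a stream after a long idle period may then be considered late.

```java
import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Falls back to processing time when the source has been idle for a while.
public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrdernessMillis;
    private final long idleTimeoutMillis;
    private long currentMaxTimestamp = 0L;
    private long lastRecordTime = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long maxOutOfOrdernessMillis, long idleTimeoutMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getEventTime();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        lastRecordTime = System.currentTimeMillis();
        return ts;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastRecordTime > idleTimeoutMillis) {
            // No records for a while: let the watermark follow the wall
            // clock so this subtask does not block downstream progress.
            return new Watermark(now - maxOutOfOrdernessMillis);
        }
        return new Watermark(Math.max(0L, currentMaxTimestamp - maxOutOfOrdernessMillis));
    }
}
```

If I understand [1] correctly, this can be attached directly to the consumer so watermarks are tracked per shard, e.g.:

```java
consumer.setPeriodicWatermarkAssigner(new IdleAwareWatermarkAssigner(5_000L, 60_000L));
```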