StreamingFileSink not committing file to S3

2019-08-05 Thread Ravi Bhushan Ratnakar
Thanks for your quick response. I am using a custom implementation of
BoundedOutOfOrdernessTimestampExtractor, tweaked to return an initial
watermark that is not a negative value.
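For reference, the clamping described above can be modelled in plain Java. This is only an illustrative sketch of the idea, not Flink's actual BoundedOutOfOrdernessTimestampExtractor; the class and method names here are made up:

```java
// Illustrative model of a bounded-out-of-orderness watermark whose
// initial value is clamped to 0 instead of a large negative number.
class ClampedOutOfOrdernessWatermark {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    ClampedOutOfOrdernessWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Record an element's event-time timestamp. */
    void onEvent(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    /** Current watermark, floored at 0 rather than Long.MIN_VALUE. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return 0L; // no data seen yet: report 0, not a negative value
        }
        return Math.max(0L, currentMaxTimestamp - maxOutOfOrdernessMs);
    }
}
```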

One more observation: when the job's parallelism is around 120, it works
well even with an idle stream, and the Flink UI shows the watermark. But when
I increase the parallelism above 180, it doesn't write any file to S3 while
the idle stream is present. The moment I remove the idle stream, it works
fine at any parallelism.

I have also observed that when the parallelism is above 180, the Flink UI
never shows a watermark, although everything works fine without the idle
stream.

Regards,
Ravi

On Sun 4 Aug, 2019, 09:53 Rafi Aroch,  wrote:

> Hi Ravi,
>
> This sounds related to an issue where the watermark is not advancing. This
> may happen when you have an idle source. An idle source reports a watermark
> of Long.MIN_VALUE, so the overall minimum watermark across all consumer
> subtasks will never advance.
>
> First, verify this is indeed the case by looking at the watermarks
> reported. You can try to assign a custom watermark emitter logic as seen
> here [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> Thanks,
> Rafi
>
>
> On Sat, Aug 3, 2019 at 8:23 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am designing a streaming pipeline using Flink 1.8.1, which consumes
>> messages from Kinesis and applies some business logic on a per-key basis
>> using KeyedProcessFunction and checkpointing (HeapStateBackend). It
>> consumes around 7 GB of messages per minute from multiple Kinesis streams.
>> I am using only one Kinesis source, configured with multiple streams.
>>
>> The pipeline processes data and writes output to S3 as expected, but I am
>> experiencing a very weird issue: when one of the streams is completely
>> empty, it doesn't flush any file to S3, although it keeps consuming data
>> from the rest of the streams. When I remove only this empty stream and
>> resubmit the job, everything works fine and it writes output to S3.
>>
>> Regards,
>> Ravi
>>
>
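Rafi's point about the minimum watermark can be illustrated with a small plain-Java model. This is not Flink's internal API, just a sketch of the aggregation rule: an operator's effective watermark is the minimum over all its inputs, so one idle source stuck at Long.MIN_VALUE pins the result.

```java
import java.util.Arrays;

// Illustrative model of how watermarks combine across input channels:
// the operator-level watermark is the minimum of all input watermarks.
class MinWatermarkModel {
    static long combinedWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }
}
```

With active inputs at 1000 and 2000 and one idle input reporting Long.MIN_VALUE, the combined watermark stays at Long.MIN_VALUE, which is why event-time results never get committed.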


Re: StreamingFileSink not committing file to S3

2019-08-05 Thread Theo Diefenthal
Hi Ravi, 

Please check out [1] and [2]. They are about Kafka but probably apply to 
Kinesis as well. If one stream is empty, there is no way for Flink to know 
the watermark of that stream, so Flink can't advance the watermark. 
Downstream operators thus cannot know whether any more data will be 
coming from the empty stream. (Think of a source which is just offline or 
has network issues for some time and, once back online, will deliver all of 
its old data.) This leaves Flink unable to commit the final result until 
some data comes in from the empty stream. 

Best regards 
Theo 

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
[2] https://issues.apache.org/jira/browse/FLINK-5479
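One common workaround for this situation, in the spirit of the discussion in FLINK-5479, is an idleness-aware watermark assigner that falls back to the wall clock when a source has seen no data for some time. A plain-Java sketch of the idea follows; all names are illustrative and the idle-timeout parameter is an assumption, not a Flink API:

```java
// Illustrative sketch: if no event arrived within idleTimeoutMs,
// estimate the watermark from processing time so the global minimum
// watermark can still advance past an idle source.
class IdleAwareWatermark {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;

    IdleAwareWatermark(long maxOutOfOrdernessMs, long idleTimeoutMs, long nowMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastEventProcessingTime = nowMs;
    }

    /** Record an element and remember when (in processing time) it arrived. */
    void onEvent(long eventTimestampMs, long nowMs) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestampMs);
        lastEventProcessingTime = nowMs;
    }

    long currentWatermark(long nowMs) {
        if (nowMs - lastEventProcessingTime > idleTimeoutMs) {
            // Source looks idle: advance based on the wall clock.
            return nowMs - maxOutOfOrdernessMs;
        }
        if (maxEventTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no data yet, not idle long enough
        }
        return maxEventTimestamp - maxOutOfOrdernessMs;
    }
}
```

Note the trade-off: once the idle fallback kicks in, late data from the "idle" source may arrive behind the watermark, so the timeout should be chosen generously.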


From: "Ravi Bhushan Ratnakar" 
To: "user" 
Sent: Saturday, 3 August 2019 19:23:25 
Subject: StreamingFileSink not committing file to S3 



StreamingFileSink not committing file to S3

2019-08-03 Thread Ravi Bhushan Ratnakar
Hi All,

I am designing a streaming pipeline using Flink 1.8.1, which consumes
messages from Kinesis and applies some business logic on a per-key basis
using KeyedProcessFunction and checkpointing (HeapStateBackend). It consumes
around 7 GB of messages per minute from multiple Kinesis streams. I am using
only one Kinesis source, configured with multiple streams.

The pipeline processes data and writes output to S3 as expected, but I am
experiencing a very weird issue: when one of the streams is completely
empty, it doesn't flush any file to S3, although it keeps consuming data
from the rest of the streams. When I remove only this empty stream and
resubmit the job, everything works fine and it writes output to S3.

Regards,
Ravi