Hi,

If an operator has multiple inputs, its watermark will be the minimum of the 
watermarks of all of its inputs. Thus your hypothetical “ACK Operator” will 
get Watermark(Long.MAX_VALUE) only when all of the preceding operators have 
reported Watermark(Long.MAX_VALUE). 
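
To illustrate the rule above, here is a minimal plain-Java sketch. It is only 
a simulation and does not use any Flink API: the class MinWatermarkTracker and 
its methods are hypothetical names made up for this example. It keeps the 
latest watermark per input channel, takes the minimum across channels, and 
signals the ACK once that minimum reaches Long.MAX_VALUE.

```java
import java.util.Arrays;

// Plain-Java simulation, NOT Flink API: models how an operator with several
// input channels could combine their watermarks. All names are hypothetical.
class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private boolean ackSent = false;

    MinWatermarkTracker(int numChannels) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        channelWatermarks = new long[numChannels];
        // Until a channel reports something, treat its watermark as "nothing seen yet".
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // The operator's watermark is the minimum over all input channels.
    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    // Records a watermark from one channel. Returns true exactly once: when the
    // combined watermark first reaches Long.MAX_VALUE (the EOF marker).
    boolean onWatermark(int channel, long timestamp) {
        // Watermarks never move backwards on a channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        if (!ackSent && currentWatermark() == Long.MAX_VALUE) {
            ackSent = true;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        MinWatermarkTracker ack = new MinWatermarkTracker(3);
        // Two of three upstream instances are done: no ACK yet.
        System.out.println(ack.onWatermark(0, Long.MAX_VALUE));
        System.out.println(ack.onWatermark(1, Long.MAX_VALUE));
        // The last upstream instance finishes: now the ACK fires.
        System.out.println(ack.onWatermark(2, Long.MAX_VALUE));
    }
}
```

So even though every parallel instance upstream emits its own watermark, the 
ACK condition is met only once, after the last of them has reported EOF.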

Yes, instead of simply adding a sink, you would have to use something like 
`flatMap` that doesn’t emit anything and only passes the watermark along (the 
default implementations do exactly that).

To access the watermark, you can use the DataStream.transform function and 
pass your own implementation of an operator extending AbstractStreamOperator. 
You would probably only need to override the processWatermark() method, and 
there you could do the ACK operation once you get 
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.

Piotrek

> On 25 Jan 2018, at 17:56, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> Thank you for your detailed answer.
> 
> Yes, I want to generate the ack when all the records of the file are written 
> to DB.
> 
> So, to understand what you are saying: we will receive a single EOF 
> watermark value at the ack operator once all the upstream operators have 
> processed all the records of the file. But as I understand watermarks, each 
> parallel instance of an operator emits its own watermark, so how do I ensure 
> that the EOF has been reached? Or will I receive only one watermark at the 
> ack operator?
> 
> 
> So the pipeline topology will look like 
> 
> DataStream readFileStream = env.readFile()
> 
> readFileStream
>     .transform(// ContinuousFileReaderOperator)
>     .keyBy(0)
>     .map(// enrichment)
>     .addSink(// DB)
> 
> Instead of addSink, should it be a simple map operator which writes to the 
> DB, so that we can have an ack operator after it which generates the response?
> 
> Also, how do I get/access the Watermark value in the ack operator? It will 
> be a simple map operator, right?
> 
> 
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> As you figured out, a dummy EOF record is one solution; however, you might 
> also try to achieve it by wrapping an existing CSV function. Your wrapper 
> could emit this dummy EOF record. Another (probably better) idea is to use 
> Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or 
> ContinuousFileReaderOperator will do that for you, so you would just need to 
> handle the Watermark.
> 
> The question is, do you need to perform the ACK operation AFTER all of the DB 
> writes, or just after reading the CSV file? If the latter one, you could add 
> some custom ACK operator with parallelism one just after the CSV source that 
> waits for the EOF Watermark. 
> 
> If it is the first one (some kind of committing the DB writes), you would 
> need to wait until the EOF passes through all of your operators. You would 
> need something like this:
> 
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db 
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
> 
> I hope this helps,
> Piotrek
> 
>> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com 
>> <mailto:vinay18.pa...@gmail.com>> wrote:
>> 
>> Hi Guys,
>> 
>> Following is how my pipeline looks (DataStream API) :
>> 
>> [1] Read the data from the csv file
>> [2] KeyBy it by some id
>> [3] Do the enrichment and write it to DB
>> 
>> [1] reads the data in sequence as it has single parallelism and then I have 
>> default parallelism for the other operators.
>> 
>> I want to generate a response (ack) when all the data of the file is 
>> processed. How can I achieve this ?
>> 
>> One solution I can think of is to have an EOF dummy record in the file and 
>> a unique field shared by all the records in that file. Doing a keyBy on this 
>> field will make sure that all records are sent to a single slot. So, when 
>> the EOF dummy record is read, I can generate a response/ack.
>> 
>> Is there a better way I can deal with this ?
>> 
>> 
>> Regards,
>> Vinay Patil
> 
> 
