In case of reading from input files, at the EOF event, readers will send 
Watermark(Long.MAX_VALUE) on all of the output edges and those watermarks will 
be propagated accordingly. So your ACK operator will get 
Watermark(Long.MAX_VALUE) only when it gets it from ALL of it’s input edges.

When reading from Kafka, you do not have an EOF event, so you it would not be 
possible to use this Watermark(Long.MAX_VALUE). In that case you would need to 
emit some dummy EOF record, containing some meta information like filename 
alongside with correctly set event time to a value greater then original even 
read from Kafka which contained the filename to process. You would have to pass 
this EOF dummy record to your EOF operator. There you you would need to create 
some kind of mapping 

fileName -> event time marking EOF

And each time you process EOF record, you add new entry to this mapping. Now 
whenever you process watermarks, you can check for which fileNames does this 
watermark guarantees that file has been processed completely.

However this is more complicated and you would have to handle thins like:
- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latencies (when 
reading from file, EOF immediately emits Watermark(Long.MAX_VALUE), which might 
not always be the case for Kafka: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission>)

Piotrek

> On 30 Jan 2018, at 15:17, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Yeh, so this is the current implementation.
> 
> One question regarding the Watermark, since watermark is chosen as minimum 
> value of all of input streams, only one input  stream will have watermark 
> value to LONG.MAX_VALUE which denotes the EOF processing whereas the other 
> streams will not have this value , is my understanding right ? So in this 
> case LONG.MAX_VALUE will always be a greater value than it's input streams. 
> Or the LONG.MAX_VALUE watermark will flow from each input stream ?
> 
> 
> I was thinking of directly reading from Kafka as source in Flink in order to 
> remove the middle layer of independent Kafka Consumer which is triggering 
> Flink job.
> 
> So, the pipeline will be 1. readFrom Kafka -> take the File location -> read 
> using FileReaderOperator
> 
> But in this case how do I determine for which File I have received the 
> LONG.MAX_VALUE, it will get complicated.
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Tue, Jan 30, 2018 at 1:57 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Thanks for the clarification :)
> 
> Since you have one Job per an ACK, you can just relay on 
> Watermark(Long.MAX_VALUE) to mark the end of the processing.
> 
> More complicated solution (compared to what I proposed before) would be 
> needed if you had one long living job (for example multiple weeks) and it 
> would need to produce multiple ACKs in different point of time.
> 
> Piotrek
> 
> 
>> On 29 Jan 2018, at 15:43, Vinay Patil <vinay18.pa...@gmail.com 
>> <mailto:vinay18.pa...@gmail.com>> wrote:
>> 
>> Sure, here is the complete design that we have :
>> 
>> File metadata (NFS location of file) is stored in kafka , we are having a 
>> Kafka Consumer (not flink one) which will read from each partition and 
>> trigger a Flink job on cluster. 
>> 
>> The Flink job will then read from a file and do the processing as I 
>> mentioned earlier.
>> 
>> The requirement here is we need to trigger a ACK if the validations for all 
>> the records in a file are successful.
>> 
>> P.S I know we are not using Kafka to its full potential and are just using 
>> it for storing metadata :) 
>> 
>> Regards,
>> Vinay Patil
>> 
>> On Thu, Jan 25, 2018 at 11:57 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Could you rephrase what is your concern? 
>> 
>> Thanks, Piotrek
>> 
>> 
>>> On 25 Jan 2018, at 18:54, Vinay Patil <vinay18.pa...@gmail.com 
>>> <mailto:vinay18.pa...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> No, to clarify I need to send the ack for each file when it gets processed 
>>> completely and there are multiple files that I am going to read from the 
>>> shared location.
>>> 
>>> Regards,
>>> Vinay Patil
>>> 
>>> On Thu, Jan 25, 2018 at 11:37 AM, Piotr Nowojski <pi...@data-artisans.com 
>>> <mailto:pi...@data-artisans.com>> wrote:
>>>> 
>>>> Yes, make sense. Just looked at the code of ContinousFileReaderOperator 
>>>> ,it does not emit any Watermark in the processWatermark function. It only 
>>>> does it in the close function , so we will get the max value when all 
>>>> records are read.
>>>> 
>>> 
>>> Yes.
>>> 
>>>> What if I am reading multiple files from a shared location, in that case I 
>>>> will have to override the processElement as well to generate a ack for 
>>>> that particular file, so the flatMap will simply write to DB  and emit the 
>>>> fileName .
>>>> 
>>> 
>>> Yes, as long as you are OK with performing all of the fileName ACKs at the 
>>> end (once all of the source files were processed), and not ASAP.
>>>  
>>>> In the processWatermark function of ACK operator, when the watermark is 
>>>> Long.MAX_VALUE I will collect the record , and the processElement function 
>>>> will then generate the ack based on some condition met , right ?
>>> 
>>> Other way around. In your ACK Operator processElement you would have to 
>>> collect all of the fileNames on the flink state (ListState?) and process 
>>> them in the ACK Operator processWatermark once it gets Long.MAX_VALUE.
>>> 
>>> Piotrek
>>>   
>>>> 
>>>> Regards,
>>>> Vinay Patil
>>>> 
>>>> On Thu, Jan 25, 2018 at 11:10 AM, Piotr Nowojski <pi...@data-artisans.com 
>>>> <mailto:pi...@data-artisans.com>> wrote:
>>>> Hi,
>>>> 
>>>> If an operator has multiple inputs, it’s watermark will be the minimum of 
>>>> all of the inputs. Thus your hypothetical “ACK Operator” will get 
>>>> Watermark(Long.MAX_VALUE) only when of the preceding operators report 
>>>> Watermark(Long.MAX_VALUE). 
>>>> 
>>>> Yes, instead of simply adding sink, you would have to use something like 
>>>> `flatMap`, that doesn’t emit anything, only passes the watermark (default 
>>>> implementation are doing exactly that).
>>>> 
>>>> To access watermark, you can use DataStream.transform function and pass 
>>>> your own implementation of an operator extending from 
>>>> AbstractStreamOperator. Probably you would only need to override 
>>>> processWatermark() method and there you could do the ACK operation once 
>>>> you get org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.
>>>> 
>>>> Piotrek
>>>> 
>>>> 
>>>>> On 25 Jan 2018, at 17:56, Vinay Patil <vinay18.pa...@gmail.com 
>>>>> <mailto:vinay18.pa...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Piotrek,
>>>>> 
>>>>> Thank you for your detailed answer.
>>>>> 
>>>>> Yes, I want to generate the ack when all the records of the file are 
>>>>> written to DB.
>>>>> 
>>>>> So to understand what you are saying , we will receive a single EOF 
>>>>> watermark value at the ack operator when all the downstream operator 
>>>>> process all the records of the file. But what I understand regarding the 
>>>>> watermark is each parallel instance of the operator will emit the 
>>>>> watermark, so how do I ensure that the EOF is reached  or will I receive 
>>>>> only one watermark at the ack operator ?
>>>>> 
>>>>> 
>>>>> So the pipeline topology will look like 
>>>>> 
>>>>> DataStream  readFileStream = env.readFile()
>>>>> 
>>>>> readFileStream
>>>>>                          .transform(// ContrinousFileReaderOperator)
>>>>>                          .key(0)
>>>>>                          .map(// encrichment)
>>>>>                           .addSink(// DB)
>>>>> 
>>>>>  instead of add sink, should it be a  simple map operator which writes to 
>>>>> DB so that we can have a next ack operator which will generate the 
>>>>> response.
>>>>> 
>>>>> Also, how do I get/access the Watermark value in the ack operator ? It 
>>>>> will be a simple  map operator, right ?
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> Regards,
>>>>> Vinay Patil
>>>>> 
>>>>> On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com 
>>>>> <mailto:pi...@data-artisans.com>> wrote:
>>>>> Hi,
>>>>> 
>>>>> As you figured out, some dummy EOF record is one solution, however you 
>>>>> might try to achieve it also by wrapping an existing CSV function. Your 
>>>>> wrapper could emit this dummy EOF record. Another (probably better) idea 
>>>>> is to use Watermark(Long.MAX_VALUE) for the EOF marker. Stream source 
>>>>> and/or ContrinousFileReaderOperator will do that for you, so you would 
>>>>> just need to handle the Watermark.
>>>>> 
>>>>> The question is, do you need to perform the ACK operation AFTER all of 
>>>>> the DB writes, or just after reading the CSV file? If the latter one, you 
>>>>> could add some custom ACK operator with parallelism one just after the 
>>>>> CSV source that waits for the EOF Watermark. 
>>>>> 
>>>>> If it is the first one (some kind of committing the DB writes), you would 
>>>>> need to to wait until the EOF passes through all of your operators. You 
>>>>> would need something like that:
>>>>> 
>>>>> parallelism 1 for source -> default parallelism for keyBy/enrichment/db 
>>>>> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
>>>>> 
>>>>> I hope this helps,
>>>>> Piotrek
>>>>> 
>>>>>> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com 
>>>>>> <mailto:vinay18.pa...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hi Guys,
>>>>>> 
>>>>>> Following is how my pipeline looks (DataStream API) :
>>>>>> 
>>>>>> [1] Read the data from the csv file
>>>>>> [2] KeyBy it by some id
>>>>>> [3] Do the enrichment and write it to DB
>>>>>> 
>>>>>> [1] reads the data in sequence as it has single parallelism and then I 
>>>>>> have default parallelism for the other operators.
>>>>>> 
>>>>>> I want to generate a response (ack) when all the data of the file is 
>>>>>> processed. How can I achieve this ?
>>>>>> 
>>>>>> One solution I can think of is to have EOF dummy record in a file and a 
>>>>>> unique field for all the records in that file. Doing a keyBy on this 
>>>>>> field will make sure that all records are sent to a single slot. So, 
>>>>>> when EOF  dummy records is read I can generate a response/ack.
>>>>>> 
>>>>>> Is there a better way I can deal with this ?
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 

Reply via email to