Hi Piotrek,

Thank you for your detailed answer.

Yes, I want to generate the ack when all the records of the file have been written to the DB. So, to check that I understand you: we will receive a single EOF watermark at the ack operator once all the operators downstream of the source have processed every record of the file. But my understanding is that each parallel instance of an operator emits its own watermark, so how do I ensure that EOF has been reached? Or will I receive only one watermark at the ack operator?

The pipeline topology would look like this:

    DataStream readFileStream = env.readFile(...);
    readFileStream
        .transform(/* ContinuousFileReaderOperator */)
        .keyBy(0)
        .map(/* enrichment */)
        .addSink(/* DB */);

Instead of addSink, should the DB write be a simple map operator, so that we can add a following ack operator that generates the response? Also, how do I get/access the watermark value in the ack operator? It would be a simple map operator, right?

Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> As you figured out, some dummy EOF record is one solution; however, you
> might also try to achieve it by wrapping an existing CSV function. Your
> wrapper could emit this dummy EOF record. Another (probably better) idea is
> to use Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or
> ContinuousFileReaderOperator will do that for you, so you would just need
> to handle the Watermark.
>
> The question is, do you need to perform the ACK operation AFTER all of the
> DB writes, or just after reading the CSV file? If the latter, you could
> add some custom ACK operator with parallelism one just after the CSV source
> that waits for the EOF Watermark.
>
> If it is the former (some kind of committing of the DB writes), you would
> need to wait until the EOF passes through all of your operators.
> You would need something like this:
>
> parallelism 1 for source -> default parallelism for keyBy/enrichment/DB
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
>
> I hope this helps,
> Piotrek
>
> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> Hi Guys,
>
> Following is how my pipeline looks (DataStream API):
>
> [1] Read the data from the CSV file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
>
> [1] reads the data in sequence as it has a parallelism of one, and then I
> have default parallelism for the other operators.
>
> I want to generate a response (ack) when all the data of the file is
> processed. How can I achieve this?
>
> One solution I can think of is to have an EOF dummy record in the file and a
> unique field for all the records in that file. Doing a keyBy on this field
> will make sure that all records are sent to a single slot. So, when the EOF
> dummy record is read, I can generate a response/ack.
>
> Is there a better way I can deal with this?
>
> Regards,
> Vinay Patil
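On the question of how many EOF watermarks the ACK operator receives: in Flink, an operator with several input channels tracks the last watermark seen on each channel and advances its own event time to the minimum across them. A parallelism-1 ACK operator therefore only reaches Watermark(Long.MAX_VALUE) after every parallel upstream instance has emitted it. The sketch below is a plain-Java model of that rule, not Flink code; the class AckWatermarkModel and its onWatermark/isAcked methods are hypothetical names used only for illustration.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model (not Flink code) of a parallelism-1 ACK
 * operator combining watermarks from several upstream parallel instances.
 * Its watermark is the minimum over all input channels, so the EOF marker
 * (Long.MAX_VALUE) is reached only once every channel has sent it.
 */
public class AckWatermarkModel {

    // Flink emits a final watermark with this value when a bounded input ends.
    static final long EOF_WATERMARK = Long.MAX_VALUE;

    private final long[] channelWatermarks; // last watermark per upstream instance
    private long combinedWatermark = Long.MIN_VALUE;
    private boolean acked = false;

    AckWatermarkModel(int upstreamParallelism) {
        channelWatermarks = new long[upstreamParallelism];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Upstream instance {@code channel} emits a watermark; returns true if the ACK fires now. */
    boolean onWatermark(int channel, long watermark) {
        if (watermark <= channelWatermarks[channel]) {
            return false; // ignore watermarks that do not advance this channel
        }
        channelWatermarks[channel] = watermark;
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > combinedWatermark) {
            combinedWatermark = min;
            if (combinedWatermark == EOF_WATERMARK && !acked) {
                acked = true;
                return true; // every upstream instance has finished its records
            }
        }
        return false;
    }

    boolean isAcked() {
        return acked;
    }

    public static void main(String[] args) {
        AckWatermarkModel ack = new AckWatermarkModel(3); // e.g. 3 parallel DB-writing instances
        System.out.println(ack.onWatermark(0, EOF_WATERMARK)); // false: instances 1 and 2 still running
        System.out.println(ack.onWatermark(2, EOF_WATERMARK)); // false: instance 1 still running
        System.out.println(ack.onWatermark(1, EOF_WATERMARK)); // true: ACK fires exactly once
        System.out.println(ack.onWatermark(1, EOF_WATERMARK)); // false: duplicate is ignored
    }
}
```

In an actual Flink job, a plain MapFunction never sees watermarks. One option worth checking against the Flink version in use is a custom operator that extends AbstractStreamOperator, implements OneInputStreamOperator, and overrides processWatermark(Watermark), attached after the DB-writing map via DataStream.transform(...) with setParallelism(1). This assumes the DB writes happen synchronously inside the map, so each record is persisted before the watermark that follows it is forwarded.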