Hi Piotrek,

Thank you for your detailed answer.

Yes, I want to generate the ack when all the records of the file are
written to DB.

So, to understand what you are saying: we will receive a single EOF
watermark at the ack operator once all the downstream operators have
processed all the records of the file. But my understanding of
watermarks is that each parallel instance of an operator emits its own
watermark, so how do I ensure that EOF has been reached, or will I
receive only one watermark at the ack operator?
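
As I understand it, a downstream task tracks the minimum watermark across all
of its input channels, so a parallelism-1 ack operator would only observe
Long.MAX_VALUE after every upstream instance has emitted it. A minimal
plain-Java sketch of that merging rule (illustrative only, not Flink API):

```java
import java.util.Arrays;

public class WatermarkMerge {
    // A downstream task's current watermark is the minimum over its input channels.
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Three upstream instances; one has not yet hit EOF, so the
        // combined watermark is still 1000, not Long.MAX_VALUE.
        long[] partial = {Long.MAX_VALUE, Long.MAX_VALUE, 1000L};
        System.out.println(combinedWatermark(partial));

        // Only once every instance emits Long.MAX_VALUE does the
        // downstream ack operator see the EOF watermark.
        long[] all = {Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE};
        System.out.println(combinedWatermark(all) == Long.MAX_VALUE);
    }
}
```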


So the pipeline topology will look like:

DataStream readFileStream = env.readFile()

readFileStream
        .transform(...)  // ContinuousFileReaderOperator
        .keyBy(0)
        .map(...)        // enrichment
        .addSink(...)    // DB

Instead of addSink, should the DB write be a simple map operator, so that
we can have an ack operator after it which generates the response?

Also, how do I get/access the watermark value in the ack operator? Will
it be a simple map operator?
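
To make my question concrete, here is how I currently imagine the ack logic:
a pass-through stage that fires a callback when it observes the final
watermark. This is a plain-Java sketch under my own assumptions, not Flink
API; I assume the real version would override processWatermark() in a custom
operator added via transform():

```java
import java.util.function.Consumer;

// Hypothetical sketch of the ack stage: pass records through unchanged,
// and fire an ack callback once the EOF watermark (Long.MAX_VALUE) arrives.
class AckStage<T> {
    private final Consumer<T> downstream;
    private final Runnable onEof;
    private boolean acked = false;

    AckStage(Consumer<T> downstream, Runnable onEof) {
        this.downstream = downstream;
        this.onEof = onEof;
    }

    void processElement(T element) {
        downstream.accept(element); // records pass through unchanged
    }

    void processWatermark(long watermark) {
        if (watermark == Long.MAX_VALUE && !acked) {
            acked = true;
            onEof.run(); // generate the response/ack exactly once here
        }
    }
}
```

Is that roughly the right shape, or should the ack live somewhere else?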





Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> As you figured out, a dummy EOF record is one solution; however, you
> could also achieve it by wrapping the existing CSV input function, and
> letting your wrapper emit this dummy EOF record. Another (probably better) idea is
> to use Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or
> ContinuousFileReaderOperator will do that for you, so you would just need
> to handle the Watermark.
>
> The question is, do you need to perform the ACK operation AFTER all of the
> DB writes, or just after reading the CSV file? If the latter one, you could
> add some custom ACK operator with parallelism one just after the CSV source
> that waits for the EOF Watermark.
>
> If it is the first one (some kind of committing of the DB writes), you would
> need to wait until the EOF passes through all of your operators. You
> would need something like that:
>
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
>
> I hope this helps,
> Piotrek
>
> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> Hi Guys,
>
> Following is how my pipeline looks (DataStream API) :
>
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
>
> [1] reads the data in sequence as it has single parallelism and then I
> have default parallelism for the other operators.
>
> I want to generate a response (ack) when all the data of the file is
> processed. How can I achieve this ?
>
> One solution I can think of is to have EOF dummy record in a file and a
> unique field for all the records in that file. Doing a keyBy on this field
> will make sure that all records are sent to a single slot. So, when the EOF
> dummy record is read I can generate a response/ack.
>
> Is there a better way I can deal with this ?
>
>
> Regards,
> Vinay Patil
>
>
>
