Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-24 Thread knur
Bump? 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Cristian C
The output is a bunch of files in parquet format. The thing reading them
would be presto, so I can really tell it to ignore some rows but not
others. Not to mention that the files would keep piling making sql queries
super slow.

On Fri, Jan 18, 2019, 10:01 AM Jamie Grier  Sorry my earlier comment should read: "It would just read all the files in
> order and NOT worry about which data rows are in which files"
>
> On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier  wrote:
>
>> Hmm..  I would have to look into the code for the StreamingFileSink more
>> closely to understand the concern but typically you should not be concerned
>> at all with *when* checkpoints happen.  They are meant to be a completely
>> asynchronous background process that has absolutely no bearing on
>> application semantics.  The output should be thought of as a stream rather
>> than a snapshot.
>>
>> Can you rework the downstream consumer of the output data such that you
>> don't have to worry about this?  It would just read all the files in order
>> and worry about which data rows are in which files.
>>
>> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
>> code.  I've cc'd him directly.
>>
>> -Jamie
>>
>>
>> On Fri, Jan 18, 2019 at 9:44 AM Cristian C 
>> wrote:
>>
>>> Well, the problem is that, conceptually, the way I'm trying to approach
>>> this is ok. But in practice, it has some edge cases.
>>>
>>> So back to my original premise: if you both, trigger and checkpoint
>>> happen around the same time, there is a chance that the streaming file sink
>>> rolls the bucket BEFORE it has received all the data. In other words, it
>>> would create incomplete snapshots of the table.
>>>
>>> Keep in mind that every snapshot is written to a different folder. And
>>> they are supposed to represent the state of the whole table at a point in
>>> time.
>>>
>>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier >>
 Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
 PURGES but only FIRES what I said is semantically true.  The window
 contents are never cleared.

 What I missed is that in this case since you're using a function that
 incrementally reduces on the fly rather than processing all the data when
 it's triggered your state is always kept to one element per key.  Your'e
 correct but in general with non-incremental window functions the state
 would grow unbounded in this configuration.

 So it looks like your approach should work just fine.

 -Jamie



 On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:

> Hello Jamie.
>
> Thanks for taking a look at this. So, yes, I want to write only the
> last
> data for each key every X minutes. In other words, I want a snapshot
> of the
> whole database every X minutes.
>
> >  The issue is that the window never get's PURGED so the data just
> > continues to accumulate in the window.  This will grow without bound.
>
> The window not being purged does not necessarily mean that the data
> will be
> accumulated indefinitely. How so? Well, Flink has two mechanisms to
> remove
> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an
> evictor.
>
> The reduce function has an implicit evictor that automatically removes
> events from the window pane that are no longer needed. i.e. it keeps in
> state only the element that was reduced. Here is an example:
>
> env.socketTextStream("localhost", )
>   .keyBy { it.first().toString() }
>   .window(GlobalWindows.create())
>
>
> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>   .reduce { left, right ->
> println("left: $left, right: $right")
> if (left.length > right.length) {
>   left
> } else {
>   right
> }
>   }
>   .printToErr()
>
> For your claim to hold true, every time the trigger fires one would
> expect
> to see ALL the elements by a key being printed over and over again in
> the
> reduce function. However, if you run a job similar to this one in your
> lang
> of choice, you will notice that the print statement is effectively
> called
> only once per event per key.
>
> In fact, not using purge is intentional. Because I want to hold every
> record
> (the last one by its primary key) of the database in state so that I
> can
> write a snapshot of the whole database.
>
> So for instance, let's say my table has two columns: id and time. And
> I have
> the following events:
>
> 1,January
> 2,February
> 1,March
>
> I want to write to S3 two records: "1,March", and "2,February".
>
> Now, let's say two more events come into the stream:
>
> 3,April
> 1,June
>
> Then I want to write to S3 

Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Hmm..  I would have to look into the code for the StreamingFileSink more
closely to understand the concern but typically you should not be concerned
at all with *when* checkpoints happen.  They are meant to be a completely
asynchronous background process that has absolutely no bearing on
application semantics.  The output should be thought of as a stream rather
than a snapshot.

Can you rework the downstream consumer of the output data such that you
don't have to worry about this?  It would just read all the files in order
and worry about which data rows are in which files.

Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
code.  I've cc'd him directly.

-Jamie


On Fri, Jan 18, 2019 at 9:44 AM Cristian C  wrote:

> Well, the problem is that, conceptually, the way I'm trying to approach
> this is ok. But in practice, it has some edge cases.
>
> So back to my original premise: if you both, trigger and checkpoint happen
> around the same time, there is a chance that the streaming file sink rolls
> the bucket BEFORE it has received all the data. In other words, it would
> create incomplete snapshots of the table.
>
> Keep in mind that every snapshot is written to a different folder. And
> they are supposed to represent the state of the whole table at a point in
> time.
>
> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier 
>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>> PURGES but only FIRES what I said is semantically true.  The window
>> contents are never cleared.
>>
>> What I missed is that in this case since you're using a function that
>> incrementally reduces on the fly rather than processing all the data when
>> it's triggered your state is always kept to one element per key.  Your'e
>> correct but in general with non-incremental window functions the state
>> would grow unbounded in this configuration.
>>
>> So it looks like your approach should work just fine.
>>
>> -Jamie
>>
>>
>>
>> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>>
>>> Hello Jamie.
>>>
>>> Thanks for taking a look at this. So, yes, I want to write only the last
>>> data for each key every X minutes. In other words, I want a snapshot of
>>> the
>>> whole database every X minutes.
>>>
>>> >  The issue is that the window never get's PURGED so the data just
>>> > continues to accumulate in the window.  This will grow without bound.
>>>
>>> The window not being purged does not necessarily mean that the data will
>>> be
>>> accumulated indefinitely. How so? Well, Flink has two mechanisms to
>>> remove
>>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>>>
>>> The reduce function has an implicit evictor that automatically removes
>>> events from the window pane that are no longer needed. i.e. it keeps in
>>> state only the element that was reduced. Here is an example:
>>>
>>> env.socketTextStream("localhost", )
>>>   .keyBy { it.first().toString() }
>>>   .window(GlobalWindows.create())
>>>
>>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>   .reduce { left, right ->
>>> println("left: $left, right: $right")
>>> if (left.length > right.length) {
>>>   left
>>> } else {
>>>   right
>>> }
>>>   }
>>>   .printToErr()
>>>
>>> For your claim to hold true, every time the trigger fires one would
>>> expect
>>> to see ALL the elements by a key being printed over and over again in the
>>> reduce function. However, if you run a job similar to this one in your
>>> lang
>>> of choice, you will notice that the print statement is effectively called
>>> only once per event per key.
>>>
>>> In fact, not using purge is intentional. Because I want to hold every
>>> record
>>> (the last one by its primary key) of the database in state so that I can
>>> write a snapshot of the whole database.
>>>
>>> So for instance, let's say my table has two columns: id and time. And I
>>> have
>>> the following events:
>>>
>>> 1,January
>>> 2,February
>>> 1,March
>>>
>>> I want to write to S3 two records: "1,March", and "2,February".
>>>
>>> Now, let's say two more events come into the stream:
>>>
>>> 3,April
>>> 1,June
>>>
>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>> "3,April".
>>>
>>> In other words, I can't just purge the windows, because I would lose the
>>> record with id 2.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Sorry my earlier comment should read: "It would just read all the files in
order and NOT worry about which data rows are in which files"

On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier  wrote:

> Hmm..  I would have to look into the code for the StreamingFileSink more
> closely to understand the concern but typically you should not be concerned
> at all with *when* checkpoints happen.  They are meant to be a completely
> asynchronous background process that has absolutely no bearing on
> application semantics.  The output should be thought of as a stream rather
> than a snapshot.
>
> Can you rework the downstream consumer of the output data such that you
> don't have to worry about this?  It would just read all the files in order
> and worry about which data rows are in which files.
>
> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
> code.  I've cc'd him directly.
>
> -Jamie
>
>
> On Fri, Jan 18, 2019 at 9:44 AM Cristian C 
> wrote:
>
>> Well, the problem is that, conceptually, the way I'm trying to approach
>> this is ok. But in practice, it has some edge cases.
>>
>> So back to my original premise: if you both, trigger and checkpoint
>> happen around the same time, there is a chance that the streaming file sink
>> rolls the bucket BEFORE it has received all the data. In other words, it
>> would create incomplete snapshots of the table.
>>
>> Keep in mind that every snapshot is written to a different folder. And
>> they are supposed to represent the state of the whole table at a point in
>> time.
>>
>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier >
>>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>>> PURGES but only FIRES what I said is semantically true.  The window
>>> contents are never cleared.
>>>
>>> What I missed is that in this case since you're using a function that
>>> incrementally reduces on the fly rather than processing all the data when
>>> it's triggered your state is always kept to one element per key.  Your'e
>>> correct but in general with non-incremental window functions the state
>>> would grow unbounded in this configuration.
>>>
>>> So it looks like your approach should work just fine.
>>>
>>> -Jamie
>>>
>>>
>>>
>>> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>>>
 Hello Jamie.

 Thanks for taking a look at this. So, yes, I want to write only the last
 data for each key every X minutes. In other words, I want a snapshot of
 the
 whole database every X minutes.

 >  The issue is that the window never get's PURGED so the data just
 > continues to accumulate in the window.  This will grow without bound.

 The window not being purged does not necessarily mean that the data
 will be
 accumulated indefinitely. How so? Well, Flink has two mechanisms to
 remove
 data from a window: triggering a FIRE/FIRE_AND_PURGE or using an
 evictor.

 The reduce function has an implicit evictor that automatically removes
 events from the window pane that are no longer needed. i.e. it keeps in
 state only the element that was reduced. Here is an example:

 env.socketTextStream("localhost", )
   .keyBy { it.first().toString() }
   .window(GlobalWindows.create())


 .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
   .reduce { left, right ->
 println("left: $left, right: $right")
 if (left.length > right.length) {
   left
 } else {
   right
 }
   }
   .printToErr()

 For your claim to hold true, every time the trigger fires one would
 expect
 to see ALL the elements by a key being printed over and over again in
 the
 reduce function. However, if you run a job similar to this one in your
 lang
 of choice, you will notice that the print statement is effectively
 called
 only once per event per key.

 In fact, not using purge is intentional. Because I want to hold every
 record
 (the last one by its primary key) of the database in state so that I can
 write a snapshot of the whole database.

 So for instance, let's say my table has two columns: id and time. And I
 have
 the following events:

 1,January
 2,February
 1,March

 I want to write to S3 two records: "1,March", and "2,February".

 Now, let's say two more events come into the stream:

 3,April
 1,June

 Then I want to write to S3 three records: "1,June", "2,February" and
 "3,April".

 In other words, I can't just purge the windows, because I would lose the
 record with id 2.



 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Cristian C
Well, the problem is that, conceptually, the way I'm trying to approach
this is ok. But in practice, it has some edge cases.

So back to my original premise: if you both, trigger and checkpoint happen
around the same time, there is a chance that the streaming file sink rolls
the bucket BEFORE it has received all the data. In other words, it would
create incomplete snapshots of the table.

Keep in mind that every snapshot is written to a different folder. And they
are supposed to represent the state of the whole table at a point in time.

On Fri, Jan 18, 2019, 8:26 AM Jamie Grier  Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
> PURGES but only FIRES what I said is semantically true.  The window
> contents are never cleared.
>
> What I missed is that in this case since you're using a function that
> incrementally reduces on the fly rather than processing all the data when
> it's triggered your state is always kept to one element per key.  Your'e
> correct but in general with non-incremental window functions the state
> would grow unbounded in this configuration.
>
> So it looks like your approach should work just fine.
>
> -Jamie
>
>
>
> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>
>> Hello Jamie.
>>
>> Thanks for taking a look at this. So, yes, I want to write only the last
>> data for each key every X minutes. In other words, I want a snapshot of
>> the
>> whole database every X minutes.
>>
>> >  The issue is that the window never get's PURGED so the data just
>> > continues to accumulate in the window.  This will grow without bound.
>>
>> The window not being purged does not necessarily mean that the data will
>> be
>> accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>>
>> The reduce function has an implicit evictor that automatically removes
>> events from the window pane that are no longer needed. i.e. it keeps in
>> state only the element that was reduced. Here is an example:
>>
>> env.socketTextStream("localhost", )
>>   .keyBy { it.first().toString() }
>>   .window(GlobalWindows.create())
>>
>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>   .reduce { left, right ->
>> println("left: $left, right: $right")
>> if (left.length > right.length) {
>>   left
>> } else {
>>   right
>> }
>>   }
>>   .printToErr()
>>
>> For your claim to hold true, every time the trigger fires one would expect
>> to see ALL the elements by a key being printed over and over again in the
>> reduce function. However, if you run a job similar to this one in your
>> lang
>> of choice, you will notice that the print statement is effectively called
>> only once per event per key.
>>
>> In fact, not using purge is intentional. Because I want to hold every
>> record
>> (the last one by its primary key) of the database in state so that I can
>> write a snapshot of the whole database.
>>
>> So for instance, let's say my table has two columns: id and time. And I
>> have
>> the following events:
>>
>> 1,January
>> 2,February
>> 1,March
>>
>> I want to write to S3 two records: "1,March", and "2,February".
>>
>> Now, let's say two more events come into the stream:
>>
>> 3,April
>> 1,June
>>
>> Then I want to write to S3 three records: "1,June", "2,February" and
>> "3,April".
>>
>> In other words, I can't just purge the windows, because I would lose the
>> record with id 2.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
PURGES but only FIRES what I said is semantically true.  The window
contents are never cleared.

What I missed is that in this case since you're using a function that
incrementally reduces on the fly rather than processing all the data when
it's triggered your state is always kept to one element per key.  Your'e
correct but in general with non-incremental window functions the state
would grow unbounded in this configuration.

So it looks like your approach should work just fine.

-Jamie



On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:

> Hello Jamie.
>
> Thanks for taking a look at this. So, yes, I want to write only the last
> data for each key every X minutes. In other words, I want a snapshot of the
> whole database every X minutes.
>
> >  The issue is that the window never get's PURGED so the data just
> > continues to accumulate in the window.  This will grow without bound.
>
> The window not being purged does not necessarily mean that the data will be
> accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>
> The reduce function has an implicit evictor that automatically removes
> events from the window pane that are no longer needed. i.e. it keeps in
> state only the element that was reduced. Here is an example:
>
> env.socketTextStream("localhost", )
>   .keyBy { it.first().toString() }
>   .window(GlobalWindows.create())
>
> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>   .reduce { left, right ->
> println("left: $left, right: $right")
> if (left.length > right.length) {
>   left
> } else {
>   right
> }
>   }
>   .printToErr()
>
> For your claim to hold true, every time the trigger fires one would expect
> to see ALL the elements by a key being printed over and over again in the
> reduce function. However, if you run a job similar to this one in your lang
> of choice, you will notice that the print statement is effectively called
> only once per event per key.
>
> In fact, not using purge is intentional. Because I want to hold every
> record
> (the last one by its primary key) of the database in state so that I can
> write a snapshot of the whole database.
>
> So for instance, let's say my table has two columns: id and time. And I
> have
> the following events:
>
> 1,January
> 2,February
> 1,March
>
> I want to write to S3 two records: "1,March", and "2,February".
>
> Now, let's say two more events come into the stream:
>
> 3,April
> 1,June
>
> Then I want to write to S3 three records: "1,June", "2,February" and
> "3,April".
>
> In other words, I can't just purge the windows, because I would lose the
> record with id 2.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-17 Thread knur
Hello Jamie.

Thanks for taking a look at this. So, yes, I want to write only the last
data for each key every X minutes. In other words, I want a snapshot of the
whole database every X minutes.

>  The issue is that the window never get's PURGED so the data just
> continues to accumulate in the window.  This will grow without bound.

The window not being purged does not necessarily mean that the data will be
accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.

The reduce function has an implicit evictor that automatically removes
events from the window pane that are no longer needed. i.e. it keeps in
state only the element that was reduced. Here is an example:

env.socketTextStream("localhost", )
  .keyBy { it.first().toString() }
  .window(GlobalWindows.create())
 
.trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
  .reduce { left, right ->
println("left: $left, right: $right")
if (left.length > right.length) {
  left
} else {
  right
}
  }
  .printToErr()

For your claim to hold true, every time the trigger fires one would expect
to see ALL the elements by a key being printed over and over again in the
reduce function. However, if you run a job similar to this one in your lang
of choice, you will notice that the print statement is effectively called
only once per event per key.

In fact, not using purge is intentional. Because I want to hold every record
(the last one by its primary key) of the database in state so that I can
write a snapshot of the whole database.

So for instance, let's say my table has two columns: id and time. And I have
the following events:

1,January
2,February
1,March

I want to write to S3 two records: "1,March", and "2,February".

Now, let's say two more events come into the stream:

3,April
1,June

Then I want to write to S3 three records: "1,June", "2,February" and
"3,April".

In other words, I can't just purge the windows, because I would lose the
record with id 2.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-17 Thread Jamie Grier
If I'm understanding you correctly you're just trying to do some data
reduction so that you write data for each key once every five minutes
rather than for every CDC update..  Is that correct?  You also want to keep
the state for most recent key you've ever seen so you don't apply writes
out of order.

The code you've provided isn't quite right AFAICT.  The issue is that the
window never get's PURGED so the data just continues to accumulate in the
window.  This will grow without bound.

My advise would be to take a look at ProcessFunction and write one that
does exactly what you want rather than messing around with windows and
triggers for this use case.  It will be much simpler in the end.

-Jamie




On Thu, Jan 17, 2019 at 4:32 PM knur  wrote:

> Hello there.
>
> So we have some Postgres tables that are mutable, and we want to create a
> snapshot of them in S3 every X minutes. So we plan to use Debezium to send
> a
> CDC log of every row change into a Kafka topic, and then have Flink keep
> the
> latest state of each row to save that data into S3 subsequently.
>
> Our current job looks like this and works somehow well in most cases:
>
>// checkpoint interval is set to run every 10 minutes
>
> kafkaSource
>   .keyB { it.id }
>   .window(GlobalWindows.create())
>   .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.minutes(5)))
>   .reduce { left, right ->
> if (left.timestamp() > right.timestamp()) {
>   left
> } else {
>   right
> }
>   }
>   .addSink(StreamingFileSink
> .forBulkFormat(Path(outputDir),
> ParquetAvroWriters.forGenericRecord(avroSchema))
>
>
> .withBucketAssigner(DateTimeBucketAssignerr("'date='-MM-dd/'hour='HH/'minute='mm"))
> .build())
>
> We use `GlobalWindows.create()` because we want to hold in Flink's state
> ALL
> the changes send into Kafka (the reduce function, according to the docs,
> will make sure to evict all events except the last one).
>
> This works, but we know there could be some edge cases. For instance, if
> the
> trigger fires around the same time that a checkpoint, we could get into a
> position where StreamingFileSink rolls an incomplete set of all the events
> triggered.
>
> So a couple of questions:
>
> 1. Is there a way to mark the events with the timestamp of the trigger that
> fired them?
> 2. Is the approach we took fine? (keep in mind that we will deal with giant
> tables, so a batch job that queries them every N seconds is not an option).
> 3. Do you foresee any other edge cases?
>
> Thanks for taking a look at this.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Dump snapshot of big table in real time using StreamingFileSink

2019-01-17 Thread knur
Hello there.

So we have some Postgres tables that are mutable, and we want to create a
snapshot of them in S3 every X minutes. So we plan to use Debezium to send a
CDC log of every row change into a Kafka topic, and then have Flink keep the
latest state of each row to save that data into S3 subsequently.

Our current job looks like this and works somehow well in most cases:

   // checkpoint interval is set to run every 10 minutes

kafkaSource
  .keyB { it.id }
  .window(GlobalWindows.create())
  .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.minutes(5)))
  .reduce { left, right ->
if (left.timestamp() > right.timestamp()) {
  left
} else {
  right
}
  }
  .addSink(StreamingFileSink
.forBulkFormat(Path(outputDir),
ParquetAvroWriters.forGenericRecord(avroSchema))
   
.withBucketAssigner(DateTimeBucketAssignerr("'date='-MM-dd/'hour='HH/'minute='mm"))
.build())

We use `GlobalWindows.create()` because we want to hold in Flink's state ALL
the changes send into Kafka (the reduce function, according to the docs,
will make sure to evict all events except the last one).

This works, but we know there could be some edge cases. For instance, if the
trigger fires around the same time that a checkpoint, we could get into a
position where StreamingFileSink rolls an incomplete set of all the events
triggered.

So a couple of questions:

1. Is there a way to mark the events with the timestamp of the trigger that
fired them?
2. Is the approach we took fine? (keep in mind that we will deal with giant
tables, so a batch job that queries them every N seconds is not an option).
3. Do you foresee any other edge cases?

Thanks for taking a look at this.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/