I updated the FLIP, you can check out the changes here: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158871522&selectedPageVersions=16&selectedPageVersions=15

There is still the open question of what IGNORE means for getProcessingTime().

Plus, I introduced a setting for ignoring Triggers because I think it otherwise doesn't work well with FAILing hard on processing-time API calls. I described it in the FLIP, so please have a look at the diff I linked above.

Aljoscha


On 08.09.20 11:35, Dawid Wysakowicz wrote:
> The only one where I could see that users want different behaviour
> is BATCH jobs on the DataStream API. I agree that processing-time does
> not make much sense in batch jobs. However, if users have written some
> business logic using processing-time timers their jobs will silently
> not work if we set the default to IGNORE. Setting it to FAIL would at
> least make users aware that something is not right.

I see your point. I was also undecided myself which option to use here.
I went with IGNORE because I thought the most common and most
prominent functions should work out of the box without much
additional tweaking. I found "running the same program in
BATCH and STREAM" to be one such case and therefore optimized the
options for it. That's why I went with IGNORE instead of FAIL. Again, I
am good with either of the two.

> I can also see a small group of users wanting processing-time timers
> for BATCH. We could, for example, fire all processing-time timers at
> the "end of input", then we also set the watermark to +Inf.

I agree. I think this case would be covered with ENABLE + TRIGGER. I do
agree, though, that it makes sense to mention this case explicitly, as
the ENABLE option would behave slightly differently in BATCH than in
STREAM. Maybe not strictly speaking differently, but it would be worth
explaining anyway. The way I heard it from some people, you can think of
BATCH processing as happening instantaneously in processing time.
Therefore no timers can be triggered in between records. In BATCH
processing the only time when timers can be triggered is at the end of
input. Or at least that is how I see it.

> Another thing is: what should we do with new triggers that are set
> after the end-of-input? If we have TRIGGER and users keep setting new
> processing-time timers in the callback, would we continue firing them?
> Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off
> remaining timers but don't add new ones? Do we silently ignore adding
> new ones?

My take on this issue is that it should be good enough to have the
QUIESCE_AND_TRIGGER behaviour and to ignore timers registered after the
end of input. We cannot fail hard in such a scenario, unless we expose a
flag saying the timer is after the end of input. Otherwise I cannot see
a way to correctly safeguard against this scenario. I can see some use
cases that would benefit from allowing timer registration, e.g.
periodically checking if some external process has finished. In my
opinion this is a bit of a different topic, as it is actually an issue
of inverting the control over when an operator can finish. Right now it
is the task that decides that the job/operator finishes at the end of
input.
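
To make the QUIESCE_AND_TRIGGER idea concrete, here is a minimal sketch in plain Python. It is a toy model, not Flink's timer service: the class `TimerService`, its methods and the `ignored` counter are all hypothetical names chosen for illustration. At end of input the service stops accepting registrations, fires the pending timers in timestamp order, and silently drops anything a callback tries to register.

```python
import heapq


class TimerService:
    """Toy processing-time timer queue (hypothetical, not Flink's API)."""

    def __init__(self):
        self._queue = []        # heap of (timestamp, seq, callback)
        self._seq = 0           # tie-breaker so callbacks are never compared
        self._quiesced = False
        self.ignored = 0        # registrations dropped after end of input

    def register(self, timestamp, callback):
        """Register a timer; returns False if it was silently ignored."""
        if self._quiesced:
            self.ignored += 1
            return False
        heapq.heappush(self._queue, (timestamp, self._seq, callback))
        self._seq += 1
        return True

    def quiesce_and_trigger(self):
        """End of input: refuse new timers, then fire pending ones in order."""
        self._quiesced = True
        fired = []
        while self._queue:
            timestamp, _, callback = heapq.heappop(self._queue)
            callback(self, timestamp)
            fired.append(timestamp)
        return fired


def rescheduling_callback(service, timestamp):
    # A periodic timer that tries to re-register itself 10 units later.
    service.register(timestamp + 10, rescheduling_callback)


service = TimerService()
service.register(30, rescheduling_callback)
service.register(10, rescheduling_callback)
fired = service.quiesce_and_trigger()
```

In this sketch both pending timers fire once and both re-registrations are dropped, so the drain loop always terminates. Returning a flag from `register` is one way the "timer is after the end of input" signal mentioned above could be surfaced; that is an assumption of the sketch, not something the FLIP specifies.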

> By the way, I assume WAIT means we wait for processing-time to
> actually reach the time of pending timers? Or did you have something
> else in mind with this?

Yes, that's what I meant. I actually took the options from this
issue [1], where there is some discussion on that topic as well.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-18647

On 08/09/2020 10:57, Aljoscha Krettek wrote:
I agree with almost all of your points!

The only one where I could see that users want different behaviour
is BATCH jobs on the DataStream API. I agree that processing-time does
not make much sense in batch jobs. However, if users have written some
business logic using processing-time timers their jobs will silently
not work if we set the default to IGNORE. Setting it to FAIL would at
least make users aware that something is not right.

I can also see a small group of users wanting processing-time timers
for BATCH. We could, for example, fire all processing-time timers at
the "end of input", then we also set the watermark to +Inf.

Another thing is: what should we do with new triggers that are set
after the end-of-input? If we have TRIGGER and users keep setting new
processing-time timers in the callback, would we continue firing them?
Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off
remaining timers but don't add new ones? Do we silently ignore adding
new ones?

By the way, I assume WAIT means we wait for processing-time to
actually reach the time of pending timers? Or did you have something
else in mind with this?

Aljoscha

On 08.09.20 09:19, Dawid Wysakowicz wrote:
Hey Aljoscha

A couple of thoughts for the two remaining TODOs in the doc:

# Processing Time Support in BATCH/BOUNDED execution mode

I think there are two somewhat orthogonal problems around this topic:
      1. Firing processing timers at the end of the job
      2. Having processing timers in the BATCH mode
The way I see it there are three main use cases for different
combinations of the aforementioned dimensions:
      1. Regular streaming jobs: STREAM mode with UNBOUNDED sources
         - we do want to have processing timers
         - there is no end of the job
      2. Debugging/Testing streaming jobs: STREAM mode with BOUNDED
         sources
         - we do want to have processing timers
         - we want to fire/wait for the timers at the end
      3. Batch jobs with DataStream API:
         - we do **NOT** want to have processing timers either during
           processing or at the end. We want to either fail hard or
           ignore the timers. Generally speaking, in BATCH mode the
           processing timers do not make sense, therefore it would be
           better to fail hard. It would be the safest option, as some
           of the user logic might depend on the processing timers.
           Failing hard would give the user the opportunity to react to
           the changed behaviour. On the other hand, if we want to make
           it possible to run the exact same program both in STREAM and
           BATCH mode, we must have an option to simply ignore
           processing timers.
         - we never want to actually trigger the timers, neither during
           runtime nor at the end

Having the above in mind, I am wondering whether we should introduce two
separate options:
       * processing-time.timers = ENABLE/FAIL/IGNORE
       * processing-time.on-end-of-input = CANCEL/WAIT/TRIGGER
With the two options we can satisfy all the above cases. The default
settings would be:
STREAM:
       processing-time.timers = ENABLE
       processing-time.on-end-of-input = TRIGGER
BATCH:
       processing-time.timers = IGNORE
       processing-time.on-end-of-input = CANCEL
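
As a rough illustration of how the two options combine, here is a small Python sketch. The option values and the per-mode defaults are taken from the list above; everything else (`TimerMode`, `EndOfInputAction`, `ProcessingTimeSettings`, `register_timer`) is a hypothetical name made up for this sketch and not part of any Flink API.

```python
from dataclasses import dataclass
from enum import Enum


class TimerMode(Enum):          # processing-time.timers
    ENABLE = "ENABLE"
    FAIL = "FAIL"
    IGNORE = "IGNORE"


class EndOfInputAction(Enum):   # processing-time.on-end-of-input
    CANCEL = "CANCEL"
    WAIT = "WAIT"
    TRIGGER = "TRIGGER"


@dataclass(frozen=True)
class ProcessingTimeSettings:
    timers: TimerMode
    on_end_of_input: EndOfInputAction


# Defaults per execution mode, as proposed in this mail.
DEFAULTS = {
    "STREAM": ProcessingTimeSettings(TimerMode.ENABLE, EndOfInputAction.TRIGGER),
    "BATCH": ProcessingTimeSettings(TimerMode.IGNORE, EndOfInputAction.CANCEL),
}


def register_timer(settings, pending, timestamp):
    """Apply processing-time.timers to one registration attempt.

    Returns True if the timer was stored, False if it was ignored,
    and raises if the mode is FAIL.
    """
    if settings.timers is TimerMode.FAIL:
        raise RuntimeError("processing-time timers are not supported in this mode")
    if settings.timers is TimerMode.IGNORE:
        return False
    pending.append(timestamp)
    return True


batch_pending = []
batch_result = register_timer(DEFAULTS["BATCH"], batch_pending, 100)

stream_pending = []
stream_result = register_timer(DEFAULTS["STREAM"], stream_pending, 100)
```

With the BATCH defaults the registration is dropped without an error, which is exactly the "silently not work" risk raised elsewhere in this thread; switching `timers` to FAIL turns the same call into an exception.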

# Event time triggers
I say this from the implementation perspective, but I find it hard to
actually ignore the event-time triggers. We would have to adjust the
implementation of WindowOperator to do that. At the same time I see no
problem with simply keeping them working. I am wondering if we
should/could just leave them as they are.

# Broadcast State
As far as I am concerned there are no core semantic problems with the
Broadcast State. As of now, it does not give any guarantees about the
order in which the broadcast and non-broadcast sides are executed, even
in streaming. It also does not expose any mechanisms to implement
event/processing-time alignment (you cannot register timers in the
broadcast side). I can't see any of the guarantees breaking in the BATCH
mode.
I do agree it would give somewhat nicer properties in BATCH if we
consumed the broadcast side first. It would make the operation
deterministic and let users implement a broadcast join properly on top
of this method. Nevertheless I see it as an extension of the DataStream
API for BATCH execution rather than making the DataStream API work for
BATCH. Therefore I'd be fine with leaving the Broadcast State out
of the FLIP.

What do you think?

On 01/09/2020 13:46, Aljoscha Krettek wrote:
Hmm, it seems I left out the Dev ML in my mail. Looping that back in.


On 28.08.20 13:54, Dawid Wysakowicz wrote:
@Aljoscha Let me bring back to the ML some of the points we discussed
offline.

Ad. 1 Yes I agree it's not just about scheduling. It includes more
changes to the runtime. We might need to make it more prominent in the
write up.

Ad. 2 You have a good point here that switching the default value for
TimeCharacteristic to INGESTION time might not be the best option, as it
might hide problems if we assign ingestion time, which is rarely the
right choice for user programs. Maybe we could just go with
EVENT_TIME as the default?

Ad. 4 That's a very good point! I do agree with you it would be better
to change the behaviour of said methods for batch-style execution. Even
though it changes the behaviour, the overall logic is still correct.
Moreover I'd also recommend deprecating some of the relational-like
methods, which we should rather redirect to the Table API. I added a
section about it to the FLIP (mostly copying over your message). Let me
know what you think about it.

Best,

Dawid

On 25/08/2020 11:39, Aljoscha Krettek wrote:
Thanks for creating this FLIP! I think the general direction is very
good but I think there are some specifics that we should also put in
there and that we may need to discuss here as well.

## About batch vs streaming scheduling

I think we shouldn't call it "scheduling", because the decision
between bounded and unbounded affects more than just scheduling. It
affects how we do network transfers and the semantics of time, among
other things. So maybe we should differentiate between batch-style and
streaming-style execution, though I'm not sure I like those terms
either.

## About processing-time support in batch

It's not just about "batch": changing the default to ingestion time is
a change for stream processing as well. Actually, I don't know if
ingestion time even makes sense for batch processing. IIRC, with the
new sources we actually always have a timestamp, so this discussion
might be moot. Maybe Becket and/or Stephan (cc'ed) could chime in on
this.

Also, I think it's right that we currently ignore processing-time
timers at the end of input in streaming jobs, but this has been a
source of trouble for users. See [1] and several discussions on the
ML. I'm also cc'ing Flavio here who also ran into this problem. I
think we should solve this quickly after laying the foundations of
bounded processing on the DataStream API.

## About broadcast state support

I think as a low-hanging fruit we could just read the broadcast side
first and then switch to the regular input. We do need to be careful
with creating distributed deadlocks, though, so this might be trickier
than it seems at first.
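
A single-process sketch of the "broadcast side first" ordering, in plain Python. It only illustrates why the result becomes deterministic; it says nothing about the distributed-deadlock concern. The function `broadcast_first_join` and the tuple layout are assumptions made for this example.

```python
def broadcast_first_join(broadcast_side, regular_side):
    """Consume the whole broadcast input, then process the regular input."""
    # Phase 1: build the broadcast state from the complete broadcast side.
    rules = {}
    for key, rule in broadcast_side:
        rules[key] = rule

    # Phase 2: every regular record sees the same, final broadcast state.
    joined = []
    for key, value in regular_side:
        if key in rules:
            joined.append((key, value, rules[key]))
    return joined


broadcast_side = [("a", "rule-1"), ("b", "rule-2")]
regular_side = [("a", 1), ("c", 2), ("b", 3), ("a", 4)]
joined = broadcast_first_join(broadcast_side, regular_side)
```

Because phase 2 never starts before phase 1 has finished, the output does not depend on how the two inputs would have been interleaved, which is the property a broadcast join needs.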

## Loose ends and weird semantics

There are some operations in the DataStream API that have semantics
that might make sense for stream processing but should behave
differently for batch. For example, KeyedStream.reduce() is
essentially a reduce on a GlobalWindow with a Trigger that fires on
every element. In DB terms it produces an UPSERT stream as an output,
if you get ten input elements for a key you also get ten output
records. For batch processing it might make more sense to instead only
produce one output record per key with the result of the aggregation.
This would be correct for downstream consumers that expect an UPSERT
stream but it would change the actual physical output stream that they
see.
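
The difference can be sketched in a few lines of plain Python. This models the two output shapes described above and does not use the DataStream API; `rolling_reduce` and `final_reduce` are hypothetical helper names.

```python
def rolling_reduce(records, reduce_fn):
    """Streaming-style: emit the updated aggregate after every element."""
    state = {}
    output = []
    for key, value in records:
        state[key] = reduce_fn(state[key], value) if key in state else value
        output.append((key, state[key]))
    return output


def final_reduce(records, reduce_fn):
    """Batch-style: emit one record per key once all input has been seen."""
    state = {}
    for key, value in records:
        state[key] = reduce_fn(state[key], value) if key in state else value
    return list(state.items())


def add(left, right):
    return left + right


records = [("a", 1), ("b", 10), ("a", 2), ("a", 3)]
upsert_stream = rolling_reduce(records, add)
batch_result = final_reduce(records, add)
```

Here `upsert_stream` has four records and `batch_result` has two, yet a consumer that applies the upserts (for example `dict(upsert_stream)`) ends up with the same final table as `dict(batch_result)`. That is the sense in which the batch output stays correct for UPSERT consumers while the physical stream changes.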

There might be other such operations in the DataStream API that have
slightly weird behaviour that doesn't make much sense when you do
bounded processing.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-18647

On 24.08.20 11:29, Kostas Kloudas wrote:
Thanks a lot for the discussion!

I will open a voting thread shortly!

Kostas

On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas <kklou...@apache.org>
wrote:

Hi Guowei,

Thanks for the insightful comment!

I agree that this can be a limitation of the current runtime, but I
think that this FLIP can go on as it discusses mainly the semantics
that the DataStream API will expose when applied on bounded data.
There will definitely be other FLIPs that will actually handle the
runtime-related topics.

But it is good to document them nevertheless so that we start soon
ironing out the remaining rough edges.

Cheers,
Kostas

On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma <guowei....@gmail.com>
wrote:

Hi, Klou

Thanks for your proposal. It's a very good idea.
Just a little comment about the "Batch vs Streaming Scheduling".
In the AUTOMATIC execution mode we may not be able to pick the BATCH
execution mode even if all sources are bounded. For example, some
applications use the `CheckpointListener`, which is not
available in BATCH mode in the current implementation.
So maybe we need more checks in the AUTOMATIC execution mode.
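
A minimal sketch of what such an extra check could look like, written in plain Python. The rule "BATCH only if all sources are bounded" comes from the FLIP discussion; the `uses_checkpoint_listener` flag, the `Source` class and `resolve_execution_mode` are hypothetical and only stand in for whatever checks the runtime would really need.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Source:
    name: str
    bounded: bool


def resolve_execution_mode(requested, sources, uses_checkpoint_listener):
    """Resolve AUTOMATIC to BATCH or STREAMING; explicit choices pass through."""
    if requested != "AUTOMATIC":
        return requested
    all_bounded = all(source.bounded for source in sources)
    # Extra check: fall back to STREAMING if the job relies on a feature
    # that BATCH does not provide (here: checkpoint notifications).
    if all_bounded and not uses_checkpoint_listener:
        return "BATCH"
    return "STREAMING"


bounded_sources = [Source("files", True), Source("table", True)]
mixed_sources = [Source("files", True), Source("kafka", False)]

plain_bounded = resolve_execution_mode("AUTOMATIC", bounded_sources, False)
bounded_with_listener = resolve_execution_mode("AUTOMATIC", bounded_sources, True)
mixed = resolve_execution_mode("AUTOMATIC", mixed_sources, False)
```

Only the first job resolves to BATCH in this sketch; the one using a checkpoint listener and the one with an unbounded source both stay on STREAMING.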

Best,
Guowei


On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas
<kklou...@apache.org> wrote:

Hi all,

Thanks for the comments!

@Dawid: "execution.mode" can be a nice alternative and from a quick
look it is not used currently by any configuration option. I will
update the FLIP accordingly.

@David: Given that having the option to allow timers to fire at the
end of the job is already in the FLIP, I will leave it as is and I
will update the default policy to be "ignore processing time timers
set by the user". This will allow existing DataStream programs to run
on bounded inputs. This update will affect point 2 in the "Processing
Time Support in Batch" section.

If these changes cover your proposals, then I would like to start a
voting thread tomorrow evening if this is ok with you.

Please let me know until then.

Kostas

On Tue, Aug 18, 2020 at 3:54 PM David Anderson
<da...@alpinegizmo.com> wrote:

Being able to optionally fire registered processing time timers
at the end of a job would be interesting, and would help in (at
least some of) the cases I have in mind. I don't have a better
idea.

David

On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas
<kklou...@apache.org> wrote:

Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what the plans are at the scheduling level. One could imagine that in
the future, in mixed workloads, we first schedule all the bounded
subgraphs in BATCH mode and allow only one UNBOUNDED subgraph per
application, which is scheduled after all bounded ones have finished.
Essentially the bounded subgraphs would be used to bootstrap the
unbounded one. But I am not aware of any plans in that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests not allowing processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming; this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the
capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that batch execution is
assumed to be instantaneous and refers to a single "point" in time,
and any processing-time timers for the future may fire at the end of
execution or be ignored (but not throw an exception). I could also see
ignoring the timers in batch as the default, if this makes more sense.

By the way, do you have any use cases in mind that would help us
better shape our processing time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson
<da...@alpinegizmo.com> wrote:

Kostas,

I'm pleased to see some concrete details in this FLIP.

I wonder if the current proposal goes far enough in the
direction of recognizing the need some users may have for
"batch" and "bounded streaming" to be treated differently. If
I've understood it correctly, the section on scheduling allows
me to choose STREAMING scheduling even if I have bounded
sources. I like that approach, because it recognizes that even
though I have bounded inputs, I don't necessarily want batch
processing semantics. I think it makes sense to extend this
idea to processing time support as well.

My thinking is that sometimes in development and testing it's
reasonable to run exactly the same job as in production, except
with different sources and sinks. While it might be a
reasonable default, I'm not convinced that switching a
processing time streaming job to read from a bounded source
should always cause it to fail.

David

On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas
<kklou...@apache.org> wrote:

Hi all,

As described in FLIP-131 [1], we are aiming at deprecating the DataSet
API in favour of the DataStream API and the Table API. After this work
is done, the user will be able to write a program using the DataStream
API and this will execute efficiently on both bounded and unbounded
data. But before we reach this point, it is worth discussing and
agreeing on the semantics of some operations as we transition from the
streaming world to the batch one.

This thread and the associated FLIP [2] aim at discussing these issues
as these topics are pretty important to users and can lead to
unpleasant surprises if we do not pay attention.

Let's have a healthy discussion here and I will be updating the FLIP
accordingly.

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522








