@Aljoscha Let me bring back to the ML some of the points we discussed
offline.

Ad. 1 Yes, I agree it's not just about scheduling; it includes broader
changes to the runtime. We might need to make that more prominent in the
write-up.

Ad. 2 You have a good point that switching the default value for
TimeCharacteristic to INGESTION time might not be the best option, as it
might hide problems if we assign ingestion time, which is rarely the
right choice for user programs. Maybe we could just go with EVENT_TIME
as the default?
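As a side note on why ingestion time can hide problems, here is a standalone sketch in plain Java (not Flink code; the Event type and windowStart helper are made up for illustration): with ingestion time an out-of-order record silently lands in the wrong window, while event time keeps it in the window its own timestamp belongs to.

```java
import java.util.List;

public class IngestionVsEventTime {

    // A record with an embedded event timestamp (millis).
    record Event(String key, long eventTime) {}

    // Tumbling 10-ms windows: map a timestamp to the start of its window.
    static long windowStart(long ts) {
        return ts - (ts % 10);
    }

    public static void main(String[] args) {
        // Two records that belong to the same event-time window [10, 20),
        // but the second one arrives late at the source.
        List<Event> events = List.of(new Event("a", 12), new Event("a", 15));
        long[] arrivalTimes = {13, 27}; // wall-clock arrival, i.e. ingestion time

        for (int i = 0; i < events.size(); i++) {
            long byEventTime = windowStart(events.get(i).eventTime());
            long byIngestionTime = windowStart(arrivalTimes[i]);
            System.out.println("event-time window: " + byEventTime
                    + ", ingestion-time window: " + byIngestionTime);
        }
        // Event time puts both records in window [10, 20); ingestion time
        // splits them across [10, 20) and [20, 30), silently changing the
        // result without any error the user would notice.
    }
}
```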

Ad. 4 That's a very good point! I agree with you that it would be better
to change the behaviour of said methods for batch-style execution. Even
though it changes the behaviour, the overall logic is still correct.
Moreover, I'd also recommend deprecating some of the relational-like
methods, which we should rather redirect to the Table API. I added a
section about this to the FLIP (mostly copying over your message). Let me
know what you think about it.
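To make the Ad. 4 difference concrete, here is a standalone sketch in plain Java (not Flink code; the method names are made up for illustration): a streaming-style keyed reduce emits the running aggregate after every input record, while a batch-style one would emit only the final aggregate per key.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyedReduceSketch {

    // Streaming-style KeyedStream.reduce(): the running aggregate is emitted
    // after every input record, i.e. an UPSERT stream with one output per input.
    static List<Integer> streamingReduce(List<Integer> valuesForKey) {
        List<Integer> output = new ArrayList<>();
        int acc = 0;
        for (int v : valuesForKey) {
            acc += v;
            output.add(acc);
        }
        return output;
    }

    // Batch-style alternative: only the final aggregate for the key is emitted.
    static List<Integer> batchReduce(List<Integer> valuesForKey) {
        int acc = 0;
        for (int v : valuesForKey) {
            acc += v;
        }
        return List.of(acc);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3);
        System.out.println(streamingReduce(input)); // [1, 3, 6]
        System.out.println(batchReduce(input));     // [6]
    }
}
```

Downstream consumers expecting an UPSERT stream arrive at the same final value either way, but the physical stream they observe shrinks from one record per input to one record per key.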

Best,

Dawid

On 25/08/2020 11:39, Aljoscha Krettek wrote:
> Thanks for creating this FLIP! I think the general direction is very
> good but I think there are some specifics that we should also put in
> there and that we may need to discuss here as well.
>
> ## About batch vs streaming scheduling
>
> I think we shouldn't call it "scheduling", because the decision
> between bounded and unbounded affects more than just scheduling. It
> affects how we do network transfers and the semantics of time, among
> other things. So maybe we should differentiate between batch-style and
> streaming-style execution, though I'm not sure I like those terms either.
>
> ## About processing-time support in batch
>
> It's not just about "batch": changing the default to ingestion time is
> a change for stream processing as well. Actually, I don't know if
> ingestion time even makes sense for batch processing. IIRC, with the
> new sources we actually always have a timestamp, so this discussion
> might be moot. Maybe Becket and/or Stephan (cc'ed) could chime in on
> this.
>
> Also, I think it's right that we currently ignore processing-time
> timers at the end of input in streaming jobs, but this has been a
> source of trouble for users. See [1] and several discussions on the
> ML. I'm also cc'ing Flavio here who also ran into this problem. I
> think we should solve this quickly after laying the foundations of
> bounded processing on the DataStream API.
>
> ## About broadcast state support
>
> I think as a low-hanging fruit we could just read the broadcast side
> first and then switch to the regular input. We do need to be careful
> with creating distributed deadlocks, though, so this might be trickier
> than it seems at first.
>
> ## Loose ends and weird semantics
>
> There are some operations in the DataStream API that have semantics
> that might make sense for stream processing but should behave
> differently for batch. For example, KeyedStream.reduce() is
> essentially a reduce on a GlobalWindow with a Trigger that fires on
> every element. In DB terms it produces an UPSERT stream as an output,
> if you get ten input elements for a key you also get ten output
> records. For batch processing it might make more sense to instead only
> produce one output record per key with the result of the aggregation.
> This would be correct for downstream consumers that expect an UPSERT
> stream but it would change the actual physical output stream that they
> see.
>
> There might be other such operations in the DataStream API that have
> slightly weird behaviour that doesn't make much sense when you do
> bounded processing.
>
> Best,
> Aljoscha
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647
>
> On 24.08.20 11:29, Kostas Kloudas wrote:
>> Thanks a lot for the discussion!
>>
>> I will open a voting thread shortly!
>>
>> Kostas
>>
>> On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas <kklou...@apache.org>
>> wrote:
>>>
>>> Hi Guowei,
>>>
>>> Thanks for the insightful comment!
>>>
>>> I agree that this can be a limitation of the current runtime, but I
>>> think that this FLIP can go on as it discusses mainly the semantics
>>> that the DataStream API will expose when applied on bounded data.
>>> There will definitely be other FLIPs that will actually handle the
>>> runtime-related topics.
>>>
>>> But it is good to document them nevertheless, so that we can soon
>>> start ironing out the remaining rough edges.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>> Hi, Klou
>>>>
>>>> Thanks for your proposal. It's a very good idea.
>>>> Just a little comment about the "Batch vs Streaming Scheduling":
>>>> in the AUTOMATIC execution mode we might not be able to pick the
>>>> BATCH execution mode even if all sources are bounded. For example,
>>>> some applications use the `CheckpointListener`, which is not
>>>> available in BATCH mode in the current implementation.
>>>> So maybe we need more checks in the AUTOMATIC execution mode.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas
>>>> <kklou...@apache.org> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for the comments!
>>>>>
>>>>> @Dawid: "execution.mode" can be a nice alternative and from a quick
>>>>> look it is not used currently by any configuration option. I will
>>>>> update the FLIP accordingly.
>>>>>
>>>>> @David: Given that having the option to allow timers to fire at the
>>>>> end of the job is already in the FLIP, I will leave it as is and I
>>>>> will update the default policy to be "ignore processing time timers
>>>>> set by the user". This will allow existing DataStream programs to run
>>>>> on bounded inputs. This update will affect point 2 in the "Processing
>>>>> Time Support in Batch" section.
>>>>>
>>>>> If these changes cover your proposals, then I would like to start a
>>>>> voting thread tomorrow evening if this is ok with you.
>>>>>
>>>>> Please let me know by then.
>>>>>
>>>>> Kostas
>>>>>
>>>>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson
>>>>> <da...@alpinegizmo.com> wrote:
>>>>>>
>>>>>> Being able to optionally fire registered processing time timers
>>>>>> at the end of a job would be interesting, and would help in (at
>>>>>> least some of) the cases I have in mind. I don't have a better idea.
>>>>>>
>>>>>> David
>>>>>>
>>>>>> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas
>>>>>> <kklou...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi Kurt and David,
>>>>>>>
>>>>>>> Thanks a lot for the insightful feedback!
>>>>>>>
>>>>>>> @Kurt: For the topic of checkpointing with Batch Scheduling, I
>>>>>>> totally
>>>>>>> agree with you that it requires a lot more work and careful
>>>>>>> thinking
>>>>>>> on the semantics. This FLIP was written under the assumption
>>>>>>> that if
>>>>>>> the user wants to have checkpoints on bounded input, he/she will
>>>>>>> have
>>>>>>> to go with STREAMING as the scheduling mode. Checkpointing for
>>>>>>> BATCH
>>>>>>> can be handled as a separate topic in the future.
>>>>>>>
>>>>>>> In the case of MIXED workloads and for this FLIP, the scheduling
>>>>>>> mode
>>>>>>> should be set to STREAMING. That is why the AUTOMATIC option sets
>>>>>>> scheduling to BATCH only if all the sources are bounded. I am
>>>>>>> not sure
>>>>>>> what the plans are there at the scheduling level, as one could
>>>>>>> imagine
>>>>>>> in the future that in mixed workloads, we schedule first all the
>>>>>>> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>>>>>>> subgraph per application, which is going to be scheduled after all
>>>>>>> Bounded ones have finished. Essentially the bounded subgraphs
>>>>>>> will be
>>>>>>> used to bootstrap the unbounded one. But, I am not aware of any
>>>>>>> plans
>>>>>>> towards that direction.
>>>>>>>
>>>>>>>
>>>>>>> @David: The processing time timer handling is a topic that has also
>>>>>>> been discussed in the community in the past, and I do not
>>>>>>> remember any
>>>>>>> final conclusion unfortunately.
>>>>>>>
>>>>>>> In the current context and for bounded input, we chose to favor
>>>>>>> reproducibility of the result, as this is expected in batch
>>>>>>> processing
>>>>>>> where the whole input is available in advance. This is why this
>>>>>>> proposal suggests to not allow processing time timers. But I
>>>>>>> understand your argument that the user may want to be able to
>>>>>>> run the
>>>>>>> same pipeline on batch and streaming; this is why we added the two
>>>>>>> options under future work, namely (from the FLIP):
>>>>>>>
>>>>>>> ```
>>>>>>> Future Work: In the future we may consider adding as options the
>>>>>>> capability of:
>>>>>>> * firing all the registered processing time timers at the end of
>>>>>>> a job
>>>>>>> (at close()) or,
>>>>>>> * ignoring all the registered processing time timers at the end
>>>>>>> of a job.
>>>>>>> ```
>>>>>>>
>>>>>>> Conceptually, we are essentially saying that batch execution is
>>>>>>> assumed to be instantaneous and refers to a single
>>>>>>> "point" in time and any processing-time timers for the future
>>>>>>> may fire
>>>>>>> at the end of execution or be ignored (but not throw an
>>>>>>> exception). I
>>>>>>> could also see ignoring the timers in batch as the default, if this
>>>>>>> makes more sense.
>>>>>>>
>>>>>>> By the way, do you have any use cases in mind that will help us
>>>>>>> better
>>>>>>> shape our processing time timer handling?
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson
>>>>>>> <da...@alpinegizmo.com> wrote:
>>>>>>>>
>>>>>>>> Kostas,
>>>>>>>>
>>>>>>>> I'm pleased to see some concrete details in this FLIP.
>>>>>>>>
>>>>>>>> I wonder if the current proposal goes far enough in the
>>>>>>>> direction of recognizing the need some users may have for
>>>>>>>> "batch" and "bounded streaming" to be treated differently. If
>>>>>>>> I've understood it correctly, the section on scheduling allows
>>>>>>>> me to choose STREAMING scheduling even if I have bounded
>>>>>>>> sources. I like that approach, because it recognizes that even
>>>>>>>> though I have bounded inputs, I don't necessarily want batch
>>>>>>>> processing semantics. I think it makes sense to extend this
>>>>>>>> idea to processing time support as well.
>>>>>>>>
>>>>>>>> My thinking is that sometimes in development and testing it's
>>>>>>>> reasonable to run exactly the same job as in production, except
>>>>>>>> with different sources and sinks. While it might be a
>>>>>>>> reasonable default, I'm not convinced that switching a
>>>>>>>> processing time streaming job to read from a bounded source
>>>>>>>> should always cause it to fail.
>>>>>>>>
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas
>>>>>>>> <kklou...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> As described in FLIP-131 [1], we are aiming at deprecating the
>>>>>>>>> DataSet
>>>>>>>>> API in favour of the DataStream API and the Table API. After
>>>>>>>>> this work
>>>>>>>>> is done, the user will be able to write a program using the
>>>>>>>>> DataStream
>>>>>>>>> API and this will execute efficiently on both bounded and
>>>>>>>>> unbounded
>>>>>>>>> data. But before we reach this point, it is worth discussing and
>>>>>>>>> agreeing on the semantics of some operations as we transition
>>>>>>>>> from the
>>>>>>>>> streaming world to the batch one.
>>>>>>>>>
>>>>>>>>> This thread and the associated FLIP [2] aim at discussing
>>>>>>>>> these issues
>>>>>>>>> as these topics are pretty important to users and can lead to
>>>>>>>>> unpleasant surprises if we do not pay attention.
>>>>>>>>>
>>>>>>>>> Let's have a healthy discussion here and I will be updating
>>>>>>>>> the FLIP
>>>>>>>>> accordingly.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>>>>>>>>> [2]
>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>
