Hi Austin,

Yes, it should also work for ingestion time.

I am not entirely sure whether event time is preserved when converting a
Table into a retract stream. It should be possible, and if it is not
working, then I guess it is a missing feature. But I am sure that @Timo
Walther <twal...@apache.org> knows more about it. If in doubt, you could
assign a new watermark generator once you have obtained the retract stream.
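
To make the idea of re-assigning timestamps concrete, here is a rough sketch
in plain Java. It is only a toy model and does not use any Flink classes: the
Row type, its eventTimeField and the maxOutOfOrdernessMs parameter are
hypothetical names I made up for illustration, and I am assuming the records
come out of the conversion without a timestamp. In a real job the equivalent
step would be a call to assignTimestampsAndWatermarks on the DataStream
returned by toRetractStream.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: plain Java, no Flink classes involved. */
final class ReassignTimestamps {

    /** Value that marks a record as carrying no timestamp. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Hypothetical row: the event time lives in a payload field. */
    static final class Row {
        final long eventTimeField;      // e.g. a column read from the CSV
        long timestamp = NO_TIMESTAMP;  // assumed lost during the conversion

        Row(long eventTimeField) {
            this.eventTimeField = eventTimeField;
        }
    }

    /**
     * Copies the payload field into each record's timestamp and returns the
     * watermark that would be emitted after each record, using the
     * bounded-out-of-orderness rule: watermark = max timestamp - bound - 1.
     */
    static List<Long> assignTimestampsAndWatermarks(List<Row> rows, long maxOutOfOrdernessMs) {
        List<Long> watermarks = new ArrayList<>();
        // Start high enough that the subtraction below cannot underflow.
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        for (Row row : rows) {
            row.timestamp = row.eventTimeField;
            maxSeen = Math.max(maxSeen, row.timestamp);
            watermarks.add(maxSeen - maxOutOfOrdernessMs - 1);
        }
        return watermarks;
    }
}
```

After this step every record has a real timestamp again, so a downstream
event time window would no longer see Long.MIN_VALUE.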

Here is also a link to some information about event time and watermarks
[1]. Unfortunately, it does not state anything about the direction Table =>
DataStream.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html

Cheers,
Till

On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Till,
>
> Just a quick question on time characteristics -- this should work for
> IngestionTime as well, correct? Is there anything special I need to do to
> have the CsvTableSource/toRetractStream call carry through the assigned
> timestamps, or do I have to re-assign timestamps during the conversion? I'm
> currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp
> marker)` error, though I'm seeing timestamps being assigned if I step
> through the AutomaticWatermarkContext.
>
> Thanks,
> Austin
>
> On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Perfect, thanks so much Till!
>>
>> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Austin,
>>>
>>> I believe that the problem is the processing time window. Unlike for
>>> event time where we send a MAX_WATERMARK at the end of the stream to
>>> trigger all remaining windows, this does not happen for processing time
>>> windows. Hence, if your stream ends and you still have an open processing
>>> time window, then it will never get triggered.
>>>
>>> The problem should disappear if you use event time or if you process
>>> unbounded streams which never end.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> Thanks for your patience. I've got a small repo that reproduces the
>>>> issue here: https://github.com/austince/flink-1.10-sql-windowing-error
>>>>
>>>> Not sure what I'm doing wrong but it feels silly.
>>>>
>>>> Thanks so much!
>>>> Austin
>>>>
>>>> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hey Till,
>>>>>
>>>>> Thanks for the reply -- I'll try to see if I can reproduce this in a
>>>>> small repo and share it with you.
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Austin,
>>>>>>
>>>>>> could you share with us the exact job you are running (including the
>>>>>> custom window trigger)? This would help us to better understand your
>>>>>> problem.
>>>>>>
>>>>>> I am also pulling in Klou and Timo who might help with the windowing
>>>>>> logic and the Table to DataStream conversion.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>>>>>> austin.caw...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> I'm not sure if I've missed something in the docs, but I'm having a
>>>>>>> bit of trouble with a streaming SQL job that starts w/ raw SQL queries
>>>>>>> and then transitions to a more traditional streaming job. I'm on Flink
>>>>>>> 1.10 using the Blink planner, running locally with no checkpointing.
>>>>>>>
>>>>>>> The job looks roughly like:
>>>>>>>
>>>>>>> CSV 1 -->
>>>>>>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time
>>>>>>> window w/ process func & custom trigger --> some other ops
>>>>>>> CSV 3 -->
>>>>>>>
>>>>>>>
>>>>>>> When I remove the windowing directly after the `toRetractStream`,
>>>>>>> the records make it to the "some other ops" stage, but with the
>>>>>>> windowing, those operations are sometimes not sent any data. I can
>>>>>>> also get data sent to the downstream operators by putting in a no-op
>>>>>>> map before the window and placing some breakpoints in there to
>>>>>>> manually slow down processing.
>>>>>>>
>>>>>>>
>>>>>>> The logs don't seem to indicate anything went wrong and generally
>>>>>>> look like:
>>>>>>>
>>>>>>> 4819 [Source: Custom File source (1/1)] INFO
>>>>>>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>>>>>>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>>>>>>> FINISHED.
>>>>>>> 4819 [Source: Custom File source (1/1)] INFO
>>>>>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>>>>>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>>>>>>> 4819 [Source: Custom File source (1/1)] INFO
>>>>>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>>>>>> streams are closed for task Source: Custom File source (1/1)
>>>>>>> (3578629787c777320d9ab030c004abd4) [FINISHED]
>>>>>>> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>>>>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering
>>>>>>> task and sending final execution state FINISHED to JobManager for task
>>>>>>> Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
>>>>>>> ...
>>>>>>> 4996 [Window(TumblingProcessingTimeWindows(60000),
>>>>>>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO
>>>>>>>  org.apache.flink.runtime.taskmanager.Task  -
>>>>>>> Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
>>>>>>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f)
>>>>>>> switched from RUNNING to FINISHED.
>>>>>>> 4996 [Window(TumblingProcessingTimeWindows(60000),
>>>>>>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO
>>>>>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>>>>>> Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
>>>>>>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
>>>>>>> 4996 [Window(TumblingProcessingTimeWindows(60000),
>>>>>>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO
>>>>>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>>>>>> streams are closed for task Window(TumblingProcessingTimeWindows(60000),
>>>>>>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)
>>>>>>> (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
>>>>>>> ...
>>>>>>> rest of the shutdown
>>>>>>> ...
>>>>>>> Program execution finished
>>>>>>> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
>>>>>>> Job Runtime: 783 ms
>>>>>>>
>>>>>>>
>>>>>>> Is there something I'm missing in my setup? Could it be my custom
>>>>>>> window trigger? Bug? I'm stumped.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Austin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
