Hi Matthias,

Thanks for the information. I have figured out the first issue you
mentioned, and I have made some progress on the second one.

I have sent another email titled 'BoundedOutOfOrderness Watermark
Generator is NOT making the event time to advance' from my other address,
fuyao...@oracle.com. That email contains more context on my issue; please
take a look. I have made some further progress since sending it.

Previously, I had managed to get the time-lag watermark strategy working in
my code, but neither my bounded-out-of-orderness strategy nor my punctuated
watermark strategy works well. Each emission cycle produces 8 watermarks;
two cycles are shown below.

I found that the root cause is that the Flink stream execution environment
has a default parallelism of 8 in my setup. *I didn't notice this in the
docs; could the community state it explicitly in the official documentation
to avoid confusion? Thanks.*

From my understanding, an operator's watermark is the lowest watermark among
its 8 parallel inputs, so the bounded-out-of-orderness watermark cannot
advance while only 1 of the 8 parallel instances advances. If I set the
parallelism of the entire stream execution environment to 1, the watermark
in the context is reflected correctly. One more thing: this behavior is not
reflected in the Flink cluster web UI. The UI shows the watermark advancing,
but in reality it is not. *That's causing the inconsistency problem I
mentioned in the other email above. Should this be considered a bug in the
UI?*
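
The minimum rule described above can be sketched in plain Java. This is a
simplified model, not Flink's implementation. `MinWatermarkModel` is a
hypothetical name, and the model ignores details such as idle inputs. It
records the latest watermark from each upstream instance and forwards the
smallest one. As a result, one instance at 1605047172881 (as in the logs
below) cannot move the combined watermark past the instances still at 0.

```java
import java.util.Arrays;

// Simplified model, not Flink code: an operator keeps the latest watermark
// received from each parallel upstream instance and forwards the minimum.
final class MinWatermarkModel {
    private final long[] channelWatermarks;

    MinWatermarkModel(int parallelism) {
        channelWatermarks = new long[parallelism];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one upstream instance; returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        // Watermarks only move forward per channel, so ignore regressions.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return combined();
    }

    long combined() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkModel op = new MinWatermarkModel(8);
        for (int i = 0; i < 8; i++) {
            op.onWatermark(i, 0L);                // all 8 instances start at 0
        }
        op.onWatermark(2, 1605047172881L);        // only one instance sees real events
        System.out.println(op.combined());        // prints 0
    }
}
```

With a parallelism of 1 there is only one input, so the combined watermark
equals that input's watermark. This matches what I see after setting the
environment's parallelism to 1.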

My current question: I have a full outer join before the
KeyedProcessFunction here. How can I make the bounded-out-of-orderness /
punctuated watermark strategy work when the parallelism is > 1? Only one of
the 8 parallel instances updates the watermark for this onTimer operator.
Is this related to the Table full outer join before this step? According to
the doc,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism

the default parallelism should be the same as that of the stream
environment. Why can't I update the watermarks for all 8 parallel
instances? What should I do to make this work with a parallelism larger
than 1? Thanks.

First round (note: the first value in each log row, "watermark based on
system time", comes from the time-lag strategy; it is updated correctly for
all 8 parallel instances, but the other two strategies mentioned above are
not):

14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881 (only one of the 8 parallel instances of the bounded-out-of-orderness strategy receives my new watermark)
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

Second round (I set the auto-watermark interval to 5 seconds):
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000


Best regards,

Fuyao

On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Fuyao,
> for your first question about the different behavior depending on whether
> you chain the methods or not: keep in mind that you have to save the return
> value of the assignTimestampsAndWatermarks method call if you don't chain
> the methods together, as is also shown in [1].
> At least the following example from your first message suggests this:
> ```
> // (This is a deprecated method.) The returned stream is discarded here,
> // so the keyBy() below operates on the stream without timestamps.
> retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
> // instead of:
> // retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
> retractStream
>     .keyBy(<key selector>)
>     .process(new TableOutputProcessFunction())
>     .name("ProcessTableOutput")
>     .uid("ProcessTableOutput")
>     .addSink(businessObjectSink)
>     .name("businessObjectSink")
>     .uid("businessObjectSink")
>     .setParallelism(1);
> ```
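
Matthias's point can be illustrated without Flink. `ToyStream` below is a
hypothetical stand-in, not Flink API. Like `assignTimestampsAndWatermarks()`,
its `assignTimestamps()` method returns a new stream object and leaves the
receiver unchanged. Calling it without keeping the return value therefore has
no effect on `retractStream`.

```java
// Hypothetical stand-in for a DataStream handle: transformations return a
// NEW object and leave the receiver unchanged.
final class ToyStream {
    private final boolean hasTimestamps;

    ToyStream(boolean hasTimestamps) {
        this.hasTimestamps = hasTimestamps;
    }

    /** Returns a new stream with timestamps; this stream is not modified. */
    ToyStream assignTimestamps() {
        return new ToyStream(true);
    }

    boolean hasTimestamps() {
        return hasTimestamps;
    }

    public static void main(String[] args) {
        ToyStream retractStream = new ToyStream(false);
        retractStream.assignTimestamps();                  // result discarded
        System.out.println(retractStream.hasTimestamps()); // prints false
        retractStream = retractStream.assignTimestamps();  // result kept
        System.out.println(retractStream.hasTimestamps()); // prints true
    }
}
```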
>
> For your second question about setting the event time, I'm going to pull in
> Timo from the SDK team, as I don't see an issue with your code right away.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>
> On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
>> Hi Flink Users and Community,
>>
>> For the first part of the question, the 12-hour time difference was caused
>> by a time-extraction bug on my side. I can get the time translated
>> correctly now. The type-cast problem does have some workarounds.
>>
>> My major blocker right now is that the onTimer method is not triggered. I
>> guess it is caused by failing to configure the correct watermarks and
>> timestamp assigners. Please give me some insights.
>>
>> 1. If I don't chain the assignTimestampsAndWatermarks() call together
>> with the keyBy() and process() calls, context.timestamp() in my
>> processElement() function is null. Is this expected behavior? The Flink
>> examples don't chain them together (see the example here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>> ).
>> 2. If I use registerEventTimeTimer() in processElement(), the onTimer
>> method is never triggered. However, the onTimer method is triggered if I
>> simply change it to registerProcessingTimeTimer(). I am using the
>> settings below in the stream env:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000L);
>>
>> My code for the processing chain:
>> retractStream
>>         .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>>                 .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>                     Row rowData = booleanRowTuple2.f1;
>>                     LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>>                     LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>>
>>                     LocalDateTime latestDBUpdateTime = null;
>>                     if (headerTime != null && linesTime != null) {
>>                         latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
>>                     } else {
>>                         latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
>>                     }
>>                     if (latestDBUpdateTime != null) {
>>                         return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>>                     }
>>                     // In the worst case, we use system time instead; this should never be reached.
>>                     return System.currentTimeMillis();
>>                 }))
>>         // .assignTimestampsAndWatermarks(new MyWaterStrategy())  // second way to create the watermark; doesn't work
>>         .keyBy(value -> {
>>             // The header invoice_id field can be null; fall back to field 4 in that case.
>>             String invoice_id_key = (String) value.f1.getField(0);
>>             if (invoice_id_key == null) {
>>                 invoice_id_key = (String) value.f1.getField(4);
>>             }
>>             return invoice_id_key;
>>         })
>>         .process(new TableOutputProcessFunction())
>>         .name("ProcessTableOutput")
>>         .uid("ProcessTableOutput")
>>         .addSink(businessObjectSink)
>>         .name("businessObjectSink")
>>         .uid("businessObjectSink")
>>         .setParallelism(1);
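
The timestamp and key logic in the chain above can be moved into plain
static methods and unit-tested without running a Flink job. This is a minimal
sketch that assumes the same field positions (0, 3, 4, 7) and the same time
zone as the code above. `RetractRowExtractors`, `latestUpdateMillis` and
`invoiceKey` are hypothetical names, and an `Object[]` stands in for Flink's
`Row`. The sketch returns `null` rather than `System.currentTimeMillis()`.
That is one possible design choice: it keeps processing time out of an
event-time assigner and leaves the fallback decision to the caller.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// JDK-only version of the assigner and key-selector logic; Object[] stands in for Row.
final class RetractRowExtractors {
    private static final ZoneId ZONE = ZoneId.of("America/Los_Angeles");

    /** Epoch millis of the later of the two update times, or null if both are null. */
    static Long latestUpdateMillis(LocalDateTime headerTime, LocalDateTime linesTime) {
        LocalDateTime latest;
        if (headerTime != null && linesTime != null) {
            latest = headerTime.isAfter(linesTime) ? headerTime : linesTime;
        } else {
            latest = (headerTime != null) ? headerTime : linesTime;
        }
        if (latest == null) {
            return null; // let the caller decide what to do without an event time
        }
        return latest.atZone(ZONE).toInstant().toEpochMilli();
    }

    /** Header invoice_id (field 0), falling back to field 4 when the header side is null. */
    static String invoiceKey(Object[] fields) {
        String key = (String) fields[0];
        return key != null ? key : (String) fields[4];
    }
}
```

Inside `withTimestampAssigner`, the lambda would call `latestUpdateMillis`
with `(LocalDateTime) row.getField(3)` and `(LocalDateTime) row.getField(7)`,
then handle a `null` result explicitly.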
>>
>> Best regards,
>> Fuyao
>>
>> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>>
>>> Hi Flink Community,
>>>
>>> I am doing some research work on the Flink DataStream and Table APIs and
>>> have run into two major problems. I am using Flink 1.11.2, Scala 2.11, and
>>> Java 8. My use case looks like this: I plan to write a data processing
>>> pipeline with two stages. My goal is to construct a business object
>>> containing information from several Kafka streams with a primary key, and
>>> to emit the complete business object if that primary key doesn't appear
>>> in the pipeline for 10 seconds.
>>>
>>> In the first stage, I consume three Kafka streams and transform them into
>>> Flink DataStreams using a deserialization schema that performs some type
>>> and date-format conversion. I then register these data streams as Tables
>>> and full-outer-join them one by one using the Table API. I also set a
>>> query configuration to avoid excessive state. The primary key is also the
>>> join key.
>>>
>>> In the second stage, I convert the joined table into a retract stream and
>>> feed it into a KeyedProcessFunction that emits the business object once
>>> its primary key has been inactive for 10 seconds.
>>>
>>> Is this the suggested way of handling the data? (I understand I can
>>> consume Kafka data directly in the Table API. I haven't tried that yet;
>>> maybe it's better?) Any suggestions are welcome. While implementing this,
>>> I ran into two major problems, each with several smaller questions.
>>>
>>>
>>> 1. Some type-cast behavior of retract streams that I can't explain.
>>>
>>> (1) In the initial stage, I registered some fields as *java.sql.Date* or
>>> *java.sql.Timestamp*, following the examples
>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction).
>>> After the join and the conversion to a retract stream, they become
>>> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>>
>>> For example, when first ingesting the Kafka streams, I registered an
>>> attribute of type java.sql.Timestamp:
>>>
>>>  @JsonAlias("ATTRIBUTE1")
>>>  private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo = java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>>
>>> When I tried to cast the field back to that type after converting to a
>>> retract stream, I got the error below:
>>>
>>>  java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
>>>
>>> Maybe I should use toAppendStream instead, since an append stream can
>>> register type information but toRetractStream can't? (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>> )
>>>
>>> My workaround is to cast it to LocalDateTime first and extract the epoch
>>> time, but this doesn't seem like a final solution.
>>>
>>> (2) During timestamp conversion, the conversion to a retract stream seems
>>> to lose the AM/PM information, causing a 12-hour difference for PM times.
>>>
>>> I use Joda-Time for some timestamp conversion in the first
>>> (deserialization) stage. My pattern looks like this, where "a" is the
>>> AM/PM marker:
>>>
>>>  DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>>>
>>> After converting to a retract stream, the AM/PM information is not
>>> preserved.
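
For item 1 above, two small JDK-only helpers may help. This is a sketch: the
class and method names are hypothetical, java.time replaces Joda-Time, and the
sample input format is assumed from the pattern above. `toTimestamp` accepts
either representation and uses `Timestamp.valueOf(LocalDateTime)`, so the cast
succeeds whichever type the retract stream delivers. `parse` uses `hh` (clock
hour 1-12) together with `a`. The pattern above uses `HH` (hour of day 0-23)
together with `a`, which is one plausible source of a 12-hour offset.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Locale;

final class TimeConversions {
    // 'hh' is the clock hour of AM/PM (1-12), so the 'a' marker decides AM vs PM.
    // 'HH' is the hour of day (0-23); combined with 'a', the marker may be
    // ignored or may conflict, depending on the library.
    static final DateTimeFormatter AM_PM_FORMAT = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()                   // accept "NOV" as well as "Nov"
            .appendPattern("dd-MMM-yy hh.mm.ss.SSSSSS a")
            .toFormatter(Locale.ENGLISH);

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, AM_PM_FORMAT);
    }

    /** Accepts a field that arrives as either java.sql.Timestamp or LocalDateTime. */
    static Timestamp toTimestamp(Object field) {
        if (field instanceof Timestamp) {
            return (Timestamp) field;
        }
        if (field instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) field);
        }
        throw new IllegalArgumentException("Unexpected time type: " + field);
    }
}
```

For example, `parse("10-NOV-20 02.30.15.123456 PM")` resolves to 14:30:15 on
2020-11-10.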
>>>
>>>
>>> 2. My onTimer method in the KeyedProcessFunction is never triggered when
>>> I schedule an event-time timer.
>>>
>>> I am using event time in my code. I am new to configuring watermarks and
>>> might be missing something. I also tried registering a processing-time
>>> timer instead; in that case the code enters onTimer and produces some
>>> results.
>>>
>>> I am trying to follow the example here:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>>
>>> My onTimer method looks like this, and the scheduled event-time timer
>>> never fires.
>>>
>>> In processElement():
>>>
>>> context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
>>>
>>> My onTimer function:
>>>
>>>     @Override
>>>     public void onTimer(long timestamp, OnTimerContext ctx,
>>>             Collector<BusinessObject> collector) throws Exception {
>>>         TestBusinessObjectState result = testBusinessObjectState.value();
>>>         if (result == null) {
>>>             return; // state may already have been cleared by an earlier timer
>>>         }
>>>         log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}",
>>>                 ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());
>>>
>>>         // check if this is an outdated timer or the latest timer
>>>         if (timestamp >= result.getLastModifiedTime() + 10000) {
>>>             // emit the state on timeout
>>>             log.info("Collecting a business object, {}",
>>>                     result.getBusinessObject().toString());
>>>             collector.collect(result.getBusinessObject());
>>>
>>>             cleanUp(ctx);
>>>         }
>>>     }
>>>
>>>     private void cleanUp(Context ctx) throws Exception {
>>>         // Delete the timer that processElement() registered, i.e. at
>>>         // lastModifiedTime + 10000 (not at lastModifiedTime itself).
>>>         long timer = testBusinessObjectState.value().getLastModifiedTime() + 10000;
>>>         ctx.timerService().deleteEventTimeTimer(timer);
>>>         testBusinessObjectState.clear();
>>>     }
>>>
>>>
>>> (1) When I assign the timestamps and watermarks outside the process()
>>> method chain, context.timestamp() is null. If I put the call inside the
>>> chain, it isn't null. Is this the expected behavior? Strangely, in the
>>> null case I can collect the business object immediately, without the
>>> intended 10-second wait. This shouldn't happen, right? The
>>> processing-time timer also seems to work: the code enters the onTimer
>>> method.
>>>
>>>  // (This is a deprecated method.)
>>>  retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
>>>
>>>  retractStream
>>>     .keyBy(<key selector>)
>>>     .process(new TableOutputProcessFunction())
>>>     .name("ProcessTableOutput")
>>>     .uid("ProcessTableOutput")
>>>     .addSink(businessObjectSink)
>>>     .name("businessObjectSink")
>>>     .uid("businessObjectSink")
>>>     .setParallelism(1);
>>>
>>> (2) Watermark configuration: I use a field in the retract stream as the
>>> event time. This time is usually 15-20 seconds behind the current time.
>>>
>>> In my environment, I configured the streaming env based on the
>>> information here (
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
>>> Events don't arrive continuously, so I think I need to set the
>>> auto-watermark interval for the event-time onTimer to work correctly. I
>>> have added the code below:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>>
>>> 1> Which kind of watermark strategy should I use: the general
>>> bounded-out-of-orderness strategy or a custom WatermarkGenerator?
>>>
>>> I tried to write a WatermarkGenerator, but I don't know how to apply it
>>> to the stream correctly; the documentation doesn't explain this very
>>> clearly. My code looks like this, and it doesn't work.
>>>
>>> Assignment part:
>>>
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>>>
>>> Watermark generator:
>>>
>>> I just assign the event-time attribute, following the example in the docs
>>> (
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>>> )
>>>
>>> 2> I also tried the static method in WatermarkStrategy. The syntax is
>>> correct, but I hit the same problem as in 2.(1).
>>>
>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>>>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>>                 <Select an event-time attribute in booleanRowTuple2>
>>>             }))
>>>
>>>
>>> (3) Do I need to explicitly attach the retract stream to the stream
>>> environment? I think this is done by default, right? I just want to
>>> confirm. I do call env.execute() at the end of the code.
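
For the watermark questions in 2.(2) above, here is a JDK-only model of how a
periodic bounded-out-of-orderness watermark drives the 10-second inactivity
timers from `onTimer()`. It is a simplified sketch and not Flink code. The
class name is hypothetical, and the model covers a single parallel instance.
The watermark formula `maxTimestamp - outOfOrderness - 1` and the starting
value `Long.MIN_VALUE + outOfOrderness + 1` follow my understanding of Flink
1.11's `BoundedOutOfOrdernessWatermarks`. An event-time timer fires once the
watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Single-instance model, not Flink code: a periodic bounded-out-of-orderness
// watermark driving "emit after 10 s of inactivity" event-time timers.
final class InactivityTimeoutModel {
    static final long TIMEOUT_MS = 10_000L;

    private static final class Timer {
        final String key;
        final long time;
        Timer(String key, long time) { this.key = key; this.time = time; }
    }

    private final long outOfOrdernessMs;
    private long maxTimestamp;                     // highest event time seen so far
    private long currentWatermark = Long.MIN_VALUE;
    private final Map<String, Long> lastModified = new HashMap<>();
    private final List<Timer> timers = new ArrayList<>();

    InactivityTimeoutModel(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Chosen so that the first periodic watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    /** Like processElement(): record the event time and register a timeout timer. */
    void onEvent(String key, long eventTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTime);
        lastModified.put(key, eventTime);
        timers.add(new Timer(key, eventTime + TIMEOUT_MS));
    }

    /** Runs once per auto-watermark interval; returns the keys whose object is emitted. */
    List<String> onPeriodicEmit() {
        // The watermark only moves if maxTimestamp moved, i.e. a newer event arrived.
        currentWatermark = Math.max(currentWatermark, maxTimestamp - outOfOrdernessMs - 1);
        List<String> emitted = new ArrayList<>();
        // Timers are checked in registration order here; Flink fires them in timestamp order.
        Iterator<Timer> it = timers.iterator();
        while (it.hasNext()) {
            Timer t = it.next();
            if (t.time > currentWatermark) {
                continue;                          // not due yet
            }
            it.remove();                           // the timer fires
            Long last = lastModified.get(t.key);
            // Same check as onTimer(): only the timer of the latest event emits.
            if (last != null && t.time >= last + TIMEOUT_MS) {
                emitted.add(t.key);
                lastModified.remove(t.key);        // like cleanUp()
            }
        }
        return emitted;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

The model makes one point explicit: `setAutoWatermarkInterval` controls how
often the watermark is emitted, not how far it moves. With a
bounded-out-of-orderness strategy, the watermark advances only when a newer
event arrives. The timer for the last active key therefore waits for the next
event. With parallelism > 1, every input instance must also advance.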
>>>
>>> I understand this is a lot of questions; thanks a lot for your patience
>>> in reading through my email! If anything is unclear, please reach out to
>>> me. Thanks!
>>>
>>>
>>> Best regards,
>>>
>>> Fuyao Li
>>>
>>
