Hi Matthias,

Just to provide more context on this problem: I only have 1 partition per
Kafka topic at the beginning, before the join operation. I have been reading
the doc:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Maybe that is the root cause of my problem here: with fewer than 8
partitions (only 1 partition in my case), using the default parallelism of
8 would cause this wrong behavior. This is my guess, and it will take a while
to test it out... What's your opinion on this? Thanks!
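
To make the guess above concrete, here is a minimal plain-Java sketch (no
Flink dependency) of how I understand a downstream operator to combine the
watermarks of its parallel inputs: it takes the minimum over all inputs that
are not marked idle. The class and method names (`WatermarkMinSketch`,
`combinedWatermark`) are hypothetical, and this is only a model of the
behavior, not Flink's actual implementation. The watermark values are taken
from my log output.

```java
import java.util.Arrays;

public class WatermarkMinSketch {

    /**
     * Combined watermark of a downstream operator in this model: the minimum
     * over all inputs that are not marked idle. If every input is idle,
     * there is nothing to advance on, so Long.MIN_VALUE is returned.
     */
    public static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        int parallelism = 8;
        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, 0L);                // subtasks without data stay at 0
        boolean[] idle = new boolean[parallelism];  // all false: nobody is idle

        // Only one subtask receives records (1 Kafka partition), so only its
        // watermark advances.
        watermarks[2] = 1605047172881L;

        // Seven subtasks still report 0, so the minimum is stuck at 0.
        System.out.println(combinedWatermark(watermarks, idle));

        // If the seven subtasks without data are marked idle, they are ignored
        // and the combined watermark follows the single active subtask.
        Arrays.fill(idle, true);
        idle[2] = false;
        System.out.println(combinedWatermark(watermarks, idle));
    }
}
```

If this model is right, two things might help, though I have not verified
either on my job: keeping the parallelism of the watermark-assigning operator
at or below the number of partitions that actually carry data, or calling
`WatermarkStrategy.withIdleness(Duration)` (available in Flink 1.11) so that
subtasks without data are marked idle.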

Best,

Fuyao

On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:

> Hi Matthias,
>
> One more question regarding Flink Table parallelism: is it possible to
> configure the parallelism of a Table operation at the operator level? It
> seems we don't have such an API available, right? Thanks!
>
> Best,
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> Thanks for your information. I have managed to figure out the first issue
>> you mentioned. Regarding the second issue, I have made some progress on it.
>>
>> I have sent another email with the title 'BoundedOutOfOrderness Watermark
>> Generator is NOT making the event time to advance' from another email
>> address of mine, fuyao...@oracle.com. That email contains some more context
>> on my issue. Please take a look. I have made some progress since sending
>> that email.
>>
>> Previously, I had managed to get the time-lag watermark strategy working in
>> my code, but my bounded-out-of-orderness strategy and punctuated watermark
>> strategy don't work well. Each periodic emit produces 8 watermarks. Two
>> cycles are shown below.
>>
>> I managed to figure out that the root cause is that the Flink stream
>> execution environment has a default parallelism of 8. *I didn't notice this
>> in the doc; could the Community add this explicitly to the official doc to
>> avoid some confusion? Thanks.*
>>
>> From my understanding, the watermark advances based on the lowest
>> watermark among the 8 parallel subtasks, so the bounded-out-of-orderness
>> watermark cannot advance since I am only advancing 1 of the 8 subtasks. If
>> I set the entire stream execution environment to parallelism 1, the
>> watermark is reflected in the context correctly. One more thing: this
>> behavior is not reflected in the Flink cluster web UI. There I can see
>> the watermark advancing, but in reality it is not. *That's causing
>> the inconsistency problem I described in the other email mentioned above.
>> Will this be considered a bug in the UI?*
>>
>> My current question is: since I have a full outer join operation before the
>> KeyedProcessFunction here, how can I make the bounded-out-of-orderness /
>> punctuated watermark strategy work if the parallelism is > 1? It only
>> updates the watermark for one of the 8 parallel subtasks of this onTimer
>> operator. Is this related to my Table full outer join operation before this
>> step? According to the doc,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>>
>> the default parallelism should be the same as that of the stream
>> environment. Why can't I update the watermarks for all 8 subtasks? What
>> should I do to make this work with a parallelism larger than 1? Thanks.
>>
>> First round: (Note that the first column of each log row is the time-lag
>> strategy; it is updated correctly for all 8 parallel subtasks, but the
>> other two strategies I mentioned above are not.)
>>
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266199,
>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>> 1605047187881 (only one of the 8 parallel subtasks for bounded
>> out-of-orderness is getting my new watermark)
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266199,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>
>> Second round: (I set the auto-watermark interval to 5 seconds)
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047271200,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>
>>
>> Best regards,
>>
>> Fuyao
>>
>> On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Fuyao,
>>> For your first question about the different behavior depending on
>>> whether you chain the methods or not: keep in mind that you have to save
>>> the return value of the assignTimestampsAndWatermarks method call if you
>>> don't chain the methods together, as is also shown in [1].
>>> At least the following example from your first message indicates this:
>>> ```
>>> // The return value is discarded here (this is also a deprecated method):
>>> retractStream.assignTimestampsAndWatermarks(
>>>     new BoRetractStreamTimestampAssigner());
>>> // instead of:
>>> // retractStream = retractStream.assignTimestampsAndWatermarks(
>>> //     new BoRetractStreamTimestampAssigner());
>>> retractStream
>>>     .keyBy(<key selector>)
>>>     .process(new TableOutputProcessFunction())
>>>     .name("ProcessTableOutput")
>>>     .uid("ProcessTableOutput")
>>>     .addSink(businessObjectSink)
>>>     .name("businessObjectSink")
>>>     .uid("businessObjectSink")
>>>     .setParallelism(1);
>>> ```
>>>
>>> For your second question about setting the EventTime, I'm going to pull
>>> in Timo from the SDK team, as I don't see an issue with your code right
>>> away.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>>>
>>> On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>>>
>>>> Hi Flink Users and Community,
>>>>
>>>> For the first part of the question, the 12-hour time difference was
>>>> caused by a time extraction bug of my own. I can get the time translated
>>>> correctly now. The type cast problem does have some workarounds.
>>>>
>>>> My major blocker right now is that onTimer is not triggered properly.
>>>> I guess it is caused by failing to configure the watermarks and
>>>> timestamp assigners correctly. Please give me some insights.
>>>>
>>>> 1. If I don't chain the assignTimestampsAndWatermarks() method
>>>> together with the keyBy() and process() methods, context.timestamp()
>>>> in my processElement() function is null. Is this expected behavior?
>>>> The Flink examples didn't chain them together. (See the example here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>>>> )
>>>> 2. If I use registerEventTimeTimer() in processElement(), the onTimer
>>>> method is not triggered. However, I can trigger the onTimer method if
>>>> I simply change it to registerProcessingTimeTimer(). I am using the
>>>> settings below in the stream env.
>>>>
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>>>
>>>> My code for the process method chain:
>>>> retractStream
>>>>     .assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy
>>>>             .<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>>>>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>>>                 Row rowData = booleanRowTuple2.f1;
>>>>                 LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>>>>                 LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>>>>
>>>>                 LocalDateTime latestDBUpdateTime = null;
>>>>                 if (headerTime != null && linesTime != null) {
>>>>                     latestDBUpdateTime =
>>>>                         headerTime.isAfter(linesTime) ? headerTime : linesTime;
>>>>                 } else {
>>>>                     latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
>>>>                 }
>>>>                 if (latestDBUpdateTime != null) {
>>>>                     return latestDBUpdateTime
>>>>                         .atZone(ZoneId.of("America/Los_Angeles"))
>>>>                         .toInstant()
>>>>                         .toEpochMilli();
>>>>                 }
>>>>                 // In the worst case, we use system time instead, which
>>>>                 // should never be reached.
>>>>                 return System.currentTimeMillis();
>>>>             }))
>>>>     // Second way to create the watermark; doesn't work:
>>>>     // .assignTimestampsAndWatermarks(new MyWaterStrategy())
>>>>     .keyBy(value -> {
>>>>         // The header invoice_id field could be null
>>>>         String invoice_id_key = (String) value.f1.getField(0);
>>>>         if (invoice_id_key == null) {
>>>>             invoice_id_key = (String) value.f1.getField(4);
>>>>         }
>>>>         return invoice_id_key;
>>>>     })
>>>>     .process(new TableOutputProcessFunction())
>>>>     .name("ProcessTableOutput")
>>>>     .uid("ProcessTableOutput")
>>>>     .addSink(businessObjectSink)
>>>>     .name("businessObjectSink")
>>>>     .uid("businessObjectSink")
>>>>     .setParallelism(1);
>>>>
>>>> Best regards,
>>>> Fuyao
>>>>
>>>> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>>>>
>>>>> Hi Flink Community,
>>>>>
>>>>> I am doing some research work on the Flink DataStream and Table APIs and
>>>>> I have run into two major problems. I am using Flink 1.11.2, Scala 2.11,
>>>>> and Java 8. My use case looks like this: I plan to write a data
>>>>> processing pipeline with two stages. My goal is to construct a business
>>>>> object containing information from several Kafka streams with a primary
>>>>> key, and to emit the complete business object if that primary key doesn't
>>>>> appear in the pipeline for 10 seconds.
>>>>>
>>>>> In the first stage, I consume three Kafka streams and transform them
>>>>> into Flink DataStreams using a deserialization schema that performs some
>>>>> type and date format transformation. Then I register these data streams
>>>>> as Tables and do a full outer join one by one using the Table API. I
>>>>> also add a query configuration for this to avoid excessive state. The
>>>>> primary key is also the join key.
>>>>>
>>>>> In the second stage, I transform the joined table into a retract
>>>>> stream and feed it into a KeyedProcessFunction to generate the business
>>>>> object if the business object's primary key is inactive for 10 seconds.
>>>>>
>>>>> Is this way of handling the data the suggested approach? (I understand
>>>>> I can directly consume Kafka data in the Table API. I haven't tried that
>>>>> yet; maybe that's better?) Any suggestions are welcome. While
>>>>> implementing this, I ran into two major problems, with several smaller
>>>>> questions under each problem.
>>>>>
>>>>>
>>>>> 1. Some type cast behavior of retract streams that I can't explain.
>>>>>
>>>>> (1) In the initial stage, I registered some fields as *java.sql.Date*
>>>>> or *java.sql.Timestamp* following the examples at (
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction).
>>>>> After the join and the conversion to a retract stream, they become
>>>>> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>>>>
>>>>> For example, when first ingesting the Kafka streams, I registered an
>>>>> attribute as a java.sql.Timestamp type.
>>>>>
>>>>>  @JsonAlias("ATTRIBUTE1")
>>>>>  private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
>>>>> java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>>>>
>>>>> When I tried to cast the field back to that type after the retract
>>>>> stream, the code gave me the error below.
>>>>>
>>>>>  java.lang.ClassCastException: java.time.LocalDateTime cannot be cast
>>>>> to java.sql.Timestamp
>>>>>
>>>>> Maybe I should use toAppendStream instead, since an append stream could
>>>>> register type information, but toRetractStream can't do that? (
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>>>> )
>>>>>
>>>>> My workaround is to cast it to LocalDateTime first and extract the
>>>>> epoch time, but this doesn't seem to be a final solution.
>>>>>
>>>>> (2) During timestamp conversion, the conversion to a retract stream
>>>>> seems to lose the AM/PM information, causing a 12-hour difference if
>>>>> the time is PM.
>>>>>
>>>>> I use Joda-Time to do some timestamp conversion in the first
>>>>> deserialization stage. My pattern looks like this ("a" means AM/PM
>>>>> information):
>>>>>
>>>>>  DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy
>>>>> HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>>>>>
>>>>> After the conversion to a retract stream, the AM/PM information is not
>>>>> preserved.
>>>>>
>>>>>
>>>>> 2. My onTimer method in the KeyedProcessFunction is not triggered when
>>>>> I schedule an event-time timer.
>>>>>
>>>>> I am using event time in my code. I am new to configuring watermarks and
>>>>> I might be missing something needed to configure them correctly. I also
>>>>> tried to register a processing-time timer; with that, the code entered
>>>>> onTimer and produced some results.
>>>>>
>>>>> I am trying to follow the example here:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>>>>
>>>>> My onTimer method looks like this, and the scheduled timer never
>>>>> fires.
>>>>>
>>>>> In processElement():
>>>>>
>>>>> context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
>>>>> + 10000);
>>>>>
>>>>> My onTimer function:
>>>>>
>>>>>     @Override
>>>>>     public void onTimer(long timestamp, OnTimerContext ctx,
>>>>>             Collector<BusinessObject> collector) throws Exception {
>>>>>         TestBusinessObjectState result = testBusinessObjectState.value();
>>>>>         log.info("Inside onTimer Method, current key: {}, timestamp: {}, "
>>>>>                 + "last modified time: {}",
>>>>>                 ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());
>>>>>
>>>>>         // check if this is an outdated timer or the latest timer
>>>>>         if (timestamp >= result.getLastModifiedTime() + 10000) {
>>>>>             // emit the state on timeout
>>>>>             log.info("Collecting a business object, {}",
>>>>>                     result.getBusinessObject().toString());
>>>>>             collector.collect(result.getBusinessObject());
>>>>>
>>>>>             cleanUp(ctx);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private void cleanUp(Context ctx) throws Exception {
>>>>>         // The timer was registered at lastModifiedTime + 10000 in
>>>>>         // processElement(), so delete it at the same timestamp.
>>>>>         long timer = testBusinessObjectState.value().getLastModifiedTime() + 10000;
>>>>>         ctx.timerService().deleteEventTimeTimer(timer);
>>>>>         testBusinessObjectState.clear();
>>>>>     }
>>>>>
>>>>>
>>>>> (1) When I assign the timestamps and watermarks outside the process()
>>>>> method chain, "context.timestamp()" is null. If I put it inside
>>>>> the chain, it is not null. Is this the expected behavior? In the null
>>>>> case, the strange thing is that I can collect the business object
>>>>> immediately, without the intended 10-second waiting time... This
>>>>> shouldn't happen, right? The processing-time timer also seems to work:
>>>>> the code can enter the onTimer method.
>>>>>
>>>>>  // (This is a deprecated method)
>>>>>  retractStream.assignTimestampsAndWatermarks(
>>>>>      new BoRetractStreamTimestampAssigner());
>>>>>
>>>>>  retractStream
>>>>>     .keyBy(<key selector>)
>>>>>     .process(new TableOutputProcessFunction())
>>>>>     .name("ProcessTableOutput")
>>>>>     .uid("ProcessTableOutput")
>>>>>     .addSink(businessObjectSink)
>>>>>     .name("businessObjectSink")
>>>>>     .uid("businessObjectSink")
>>>>>     .setParallelism(1);
>>>>>
>>>>> (2) For the watermark configuration, I use a field in the retract
>>>>> stream as the event time. This time is usually 15-20 seconds behind the
>>>>> current time.
>>>>>
>>>>> In my environment, I have configured the streaming env based
>>>>> on the information here (
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
>>>>> My events don't arrive continuously, so I think I need to set the auto
>>>>> watermark interval to make the event-time timers fire correctly. I have
>>>>> added the code below.
>>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>>>>
>>>>> 1> Which kind of watermark strategy should I use? The general
>>>>> bounded-out-of-orderness strategy or a custom watermark generator?
>>>>>
>>>>> I tried to write a watermark generator, but I just don't know how to
>>>>> apply it to the stream correctly. The documentation doesn't explain this
>>>>> very clearly. My code looks like the following and it doesn't work.
>>>>>
>>>>> Assignment part:
>>>>>
>>>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean,
>>>>> Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>>>>>
>>>>> Watermark generator:
>>>>>
>>>>> I just assign the event time attribute following the example in the
>>>>> doc. (
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>>>>> )
>>>>>
>>>>> 2> I also tried to use the static method in WatermarkStrategy. The
>>>>> syntax is correct, but I hit the same problem as in 2.(1).
>>>>>
>>>>>     .assignTimestampsAndWatermarks(
>>>>>         WatermarkStrategy
>>>>>             .<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>>>>>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>>>>                 <Select an event time attribute in the booleanRowTuple2>
>>>>>             }))
>>>>>
>>>>>
>>>>> (3) For the retract DataStream, do I need to explicitly attach it to
>>>>> the stream environment? I think it is done by default, right? I just
>>>>> want to confirm it. I do have the env.execute() at the end of the code.
>>>>>
>>>>> I understand this is a lot of questions. Thanks a lot for
>>>>> your patience in reading through my email! If anything is unclear,
>>>>> please reach out to me. Thanks!
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Fuyao Li
>>>>>
>>>>
