Hi Fuyao,
for your first question about the different behavior depending on whether
you chain the methods or not: Keep in mind that you have to save the return
value of the assignTimestampsAndWatermarks method call if you don't chain
the methods together as it is also shown in [1].
At least the following example from your first message is indicating it:
```
retractStream.assignTimestampsAndWatermarks(new
BoRetractStreamTimestampAssigner()); (This is a deprecated method)
// instead of: retractStream =
retractStream.assignTimestampsAndWatermarks(new
BoRetractStreamTimestampAssigner());
retractStream
    .keyBy(<key selector>)
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
```

For your second question about setting the EventTime I'm going to pull in
Timo from the SDK team as I don't see an issue with your code right away.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies

On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:

> Hi Flink Users and Community,
>
> For the first part of the question, the 12 hour time difference is caused
> by a time extraction bug myself. I can get the time translated correctly
> now. The type cast problem does have some workarounds to solve it..
>
> My major blocker right now is the onTimer part is not properly triggered.
> I guess it is caused by failing to configure the correct watermarks &
> timestamp assigners. Please give me some insights.
>
> 1. If I don't chain the assignTimestampsAndWatermarks() method in together
> with keyedBy().. and process().. method. The context.timestamp() in my
> processElement() function will be null. Is this some expected behavior? The
> Flink examples didn't chain it together. (see example here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
> )
> 2. If I use registerEventTimeTimer() in processElement(). The onTimer
> method will not be triggered. However, I can trigger the onTimer method if
> I simply change it to registerProcessingTimeTimer(). I am using the
> settings below in the stream env.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> My code for method the process chain:
> retractStream
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
> Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>                         .withTimestampAssigner((booleanRowTuple2,
> timestamp) -> {
>                             Row rowData = booleanRowTuple2.f1;
>                             LocalDateTime headerTime =
> (LocalDateTime)rowData.getField(3);
>                             LocalDateTime linesTime =
> (LocalDateTime)rowData.getField(7);
>
>                             LocalDateTime latestDBUpdateTime = null;
>                             if (headerTime != null && linesTime != null) {
>                                 latestDBUpdateTime =
> headerTime.isAfter(linesTime) ? headerTime : linesTime;
>                             }
>                             else {
>                                 latestDBUpdateTime = (headerTime != null)
> ? headerTime : linesTime;
>                             }
>                             if (latestDBUpdateTime != null) {
>                                 return
> latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>                             }
>                             // In the worst case, we use system time
> instead, which should never be reached.
>                             return System.currentTimeMillis();
>                         }))
> //              .assignTimestampsAndWatermarks(new MyWaterStrategy())  //
> second way to create watermark, doesn't work
>                 .keyBy(value -> {
>                     // There could be null fields for header invoice_id
> field
>                     String invoice_id_key = (String)value.f1.getField(0);
>                     if (invoice_id_key == null) {
>                         invoice_id_key = (String)value.f1.getField(4);
>                     }
>                     return invoice_id_key;
>                 })
>                 .process(new TableOutputProcessFunction())
>                 .name("ProcessTableOutput")
>                 .uid("ProcessTableOutput")
>                 .addSink(businessObjectSink)
>                 .name("businessObjectSink")
>                 .uid("businessObjectSink")
>                 .setParallelism(1);
>
> Best regards,
> Fuyao
>
> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> I am doing some research work on Flink Datastream and Table API and I
>> meet two major problems. I am using Flink 1.11.2, scala version 2.11, java
>> 8. My use case looks like this. I plan to write a data processing pipeline
>> with two stages. My goal is to construct a business object containing
>> information from several Kafka streams with a primary key and emit the
>> complete business object if such primary key doesn't  appear in the
>> pipeline for 10 seconds.
>>
>> In the first stage, I first consume three Kafka streams and transform it
>> to Flink Datastream using a deserialization schema containing some type and
>> date format transformation, and then I register these data streams as Table
>> and do a full outer join one by one using Table API. I also add query
>> configuration for this to avoid excessive state. The primary key is also
>> the join key.
>>
>> In the second stage, I transform the joined table to a retracted stream
>> and put it into KeyedProcessFunction to generate the business object if the
>> business object's primary key is inactive for 10 second.
>>
>> Is this way of handling the data the suggested approach? (I understand I
>> can directly consume kafka data in Table API. I haven't tried that yet,
>> maybe that's better?) Any suggestion is welcomed. During implementing this,
>> I meet two major problems and several smaller questions under each problem.
>>
>>
>> 1. Some type cast behavior of retracted streams I can't explain.
>>
>> (1) In the initial stage, I registered some field as *java.sql.Date* or
>> *java.sql.timestamp* following the examples at (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction)
>> . After join and transform to retracted stream, it becomes
>> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>
>> For example, when first ingesting the Kafka streams, I registerd a
>> attribute in java.sql.Timestamp type.
>>
>>  @JsonAlias("ATTRIBUTE1")
>>  private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
>> java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>
>> When I tried to cast the type information back after the retracted
>> stream, the code gives me error information below.
>>
>>  java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
>> java.sql.Timestamp
>>
>> Maybe I should use toAppendStream instead since append stream could
>> register type information, but toRetractedStream can't do that? (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> )
>>
>> My work around is to cast it to LocalDateTime first and extract the epoch
>> time, this doesn't seem to be a final solution.
>>
>> (2) During timestamp conversion, the Flink to retracted stream seems to
>> lost the AM/PM information in the stream and causing a 12 hour difference
>> if it is PM.
>>
>> I use joda time to do some timestamp conversion in the first
>> deserialization stage, my pattern looks like this. "a" means AM/PM
>> information
>>
>>  DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy
>> HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>>
>> After the retracted stream, the AM/PM information is not preserved.
>>
>>
>> 2. My onTimer method in KeyedProcessFunction can not be triggered when I
>> scheduled a event timer timer.
>>
>> I am using event time in my code. I am new to configure watermarks and I
>> might miss something to configure it correctly. I also tried to register a
>> processing time, it could enter and produce some results.
>>
>> I am trying to follow the example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>
>> My onTimer method looks like this and the scheduled event doesn't happen..
>>
>> In processElement():
>>
>> context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
>> + 10000);
>>
>> My onTimer function
>>
>>   @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx,
>> Collector<BusinessObject> collector) throws Exception {
>>         TestBusinessObjectState result = testBusinessObjectState.value();
>>         log.info("Inside onTimer Method, current key: {}, timestamp: {},
>> last modified time: {}", ctx.getCurrentKey(), timestamp,
>> result.getLastModifiedTime());
>>
>>         // check if this is an outdated timer or the latest timer
>>         if (timestamp >= result.getLastModifiedTime() + 10000) {
>>             // emit the state on timeout
>>             log.info("Collecting a business object, {}",
>> result.getBusinessObject().toString());
>>             collector.collect(result.getBusinessObject());
>>
>>             cleanUp(ctx);
>>         }
>>     }
>>
>>     private void cleanUp(Context ctx) throws Exception {
>>         Long timer =
>> testBusinessObjectState.value().getLastModifiedTime();
>>         ctx.timerService().deleteEventTimeTimer(timer);
>>         testBusinessObjectState.clear();
>>     }
>>
>>
>> (1) When I assign the timestamp and watermarks outside the process()
>> method chain. The "context.timestamp()" will be null. If I put it inside
>> the chain, it won't be null. Is this the expected behavior? In the null
>> case, the strange thing is that, surprisingly, I can collect the business
>> object immediately without a designed 10 second waiting time... This
>> shouldn't happen, right...? The processing timer also seems to work. The
>> code can enter the on timer method.
>>
>>  retractStream.assignTimestampsAndWatermarks(new
>> BoRetractStreamTimestampAssigner()); (This is a deprecated method)
>>
>>  retractStream
>>     .keyBy(<key selector>)
>>     .process(new TableOutputProcessFunction())
>>     .name("ProcessTableOutput")
>>     .uid("ProcessTableOutput")
>>     .addSink(businessObjectSink)
>>     .name("businessObjectSink")
>>     .uid("businessObjectSink")
>>     .setParallelism(1);
>>
>> (2) For watermarks configuration. I use an field in the retracted stream
>> as the event time. This time is usually 15-20 seconds before current time.
>>
>> In my environment, I have done some settings for streaming env based on
>> information here(
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
>> My event doesn't always come, so I think I need to set auto watermark
>> interval to let the event timer on timer works correctly. I have added the
>> code below.
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000L);
>>
>> 1> Which kind of watermark strategy should I use? General
>> BoundOutofOrderness or Watermark generator?
>>
>> I tried to write a Watermark generator and I just don't how to apply it
>> to the stream correctly. The documentation doesn't explain very clearly. My
>> code looks like below and it doesn't work.
>>
>> assign part:
>>
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean,
>> Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>>
>> watermark generater:
>>
>> I just assign the event time attribute following the example in the doc. (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>> )
>>
>> 2> I also tried to use the static method in Water Strategy. The syntax is
>> correct, but I meet the same problem in 2.(1).
>>
>>     .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
>> Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>>                         .withTimestampAssigner((booleanRowTuple2,
>> timestamp) -> {
>>                             <Select a event time attribute in the
>> booleanRowTuple2>
>>                         }))
>>
>>
>> (3) For the retracted datastream, do I need to explicitly attach it to
>> the stream environment? I think it is done by default, right? Just want to
>> confirm it. I do have the env.execute() at the end of the code.
>>
>> I understand this is a lot of questions, thanks a lot for your patience
>> to look through my email! If there is anything unclear, please reach out to
>> me. Thanks!
>>
>>
>> Best regards,
>>
>> Fuyao Li
>>
>

Reply via email to