Hi Fuyao,
regarding your first question, about the different behavior depending on
whether you chain the methods: keep in mind that you have to save the return
value of the assignTimestampsAndWatermarks method call if you don't chain
the methods together, as shown in [1]. The call returns a new stream and
leaves retractStream itself unchanged.
At least the following example from your first message suggests that this is
what happened:
```
// The return value is discarded here, so the chain below still runs on the
// original stream. (This overload of assignTimestampsAndWatermarks is deprecated.)
retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
// instead of:
// retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
retractStream
    .keyBy(<key selector>)
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
```
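To illustrate why dropping the return value matters, here is a toy model in
plain Java. ToyStream and its methods are made up for this sketch and are not
Flink API; the only thing carried over from Flink is the assumption that a
transformation returns a new stream object instead of modifying the one it is
called on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a stream: every transformation returns a NEW object. */
final class ToyStream {
    private final List<String> operators;

    ToyStream() {
        this(Collections.<String>emptyList());
    }

    private ToyStream(List<String> operators) {
        this.operators = Collections.unmodifiableList(new ArrayList<>(operators));
    }

    /** Mimics assignTimestampsAndWatermarks(...): it does not modify this object. */
    ToyStream assignTimestamps() {
        List<String> next = new ArrayList<>(operators);
        next.add("timestamps");
        return new ToyStream(next);
    }

    boolean hasTimestamps() {
        return operators.contains("timestamps");
    }

    public static void main(String[] args) {
        ToyStream retractStream = new ToyStream();

        // Return value dropped: the variable still points at the original stream.
        retractStream.assignTimestamps();
        System.out.println(retractStream.hasTimestamps());

        // Return value kept: later calls now see the assigned timestamps.
        retractStream = retractStream.assignTimestamps();
        System.out.println(retractStream.hasTimestamps());
    }
}
```

Chaining the calls works for the same reason: keyBy() is then invoked on the
object returned by the assigner call, not on the original stream.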
For your second question, about setting the event time, I'm going to pull in
Timo from the SDK team, as I don't see an issue with your code right away.
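As background while waiting for Timo, below is a toy model in plain Java of
how an event-time timer relates to the watermark. It is not Flink code, and the
class and method names are made up. It assumes, to the best of my knowledge of
the built-in bounded-out-of-orderness strategy, that the watermark is the
largest timestamp seen so far minus the out-of-orderness bound minus 1 ms, and
that an event-time timer fires once the watermark reaches the timer's
timestamp. The 20 s bound and the 10 s timeout are taken from your code; the
event timestamps are invented.

```java
/** Toy model of a bounded-out-of-orderness watermark and one event-time timer. Not Flink code. */
final class WatermarkTimerSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE; // no event seen yet

    WatermarkTimerSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    /** Records an event; only the largest timestamp seen so far matters. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Assumed formula: max timestamp - bound - 1 ms. */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** Assumed rule: a timer fires once the watermark has reached its timestamp. */
    boolean timerWouldFire(long timerTimestamp) {
        return currentWatermark() >= timerTimestamp;
    }

    public static void main(String[] args) {
        WatermarkTimerSketch sketch = new WatermarkTimerSketch(20_000L);
        long lastModified = 1_000_000L;       // invented event timestamp
        long timer = lastModified + 10_000L;  // same offset as in processElement()

        sketch.onEvent(lastModified);
        System.out.println(sketch.timerWouldFire(timer)); // false, watermark is 979999

        sketch.onEvent(lastModified + 30_000L);
        System.out.println(sketch.timerWouldFire(timer)); // false, watermark is 1009999

        sketch.onEvent(lastModified + 30_001L);
        System.out.println(sketch.timerWouldFire(timer)); // true, watermark is 1010000
    }
}
```

In this model the watermark only moves when a newer event arrives, so a timer
registered for the last event of a key stays pending until some later event
pushes the watermark past it.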
Best,
Matthias
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <[email protected]> wrote:
> Hi Flink Users and Community,
>
> For the first part of the question, the 12-hour time difference was caused
> by a time extraction bug of my own. I can get the time translated correctly
> now. The type cast problem does have some workarounds.
>
> My major blocker right now is that onTimer is not triggered properly.
> I guess this is caused by a misconfigured watermark strategy or
> timestamp assigner. Any insights would be appreciated.
>
> 1. If I don't chain the assignTimestampsAndWatermarks() call together
> with the keyBy() and process() calls, context.timestamp() in my
> processElement() function is null. Is this expected behavior? The
> Flink examples don't chain them together (see the example here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
> )
> 2. If I use registerEventTimeTimer() in processElement(), the onTimer
> method is not triggered. However, I can trigger the onTimer method if
> I simply change it to registerProcessingTimeTimer(). I am using the
> settings below in the stream env.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> My code for the process method chain:
> retractStream
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>                 Row rowData = booleanRowTuple2.f1;
>                 LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>                 LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>
>                 LocalDateTime latestDBUpdateTime = null;
>                 if (headerTime != null && linesTime != null) {
>                     latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
>                 } else {
>                     latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
>                 }
>                 if (latestDBUpdateTime != null) {
>                     return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>                 }
>                 // In the worst case, we use system time instead, which should never be reached.
>                 return System.currentTimeMillis();
>             }))
>     // .assignTimestampsAndWatermarks(new MyWaterStrategy()) // second way to create watermark, doesn't work
>     .keyBy(value -> {
>         // There could be null fields for header invoice_id field
>         String invoice_id_key = (String) value.f1.getField(0);
>         if (invoice_id_key == null) {
>             invoice_id_key = (String) value.f1.getField(4);
>         }
>         return invoice_id_key;
>     })
>     .process(new TableOutputProcessFunction())
>     .name("ProcessTableOutput")
>     .uid("ProcessTableOutput")
>     .addSink(businessObjectSink)
>     .name("businessObjectSink")
>     .uid("businessObjectSink")
>     .setParallelism(1);
>
> Best regards,
> Fuyao
>
> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <[email protected]> wrote:
>
>> Hi Flink Community,
>>
>> I am doing some research work on the Flink DataStream and Table APIs and
>> have run into two major problems. I am using Flink 1.11.2, Scala 2.11, and
>> Java 8. My use case looks like this: I plan to write a data processing
>> pipeline with two stages. My goal is to construct a business object
>> containing information from several Kafka streams that share a primary key,
>> and to emit the complete business object once that primary key has not
>> appeared in the pipeline for 10 seconds.
>>
>> In the first stage, I consume three Kafka streams and transform them
>> into Flink DataStreams using a deserialization schema that performs some
>> type and date format transformations. I then register these data streams
>> as Tables and full outer join them one by one using the Table API. I also
>> add a query configuration to avoid excessive state. The primary key is also
>> the join key.
>>
>> In the second stage, I transform the joined table to a retract stream
>> and pass it to a KeyedProcessFunction, which generates the business object
>> if the business object's primary key is inactive for 10 seconds.
>>
>> Is this the suggested way of handling the data? (I understand I
>> can consume Kafka data directly in the Table API. I haven't tried that yet;
>> maybe that's better?) Any suggestions are welcome. While implementing this,
>> I ran into two major problems, with several smaller questions under each.
>>
>>
>> 1. Some type cast behavior of retracted streams I can't explain.
>>
>> (1) In the initial stage, I registered some fields as *java.sql.Date* or
>> *java.sql.Timestamp*, following the examples at (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction).
>> After the join and the conversion to a retract stream, they become
>> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>
>> For example, when first ingesting the Kafka streams, I registered an
>> attribute of type java.sql.Timestamp.
>>
>> @JsonAlias("ATTRIBUTE1")
>> private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
>> java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>
>> When I tried to cast the value back after the conversion to a retract
>> stream, the code gave me the error below.
>>
>> java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
>> java.sql.Timestamp
>>
>> Maybe I should use toAppendStream instead, since an append stream can
>> register type information but toRetractStream can't? (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> )
>>
>> My workaround is to cast it to LocalDateTime first and extract the epoch
>> time, but this doesn't seem like a final solution.
>>
>> (2) During timestamp conversion, the conversion to a retract stream seems
>> to lose the AM/PM information, causing a 12-hour difference
>> if the time is PM.
>>
>> I use Joda-Time to do some timestamp conversion in the first
>> deserialization stage. My pattern looks like this, where "a" is the AM/PM
>> marker:
>>
>> DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>>
>> After the retracted stream, the AM/PM information is not preserved.
>>
>>
>> 2. My onTimer method in KeyedProcessFunction is not triggered when I
>> schedule an event-time timer.
>>
>> I am using event time in my code. I am new to configuring watermarks and
>> might be missing something needed to configure them correctly. I also tried
>> registering a processing-time timer; with that, the code does enter onTimer
>> and produces some results.
>>
>> I am trying to follow the example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>
>> My onTimer method looks like this, and the scheduled timer never fires.
>>
>> In processElement():
>>
>> context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
>>
>> My onTimer function
>>
>> @Override
>> public void onTimer(long timestamp, OnTimerContext ctx,
>>         Collector<BusinessObject> collector) throws Exception {
>>     TestBusinessObjectState result = testBusinessObjectState.value();
>>     log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}",
>>             ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());
>>
>>     // check if this is an outdated timer or the latest timer
>>     if (timestamp >= result.getLastModifiedTime() + 10000) {
>>         // emit the state on timeout
>>         log.info("Collecting a business object, {}", result.getBusinessObject().toString());
>>         collector.collect(result.getBusinessObject());
>>
>>         cleanUp(ctx);
>>     }
>> }
>>
>> private void cleanUp(Context ctx) throws Exception {
>>     Long timer = testBusinessObjectState.value().getLastModifiedTime();
>>     ctx.timerService().deleteEventTimeTimer(timer);
>>     testBusinessObjectState.clear();
>> }
>>
>>
>> (1) When I assign the timestamps and watermarks outside the process()
>> method chain, context.timestamp() is null. If I put it inside
>> the chain, it isn't null. Is this the expected behavior? In the null
>> case, the strange thing is that I can collect the business
>> object immediately, without the intended 10-second wait. This
>> shouldn't happen, right? The processing-time timer also seems to work: the
>> code enters the onTimer method.
>>
>> retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); // (This is a deprecated method)
>>
>> retractStream
>> .keyBy(<key selector>)
>> .process(new TableOutputProcessFunction())
>> .name("ProcessTableOutput")
>> .uid("ProcessTableOutput")
>> .addSink(businessObjectSink)
>> .name("businessObjectSink")
>> .uid("businessObjectSink")
>> .setParallelism(1);
>>
>> (2) For the watermark configuration, I use a field in the retract stream
>> as the event time. This time is usually 15-20 seconds behind the current time.
>>
>> In my environment, I have configured the streaming env based on the
>> information here (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
>> My events don't arrive continuously, so I think I need to set the auto
>> watermark interval for the event-time timers to work correctly. I have
>> added the code below.
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000L);
>>
>> 1> Which kind of watermark strategy should I use: the built-in
>> bounded-out-of-orderness strategy or a custom watermark generator?
>>
>> I tried to write a watermark generator, but I don't know how to apply it
>> to the stream correctly. The documentation doesn't explain this very
>> clearly. My code looks like the following, and it doesn't work.
>>
>> assign part:
>>
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean,
>> Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>>
>> watermark generator:
>>
>> I just assign the event time attribute following the example in the doc. (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>> )
>>
>> 2> I also tried to use the static method in WatermarkStrategy. The syntax
>> is correct, but I run into the same problem as in 2.(1).
>>
>> .assignTimestampsAndWatermarks(
>>     WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>>         .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>             <Select an event time attribute in the booleanRowTuple2>
>>         }))
>>
>>
>> (3) For the retract DataStream, do I need to explicitly attach it to
>> the stream environment? I think that is done by default, right? I just want
>> to confirm. I do have env.execute() at the end of the code.
>>
>> I understand this is a lot of questions. Thanks a lot for your patience
>> in reading through my email! If anything is unclear, please reach out to
>> me. Thanks!
>>
>>
>> Best regards,
>>
>> Fuyao Li
>>
>