Hi Fuyao,

for your first question about the different behavior depending on whether you chain the methods or not: keep in mind that assignTimestampsAndWatermarks() does not modify the stream it is called on. It returns a new stream. If you don't chain the methods together, you have to save that return value and keep working with it, as is also shown in [1]. At least the following example from your first message suggests that the return value was discarded:

```
// The returned stream is discarded; retractStream itself is unchanged, so keyBy()/process()
// below never see the assigned timestamps and watermarks. (This overload is also deprecated.)
retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());

// instead of:
retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());

retractStream
    .keyBy(<key selector>)
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
```
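Independently of where the assigner sits in the pipeline, the timestamp logic from the quoted messages can be unit-tested outside Flink. The sketch below is a plain-Java helper; the class `EventTimestamps` and its methods are hypothetical, and it uses `java.time` instead of the Joda-Time formatter from the original message. `latestUpdateMillis()` reproduces the rule of the quoted timestamp-assigner lambda (the later of the header and lines update times, interpreted in America/Los_Angeles), with the `System.currentTimeMillis()` fallback turned into a parameter so the result is deterministic. `parseDbTimestamp()` illustrates one thing worth checking for the AM/PM question: in both Joda-Time and `java.time` patterns, `HH` is the 0-23 hour of day, while `hh` is the 1-12 clock hour that the `a` (AM/PM) marker refines. Mixing `HH` with `a` is a common cause of a 12-hour shift; whether it was the "time extraction bug" mentioned later in the thread is an assumption.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Locale;

// Hypothetical helper, not part of Flink: the two steps that turn the raw
// date fields of a record into an event-time timestamp in epoch milliseconds.
class EventTimestamps {

    // "hh" is the 1-12 clock hour that the "a" (AM/PM) marker refines;
    // "HH" would be the 0-23 hour of day.
    static final DateTimeFormatter DB_FORMAT = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()        // accept "NOV" as well as "Nov"
            .appendPattern("dd-MMM-yy hh.mm.ss.SSSSSS a")
            .toFormatter(Locale.ENGLISH);  // English month names and AM/PM text

    // Zone the database times are interpreted in (taken from the quoted assigner).
    static final ZoneId SOURCE_ZONE = ZoneId.of("America/Los_Angeles");

    // Parses e.g. "04-NOV-20 10.15.30.123456 PM" into 2020-11-04T22:15:30.123456.
    static LocalDateTime parseDbTimestamp(String text) {
        return LocalDateTime.parse(text, DB_FORMAT);
    }

    // Same rule as the quoted timestamp assigner: take the later of the two
    // update times (ignoring nulls); return fallbackMillis if both are null.
    static long latestUpdateMillis(LocalDateTime headerTime, LocalDateTime linesTime,
                                   long fallbackMillis) {
        LocalDateTime latest;
        if (headerTime != null && linesTime != null) {
            latest = headerTime.isAfter(linesTime) ? headerTime : linesTime;
        } else {
            latest = (headerTime != null) ? headerTime : linesTime;
        }
        if (latest == null) {
            return fallbackMillis;
        }
        return latest.atZone(SOURCE_ZONE).toInstant().toEpochMilli();
    }
}
```

Inside the job, the lambda passed to `withTimestampAssigner(...)` could then reduce to a call such as `EventTimestamps.latestUpdateMillis((LocalDateTime) row.getField(3), (LocalDateTime) row.getField(7), System.currentTimeMillis())`, keeping the field positions from the quoted code.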
For your second question about setting the EventTime, I'm going to pull in Timo from the SDK team, as I don't see an issue with your code right away.

Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies

On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
> Hi Flink Users and Community,
>
> For the first part of the question, the 12-hour time difference was caused by a time extraction bug on my side. I can get the time translated correctly now. The type cast problem does have some workarounds.
>
> My major blocker right now is that the onTimer part is not triggered properly. I guess this is caused by failing to configure the correct watermarks and timestamp assigners. Please give me some insights.
>
> 1. If I don't chain the assignTimestampsAndWatermarks() method together with the keyBy() and process() methods, context.timestamp() in my processElement() function is null. Is this expected behavior? The Flink examples don't chain it together (see the example here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
> ).
> 2. If I use registerEventTimeTimer() in processElement(), the onTimer method is not triggered. However, I can trigger the onTimer method if I simply change it to registerProcessingTimeTimer(). I am using the settings below in the stream env.
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.getConfig().setAutoWatermarkInterval(1000L);
>
> My code for the process chain:
>
>     retractStream
>         .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>                 Row rowData = booleanRowTuple2.f1;
>                 LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>                 LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>
>                 LocalDateTime latestDBUpdateTime = null;
>                 if (headerTime != null && linesTime != null) {
>                     latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
>                 } else {
>                     latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
>                 }
>                 if (latestDBUpdateTime != null) {
>                     return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>                 }
>                 // In the worst case, we use system time instead, which should never be reached.
>                 return System.currentTimeMillis();
>             }))
>         // .assignTimestampsAndWatermarks(new MyWaterStrategy()) // second way to create watermark, doesn't work
>         .keyBy(value -> {
>             // There could be null fields for header invoice_id field
>             String invoice_id_key = (String) value.f1.getField(0);
>             if (invoice_id_key == null) {
>                 invoice_id_key = (String) value.f1.getField(4);
>             }
>             return invoice_id_key;
>         })
>         .process(new TableOutputProcessFunction())
>         .name("ProcessTableOutput")
>         .uid("ProcessTableOutput")
>         .addSink(businessObjectSink)
>         .name("businessObjectSink")
>         .uid("businessObjectSink")
>         .setParallelism(1);
>
> Best regards,
> Fuyao
>
> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>> Hi Flink Community,
>>
>> I am doing some research work on the Flink DataStream and Table APIs and I have run into two major problems. I am using Flink 1.11.2, Scala 2.11, and Java 8. My use case looks like this.
>> I plan to write a data processing pipeline with two stages. My goal is to construct a business object containing information from several Kafka streams with a primary key, and to emit the complete business object if that primary key doesn't appear in the pipeline for 10 seconds.
>>
>> In the first stage, I consume three Kafka streams and transform them into Flink DataStreams using a deserialization schema that includes some type and date format transformations. Then I register these data streams as Tables and do a full outer join, one by one, using the Table API. I also add a query configuration for this to avoid excessive state. The primary key is also the join key.
>>
>> In the second stage, I transform the joined table into a retract stream and put it into a KeyedProcessFunction that generates the business object if the business object's primary key has been inactive for 10 seconds.
>>
>> Is this way of handling the data the suggested approach? (I understand I can consume Kafka data directly in the Table API. I haven't tried that yet; maybe that's better?) Any suggestions are welcome. While implementing this, I ran into two major problems, with several smaller questions under each.
>>
>>
>> 1. Some type cast behavior of retract streams that I can't explain.
>>
>> (1) In the initial stage, I registered some fields as *java.sql.Date* or *java.sql.Timestamp*, following the examples at (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction)
>> . After the join and the conversion to a retract stream, they become *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>
>> For example, when first ingesting the Kafka streams, I registered an attribute of type java.sql.Timestamp.
>>
>>     @JsonAlias("ATTRIBUTE1")
>>     private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo = java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>
>> When I try to cast the value back to that type after the conversion to a retract stream, the code gives me the error below.
>>
>>     java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
>>
>> Maybe I should use toAppendStream instead, since an append stream can register type information but toRetractStream can't? (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> )
>>
>> My workaround is to cast it to LocalDateTime first and extract the epoch time, but this doesn't seem like a final solution.
>>
>> (2) During timestamp conversion, the conversion to a retract stream seems to lose the AM/PM information, causing a 12-hour difference if the time is PM.
>>
>> I use Joda-Time to do some timestamp conversion in the first deserialization stage; my pattern looks like this ("a" means AM/PM information):
>>
>>     DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>>
>> After the conversion to a retract stream, the AM/PM information is not preserved.
>>
>>
>> 2. My onTimer method in the KeyedProcessFunction is not triggered when I schedule an event-time timer.
>>
>> I am using event time in my code. I am new to configuring watermarks and might be missing something. I also tried to register a processing-time timer; with that, the code enters onTimer and produces some results.
>>
>> I am trying to follow the example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>
>> My onTimer method looks like this, and the scheduled event doesn't happen:
>>
>> In processElement():
>>
>>     context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
>>
>> My onTimer function:
>>
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<BusinessObject> collector) throws Exception {
>>         TestBusinessObjectState result = testBusinessObjectState.value();
>>         log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}",
>>                 ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());
>>
>>         // check if this is an outdated timer or the latest timer
>>         if (timestamp >= result.getLastModifiedTime() + 10000) {
>>             // emit the state on timeout
>>             log.info("Collecting a business object, {}", result.getBusinessObject().toString());
>>             collector.collect(result.getBusinessObject());
>>
>>             cleanUp(ctx);
>>         }
>>     }
>>
>>     private void cleanUp(Context ctx) throws Exception {
>>         Long timer = testBusinessObjectState.value().getLastModifiedTime();
>>         ctx.timerService().deleteEventTimeTimer(timer);
>>         testBusinessObjectState.clear();
>>     }
>>
>>
>> (1) When I assign the timestamps and watermarks outside the process() method chain, context.timestamp() is null. If I put the call inside the chain, it isn't null. Is this the expected behavior? In the null case, the strange thing is that I can collect the business object immediately, without the designed 10-second waiting time... This shouldn't happen, right? The processing-time timer also seems to work; the code enters the onTimer method.
>>
>>     retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); // (This is a deprecated method)
>>
>>     retractStream
>>         .keyBy(<key selector>)
>>         .process(new TableOutputProcessFunction())
>>         .name("ProcessTableOutput")
>>         .uid("ProcessTableOutput")
>>         .addSink(businessObjectSink)
>>         .name("businessObjectSink")
>>         .uid("businessObjectSink")
>>         .setParallelism(1);
>>
>> (2) Regarding the watermark configuration:
>> I use a field in the retract stream as the event time. This time is usually 15-20 seconds behind the current time.
>>
>> In my environment, I have made some settings for the streaming env based on the information here (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
>> My events don't arrive continuously, so I think I need to set the auto watermark interval to make the event-time onTimer work correctly. I have added the code below.
>>
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     env.getConfig().setAutoWatermarkInterval(1000L);
>>
>> 1> Which kind of watermark strategy should I use: the general bounded-out-of-orderness strategy or a custom WatermarkGenerator?
>>
>> I tried to write a WatermarkGenerator, but I just don't know how to apply it to the stream correctly. The documentation doesn't explain it very clearly. My code looks like this, and it doesn't work.
>>
>> Assign part:
>>
>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>>
>> Watermark generator:
>>
>> I just assign the event time attribute following the example in the doc (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>> ).
>>
>> 2> I also tried to use the static method in WatermarkStrategy. The syntax is correct, but I run into the same problem as in 2.(1).
>>
>>     .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>                 <Select an event time attribute in the booleanRowTuple2>
>>             }))
>>
>>
>> (3) For the retract stream, do I need to explicitly attach it to the stream environment? I think it is done by default, right? Just want to confirm it.
>> I do have env.execute() at the end of the code.
>>
>> I understand this is a lot of questions; thanks a lot for your patience in looking through my email! If anything is unclear, please reach out to me. Thanks!
>>
>>
>> Best regards,
>>
>> Fuyao Li
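On the event-time timer question (2) in the quoted messages: an event-time timer does not fire after some amount of wall-clock time has passed; it fires once the operator's watermark reaches the timer's timestamp. The sketch below is a deliberately simplified, single-channel model in plain Java (the class `EventTimeTimerModel` and its methods are hypothetical, not Flink API). It assumes the behavior of Flink 1.11's bounded-out-of-orderness strategy: every auto-watermark interval it emits `max timestamp seen - out-of-orderness - 1`, and a timer fires when its timestamp is `<=` the watermark. In a real job an operator's watermark is additionally the minimum over all of its input channels, so a single idle upstream partition can hold it back; the model ignores that. It is one possible explanation for the symptom, not a diagnosis confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified, single-channel model (hypothetical class, not Flink API) of a
// periodic bounded-out-of-orderness watermark driving event-time timers.
class EventTimeTimerModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;                      // highest event timestamp seen so far
    private long currentWatermark = Long.MIN_VALUE; // the operator's event-time clock
    private final PriorityQueue<Long> pendingTimers = new PriorityQueue<>();
    private final List<Long> firedTimers = new ArrayList<>();

    EventTimeTimerModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start value chosen so the first emitted watermark is Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called per element: only remembers the largest event timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Like registerEventTimeTimer(): schedules the timer but never fires it by itself.
    void registerEventTimeTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    // Called every auto-watermark interval. Wall-clock time plays no role:
    // without newer events, maxTimestamp and therefore the watermark stay put.
    void onPeriodicEmit() {
        long candidate = maxTimestamp - outOfOrdernessMillis - 1;
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
        }
        // Fire every timer whose timestamp the watermark has reached.
        while (!pendingTimers.isEmpty() && pendingTimers.peek() <= currentWatermark) {
            firedTimers.add(pendingTimers.poll());
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }

    List<Long> firedTimers() {
        return firedTimers;
    }
}
```

For example, with `new EventTimeTimerModel(20_000)`, one element at t = 1,000,000 and a timer registered at 1,010,000 (the `lastModifiedTime + 10000` pattern from the quoted processElement()), the watermark after `onPeriodicEmit()` is 979,999 and stays there no matter how often `onPeriodicEmit()` runs. The timer only fires after an element with a timestamp of at least 1,030,001 has been seen, that is, 10 s plus the 20 s out-of-orderness bound plus 1 ms later than the last modification.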