Hi Flink Community,

I am doing some research work with the Flink DataStream and Table APIs and I have run into two major problems. I am using Flink 1.11.2, Scala 2.11, and Java 8.

My use case looks like this: I plan to write a data processing pipeline with two stages. The goal is to construct a business object that combines information from several Kafka streams sharing a primary key, and to emit the complete business object once that primary key has not appeared in the pipeline for 10 seconds.
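To make the setup concrete, here is a heavily simplified sketch of the topology I have in mind (not my exact code). The topic names, the SourceA/SourceB POJOs and their deserialization schemas, the key field, the retention times, and businessObjectSink are placeholders for my real ones, and I only show two of the three input streams:

    import java.util.Properties;

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class BusinessObjectPipeline {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // query configuration to avoid excessive join state (retention values are examples)
            tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(20));

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

            // Stage 1: consume the Kafka topics, deserialize (type / date-format conversion
            // happens in the DeserializationSchema), and register the streams as tables
            DataStream<SourceA> streamA = env.addSource(
                    new FlinkKafkaConsumer<>("topic-a", new SourceADeserializationSchema(), kafkaProps));
            DataStream<SourceB> streamB = env.addSource(
                    new FlinkKafkaConsumer<>("topic-b", new SourceBDeserializationSchema(), kafkaProps));
            tEnv.createTemporaryView("A", streamA);
            tEnv.createTemporaryView("B", streamB);

            // full outer join on the shared primary key (the third stream is joined the same way)
            Table joined = tEnv.sqlQuery(
                    "SELECT COALESCE(A.pk, B.pk) AS pk, A.attribute1, B.attribute2 "
                            + "FROM A FULL OUTER JOIN B ON A.pk = B.pk");

            // Stage 2: convert to a retract stream, key by the primary key, and emit the
            // business object after 10 s of inactivity via a KeyedProcessFunction timer
            DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(joined, Row.class);
            retractStream
                    .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                        @Override
                        public String getKey(Tuple2<Boolean, Row> value) {
                            return (String) value.f1.getField(0); // primary key column
                        }
                    })
                    .process(new TableOutputProcessFunction())
                    .addSink(businessObjectSink); // placeholder SinkFunction<BusinessObject>

            env.execute("business-object-pipeline");
        }
    }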
In the first stage, I consume three Kafka streams and turn each of them into a Flink DataStream using a deserialization schema that does some type and date-format conversion. I then register these data streams as tables and do a full outer join, one stream at a time, using the Table API. I also add a query configuration for this to avoid excessive state. The primary key is also the join key. In the second stage, I convert the joined table to a retract stream and feed it into a KeyedProcessFunction, which emits the business object once its primary key has been inactive for 10 seconds.

Is this the suggested way of handling the data? (I understand I could consume the Kafka data directly with the Table API; I haven't tried that yet, maybe that's better?) Any suggestion is welcome. While implementing this, I ran into two major problems, each with several smaller questions.

1. Some type cast behavior of retract streams that I can't explain.

(1) In the initial stage, I registered some fields as java.sql.Date or java.sql.Timestamp, following the examples at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction. After the join and the conversion to a retract stream, they come back as java.time.LocalDate and java.time.LocalDateTime instead. For example, when first ingesting the Kafka streams, I registered an attribute as java.sql.Timestamp:

    @JsonAlias("ATTRIBUTE1")
    private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo = java.sql.Timestamp.class) Timestamp ATTRIBUTE1;

When I try to cast the field back to that type after the retract stream, the code fails with:

    java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp

Maybe I should use toAppendStream instead, since an append stream can register type information while toRetractStream can't? ( https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset ) My workaround is to cast the field to LocalDateTime first and extract the epoch time (see the sketch at the end of this question); that doesn't feel like a final solution.

(2) During timestamp conversion, the conversion to a retract stream seems to lose the AM/PM information, causing a 12-hour difference when the value is PM. I use Joda-Time for some timestamp conversion in the first deserialization stage; my pattern looks like this ("a" is the AM/PM marker):

    DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());

After the retract stream, the AM/PM information is not preserved.
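For reference, the workaround I mentioned under (1) looks roughly like the helper below: I read the field as LocalDateTime and convert it to epoch millis myself. The class and method names are just for illustration, and the use of the default time zone is simply how I happen to do it, not a recommendation:

    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public final class RetractStreamTimeHelper {

        // The TIMESTAMP(6) column arrives as java.time.LocalDateTime after toRetractStream,
        // so instead of casting it to java.sql.Timestamp I convert it back by hand.
        public static Timestamp toSqlTimestamp(Object fieldFromRow) {
            LocalDateTime localDateTime = (LocalDateTime) fieldFromRow;
            long epochMillis = localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            return new Timestamp(epochMillis);
        }
    }

I call this on row.getField(<attribute index>) in the downstream function. It works, but it ties the conversion to the JVM's default time zone, which is part of why it does not feel like a final solution.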
2. My onTimer method in the KeyedProcessFunction is not triggered when I schedule an event-time timer. I am using event time in my code. I am new to configuring watermarks and I might be missing something. When I register a processing-time timer instead, onTimer is entered and produces some results. I am trying to follow the example here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example

My timer code looks like this, and the scheduled callback never happens.

In processElement():

    context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);

My onTimer method:

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<BusinessObject> collector) throws Exception {
        TestBusinessObjectState result = testBusinessObjectState.value();
        log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}",
                ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());

        // check if this is an outdated timer or the latest timer
        if (timestamp >= result.getLastModifiedTime() + 10000) {
            // emit the state on timeout
            log.info("Collecting a business object, {}", result.getBusinessObject().toString());
            collector.collect(result.getBusinessObject());

            cleanUp(ctx);
        }
    }

    private void cleanUp(Context ctx) throws Exception {
        Long timer = testBusinessObjectState.value().getLastModifiedTime();
        ctx.timerService().deleteEventTimeTimer(timer);
        testBusinessObjectState.clear();
    }

(1) When I assign timestamps and watermarks outside the process() method chain, context.timestamp() is null; if I put the assignment inside the chain, it is not null. Is this the expected behavior? The strange thing is that in the null case I can, surprisingly, collect the business object immediately, without the intended 10-second wait. That shouldn't happen, right...? The processing-time timer also seems to work there; the code does enter onTimer.

    retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); (this is a deprecated method)

    retractStream
        .keyBy(<key selector>)
        .process(new TableOutputProcessFunction())
        .name("ProcessTableOutput")
        .uid("ProcessTableOutput")
        .addSink(businessObjectSink)
        .name("businessObjectSink")
        .uid("businessObjectSink")
        .setParallelism(1);

(2) Watermark configuration. I use a field in the retract stream as the event time; this time is usually 15-20 seconds behind the current time. I have configured the streaming environment based on https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator. My events don't arrive continuously, so I think I need to set the auto-watermark interval so that the event-time timer and onTimer work correctly. I have added the code below:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000L);

1> Which kind of watermark strategy should I use, the generic forBoundedOutOfOrderness or a custom WatermarkGenerator? I tried to write a WatermarkGenerator, but I just don't know how to apply it to the stream correctly; the documentation doesn't explain this very clearly. My code looks like the snippet below and it doesn't work.

Assign part:

    .assignTimestampsAndWatermarks(
        WatermarkStrategy.forGenerator(
            (WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))

Watermark generator: I just take the event-time attribute from the record, following the example in the doc ( https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator ), roughly as sketched below.
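The generator itself is essentially the periodic BoundedOutOfOrderness example from that doc page, adapted to the retract stream's Tuple2<Boolean, Row> records. extractEventTimeMillis(), the field index, and the 20-second bound stand in for my actual event-time attribute and lateness:

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    public class TableBoundOutofOrdernessGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {

        // my event time is usually 15-20 s behind the wall clock, so allow 20 s of out-of-orderness
        private static final long MAX_OUT_OF_ORDERNESS_MS = 20_000L;

        private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS + 1;

        @Override
        public void onEvent(Tuple2<Boolean, Row> event, long eventTimestamp, WatermarkOutput output) {
            // track the highest event time seen so far, taken from the joined Row
            currentMaxTimestamp = Math.max(currentMaxTimestamp, extractEventTimeMillis(event.f1));
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // called every autoWatermarkInterval (1000 ms in my configuration)
            output.emitWatermark(new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS - 1));
        }

        private long extractEventTimeMillis(Row row) {
            // placeholder: my event-time attribute, already converted to epoch millis upstream
            return (Long) row.getField(3);
        }
    }

Even with this in place, the event-time timers in the KeyedProcessFunction never fire, which makes me think I am not attaching the strategy to the stream correctly.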
2> I also tried the static factory method on WatermarkStrategy. The syntax compiles, but I run into the same problem as in 2.(1).

    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
            .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
                <select an event-time attribute from booleanRowTuple2>
            }))

(3) For the retract datastream, do I need to explicitly attach it to the stream environment? I think that is done by default, right? I just want to confirm. I do have env.execute() at the end of the code.

I understand this is a lot of questions, so thanks a lot for your patience in looking through my email! If anything is unclear, please reach out to me. Thanks!

Best regards,
Fuyao Li