Hi Flink Users and Community,

For the first part of the question, the 12 hour time difference is caused
by a time extraction bug myself. I can get the time translated correctly
now. The type cast problem does have some workarounds to solve it..

My major blocker right now is the onTimer part is not properly triggered. I
guess it is caused by failing to configure the correct watermarks &
timestamp assigners. Please give me some insights.

1. If I don't chain the assignTimestampsAndWatermarks() method in together
with keyedBy().. and process().. method. The context.timestamp() in my
processElement() function will be null. Is this some expected behavior? The
Flink examples didn't chain it together. (see example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
)
2. If I use registerEventTimeTimer() in processElement(). The onTimer
method will not be triggered. However, I can trigger the onTimer method if
I simply change it to registerProcessingTimeTimer(). I am using the
settings below in the stream env.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);

My code for method the process chain:
retractStream

.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((booleanRowTuple2,
timestamp) -> {
                            Row rowData = booleanRowTuple2.f1;
                            LocalDateTime headerTime =
(LocalDateTime)rowData.getField(3);
                            LocalDateTime linesTime =
(LocalDateTime)rowData.getField(7);

                            LocalDateTime latestDBUpdateTime = null;
                            if (headerTime != null && linesTime != null) {
                                latestDBUpdateTime =
headerTime.isAfter(linesTime) ? headerTime : linesTime;
                            }
                            else {
                                latestDBUpdateTime = (headerTime != null) ?
headerTime : linesTime;
                            }
                            if (latestDBUpdateTime != null) {
                                return
latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
                            }
                            // In the worst case, we use system time
instead, which should never be reached.
                            return System.currentTimeMillis();
                        }))
//              .assignTimestampsAndWatermarks(new MyWaterStrategy())  //
second way to create watermark, doesn't work
                .keyBy(value -> {
                    // There could be null fields for header invoice_id
field
                    String invoice_id_key = (String)value.f1.getField(0);
                    if (invoice_id_key == null) {
                        invoice_id_key = (String)value.f1.getField(4);
                    }
                    return invoice_id_key;
                })
                .process(new TableOutputProcessFunction())
                .name("ProcessTableOutput")
                .uid("ProcessTableOutput")
                .addSink(businessObjectSink)
                .name("businessObjectSink")
                .uid("businessObjectSink")
                .setParallelism(1);

Best regards,
Fuyao

On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:

> Hi Flink Community,
>
> I am doing some research work on Flink Datastream and Table API and I meet
> two major problems. I am using Flink 1.11.2, scala version 2.11, java 8. My
> use case looks like this. I plan to write a data processing pipeline with
> two stages. My goal is to construct a business object containing
> information from several Kafka streams with a primary key and emit the
> complete business object if such primary key doesn't  appear in the
> pipeline for 10 seconds.
>
> In the first stage, I first consume three Kafka streams and transform it
> to Flink Datastream using a deserialization schema containing some type and
> date format transformation, and then I register these data streams as Table
> and do a full outer join one by one using Table API. I also add query
> configuration for this to avoid excessive state. The primary key is also
> the join key.
>
> In the second stage, I transform the joined table to a retracted stream
> and put it into KeyedProcessFunction to generate the business object if the
> business object's primary key is inactive for 10 second.
>
> Is this way of handling the data the suggested approach? (I understand I
> can directly consume kafka data in Table API. I haven't tried that yet,
> maybe that's better?) Any suggestion is welcomed. During implementing this,
> I meet two major problems and several smaller questions under each problem.
>
>
> 1. Some type cast behavior of retracted streams I can't explain.
>
> (1) In the initial stage, I registered some field as *java.sql.Date* or
> *java.sql.timestamp* following the examples at (
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction)
> . After join and transform to retracted stream, it becomes
> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>
> For example, when first ingesting the Kafka streams, I registerd a
> attribute in java.sql.Timestamp type.
>
>  @JsonAlias("ATTRIBUTE1")
>  private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
> java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>
> When I tried to cast the type information back after the retracted stream,
> the code gives me error information below.
>
>  java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
> java.sql.Timestamp
>
> Maybe I should use toAppendStream instead since append stream could
> register type information, but toRetractedStream can't do that? (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
> )
>
> My work around is to cast it to LocalDateTime first and extract the epoch
> time, this doesn't seem to be a final solution.
>
> (2) During timestamp conversion, the Flink to retracted stream seems to
> lost the AM/PM information in the stream and causing a 12 hour difference
> if it is PM.
>
> I use joda time to do some timestamp conversion in the first
> deserialization stage, my pattern looks like this. "a" means AM/PM
> information
>
>  DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy
> HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>
> After the retracted stream, the AM/PM information is not preserved.
>
>
> 2. My onTimer method in KeyedProcessFunction can not be triggered when I
> scheduled a event timer timer.
>
> I am using event time in my code. I am new to configure watermarks and I
> might miss something to configure it correctly. I also tried to register a
> processing time, it could enter and produce some results.
>
> I am trying to follow the example here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>
> My onTimer method looks like this and the scheduled event doesn't happen..
>
> In processElement():
>
> context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
> + 10000);
>
> My onTimer function
>
>   @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
> Collector<BusinessObject> collector) throws Exception {
>         TestBusinessObjectState result = testBusinessObjectState.value();
>         log.info("Inside onTimer Method, current key: {}, timestamp: {},
> last modified time: {}", ctx.getCurrentKey(), timestamp,
> result.getLastModifiedTime());
>
>         // check if this is an outdated timer or the latest timer
>         if (timestamp >= result.getLastModifiedTime() + 10000) {
>             // emit the state on timeout
>             log.info("Collecting a business object, {}",
> result.getBusinessObject().toString());
>             collector.collect(result.getBusinessObject());
>
>             cleanUp(ctx);
>         }
>     }
>
>     private void cleanUp(Context ctx) throws Exception {
>         Long timer = testBusinessObjectState.value().getLastModifiedTime();
>         ctx.timerService().deleteEventTimeTimer(timer);
>         testBusinessObjectState.clear();
>     }
>
>
> (1) When I assign the timestamp and watermarks outside the process()
> method chain. The "context.timestamp()" will be null. If I put it inside
> the chain, it won't be null. Is this the expected behavior? In the null
> case, the strange thing is that, surprisingly, I can collect the business
> object immediately without a designed 10 second waiting time... This
> shouldn't happen, right...? The processing timer also seems to work. The
> code can enter the on timer method.
>
>  retractStream.assignTimestampsAndWatermarks(new
> BoRetractStreamTimestampAssigner()); (This is a deprecated method)
>
>  retractStream
>     .keyBy(<key selector>)
>     .process(new TableOutputProcessFunction())
>     .name("ProcessTableOutput")
>     .uid("ProcessTableOutput")
>     .addSink(businessObjectSink)
>     .name("businessObjectSink")
>     .uid("businessObjectSink")
>     .setParallelism(1);
>
> (2) For watermarks configuration. I use an field in the retracted stream
> as the event time. This time is usually 15-20 seconds before current time.
>
> In my environment, I have done some settings for streaming env based on
> information here(
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
> My event doesn't always come, so I think I need to set auto watermark
> interval to let the event timer on timer works correctly. I have added the
> code below.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> 1> Which kind of watermark strategy should I use? General
> BoundOutofOrderness or Watermark generator?
>
> I tried to write a Watermark generator and I just don't how to apply it to
> the stream correctly. The documentation doesn't explain very clearly. My
> code looks like below and it doesn't work.
>
> assign part:
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean,
> Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>
> watermark generater:
>
> I just assign the event time attribute following the example in the doc. (
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
> )
>
> 2> I also tried to use the static method in Water Strategy. The syntax is
> correct, but I meet the same problem in 2.(1).
>
>     .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
> Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>                         .withTimestampAssigner((booleanRowTuple2,
> timestamp) -> {
>                             <Select a event time attribute in the
> booleanRowTuple2>
>                         }))
>
>
> (3) For the retracted datastream, do I need to explicitly attach it to the
> stream environment? I think it is done by default, right? Just want to
> confirm it. I do have the env.execute() at the end of the code.
>
> I understand this is a lot of questions, thanks a lot for your patience
> to look through my email! If there is anything unclear, please reach out to
> me. Thanks!
>
>
> Best regards,
>
> Fuyao Li
>

Reply via email to