Hi Flink Community,

I am doing some research work on Flink Datastream and Table API and I meet
two major problems. I am using Flink 1.11.2, scala version 2.11, java 8. My
use case looks like this. I plan to write a data processing pipeline with
two stages. My goal is to construct a business object containing
information from several Kafka streams with a primary key and emit the
complete business object if such primary key doesn't  appear in the
pipeline for 10 seconds.

In the first stage, I first consume three Kafka streams and transform it to
Flink Datastream using a deserialization schema containing some type and
date format transformation, and then I register these data streams as Table
and do a full outer join one by one using Table API. I also add query
configuration for this to avoid excessive state. The primary key is also
the join key.

In the second stage, I transform the joined table to a retracted stream and
put it into KeyedProcessFunction to generate the business object if the
business object's primary key is inactive for 10 second.

Is this way of handling the data the suggested approach? (I understand I
can directly consume kafka data in Table API. I haven't tried that yet,
maybe that's better?) Any suggestion is welcomed. During implementing this,
I meet two major problems and several smaller questions under each problem.


1. Some type cast behavior of retracted streams I can't explain.

(1) In the initial stage, I registered some field as *java.sql.Date* or
*java.sql.timestamp* following the examples at (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction)
. After join and transform to retracted stream, it becomes
*java.time.LocalDate* and *java.time.LocalDateTime* instead.

For example, when first ingesting the Kafka streams, I registerd a
attribute in java.sql.Timestamp type.

 @JsonAlias("ATTRIBUTE1")
 private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
java.sql.Timestamp.class) Timestamp ATTRIBUTE1;

When I tried to cast the type information back after the retracted stream,
the code gives me error information below.

 java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
java.sql.Timestamp

Maybe I should use toAppendStream instead since append stream could
register type information, but toRetractedStream can't do that? (
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
)

My work around is to cast it to LocalDateTime first and extract the epoch
time, this doesn't seem to be a final solution.

(2) During timestamp conversion, the Flink to retracted stream seems to
lost the AM/PM information in the stream and causing a 12 hour difference
if it is PM.

I use joda time to do some timestamp conversion in the first
deserialization stage, my pattern looks like this. "a" means AM/PM
information

 DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy
HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());

After the retracted stream, the AM/PM information is not preserved.


2. My onTimer method in KeyedProcessFunction can not be triggered when I
scheduled a event timer timer.

I am using event time in my code. I am new to configure watermarks and I
might miss something to configure it correctly. I also tried to register a
processing time, it could enter and produce some results.

I am trying to follow the example here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example

My onTimer method looks like this and the scheduled event doesn't happen..

In processElement():

context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
+ 10000);

My onTimer function

  @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
Collector<BusinessObject> collector) throws Exception {
        TestBusinessObjectState result = testBusinessObjectState.value();
        log.info("Inside onTimer Method, current key: {}, timestamp: {},
last modified time: {}", ctx.getCurrentKey(), timestamp,
result.getLastModifiedTime());

        // check if this is an outdated timer or the latest timer
        if (timestamp >= result.getLastModifiedTime() + 10000) {
            // emit the state on timeout
            log.info("Collecting a business object, {}",
result.getBusinessObject().toString());
            collector.collect(result.getBusinessObject());

            cleanUp(ctx);
        }
    }

    private void cleanUp(Context ctx) throws Exception {
        Long timer = testBusinessObjectState.value().getLastModifiedTime();
        ctx.timerService().deleteEventTimeTimer(timer);
        testBusinessObjectState.clear();
    }


(1) When I assign the timestamp and watermarks outside the process() method
chain. The "context.timestamp()" will be null. If I put it inside the
chain, it won't be null. Is this the expected behavior? In the null case,
the strange thing is that, surprisingly, I can collect the business object
immediately without a designed 10 second waiting time... This shouldn't
happen, right...? The processing timer also seems to work. The code can
enter the on timer method.

 retractStream.assignTimestampsAndWatermarks(new
BoRetractStreamTimestampAssigner()); (This is a deprecated method)

 retractStream
    .keyBy(<key selector>)
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

(2) For watermarks configuration. I use an field in the retracted stream as
the event time. This time is usually 15-20 seconds before current time.

In my environment, I have done some settings for streaming env based on
information here(
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
My event doesn't always come, so I think I need to set auto watermark
interval to let the event timer on timer works correctly. I have added the
code below.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);

1> Which kind of watermark strategy should I use? General
BoundOutofOrderness or Watermark generator?

I tried to write a Watermark generator and I just don't how to apply it to
the stream correctly. The documentation doesn't explain very clearly. My
code looks like below and it doesn't work.

assign part:

.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean,
Row>>) context -> new TableBoundOutofOrdernessGenerator()))

watermark generater:

I just assign the event time attribute following the example in the doc. (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
)

2> I also tried to use the static method in Water Strategy. The syntax is
correct, but I meet the same problem in 2.(1).

    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
                        .withTimestampAssigner((booleanRowTuple2,
timestamp) -> {
                            <Select a event time attribute in the
booleanRowTuple2>
                        }))


(3) For the retracted datastream, do I need to explicitly attach it to the
stream environment? I think it is done by default, right? Just want to
confirm it. I do have the env.execute() at the end of the code.

I understand this is a lot of questions, thanks a lot for your patience
to look through my email! If there is anything unclear, please reach out to
me. Thanks!


Best regards,

Fuyao Li

Reply via email to