Hi Matthias,
Just to provide more context on this problem: each of my Kafka topics has only one partition at the beginning, before the join operation. After
reading the doc:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
maybe that is the root cause of my problem here: with fewer partitions than the
parallelism (only 1 partition in my case), the default parallelism of 8 leaves
some source subtasks without data, which would cause this wrong behavior. This
is my guess; it will take a while to test it out. What's your opinion on this? Thanks!
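To make this concrete, below is a rough sketch of the two workarounds I am
planning to try (this is only my assumption, not verified against the actual
job yet; MyEvent is a placeholder record type, not my real class):
```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinglePartitionWatermarkSketch {

    // Placeholder record type, only for this sketch.
    public static class MyEvent {
        public long eventTime;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Workaround 1: match the parallelism to the single Kafka partition,
        // so that no source subtask stays empty and holds back the watermark.
        env.setParallelism(1);

        // Workaround 2 (Flink 1.11+): keep a higher parallelism but mark subtasks
        // idle after a timeout, so empty subtasks are ignored when the downstream
        // operator combines watermarks.
        WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.eventTime)
                .withIdleness(Duration.ofMinutes(1));

        env.fromElements(new MyEvent())
                .assignTimestampsAndWatermarks(strategy)
                .print();

        env.execute("single-partition-watermark-sketch");
    }
}
```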
Best,
Fuyao
On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
Hi Matthias,
One more question regarding Flink Table parallelism: is it possible
to configure the parallelism for a Table operation at the operator level?
It seems we don't have such an API available, right? Thanks!
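For what it's worth, the only knob I have found so far is the job-wide default
for Table operators via the configuration below (my current understanding, not
confirmed); there seems to be no per-operator setter:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableParallelismSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Job-wide default parallelism for Table/SQL exec operators.
        // I have not found an equivalent per-operator API in the Table API.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 1);
    }
}
```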
Best,
Fuyao
On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
Hi Matthias,
Thanks for your information. I have managed to figure out the
first issue you mentioned, and I have made some progress on the second one.
I have sent another email with the title 'BoundedOutOfOrderness
Watermark Generator is NOT making the event time to advance'
from another address of mine, fuyao...@oracle.com. That email contains some more
context on my issue; please take a look. I have made further
progress since sending it.
Previously, I had managed to make the time-lag watermark strategy
work in my code, but my bounded out-of-orderness strategy and
punctuated watermark strategy don't work well. Each emission cycle
produces 8 watermarks, one per parallel subtask. Two cycles are shown below.
I managed to figure out that the root cause is that the Flink stream
execution environment has a default parallelism of 8. *I didn't
notice this in the doc; could the community add it explicitly to
the official doc to avoid some confusion? Thanks.*
From my understanding, the watermark advances based on the
lowest watermark among the 8 parallel subtasks, so I cannot advance the
bounded out-of-orderness watermark since I am only advancing 1 of the 8
subtasks. If I set the entire stream execution environment
to parallelism 1, the watermark in the context is reflected
correctly. One more thing: this behavior is not
reflected in the Flink cluster web UI. I can see the
watermark advancing there, but it is not advancing in reality. *That's
causing the inconsistency problem I mentioned in the other email
referenced above. Would this be considered a bug in the UI?*
My current question is: since I have a full outer join operation
before the KeyedProcessFunction, how can I make the bounded
out-of-orderness / punctuated watermark strategy work if the
parallelism is > 1? It only updates the watermark for one of the 8
parallel subtasks of this onTimer operator. Is this related to
my Table full outer join operation before this step? According
to the doc,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
the default parallelism should be the same as the stream
environment's. Why can't I update the watermarks for all 8
subtasks? What should I do to make this work with
parallelism larger than 1? Thanks.
First round: (Note that the first watermark value in each log row,
"watermark based on system time", comes from the time-lag strategy; it is
updated correctly for all 8 parallel subtasks, but the other two strategies
I mentioned above are not.)
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881 (only one of the 8 parallel subtasks for bounded out-of-orderness is getting my new watermark)
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
Second round: (I set the autoWatermark interval to be 5 seconds)
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
Best regards,
Fuyao
On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl
<matth...@ververica.com> wrote:
Hi Fuyao,
for your first question about the different behavior
depending on whether you chain the methods or not: keep in
mind that you have to save the return value of the
assignTimestampsAndWatermarks method call if you don't chain
the methods together, as is also shown in [1].
At least the following example from your first message
indicates that this is what happened:
```
retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); // (This is a deprecated method)
// instead of:
// retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
retractStream
    .keyBy(<key selector>)
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
```
For your second question about setting the EventTime, I'm
going to pull in Timo from the SDK team, as I don't see an
issue with your code right away.
Best,
Matthias
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li
<fuyaoli2...@gmail.com> wrote:
Hi Flink Users and Community,
For the first part of the question, the 12-hour time
difference was caused by a time-extraction bug of my own. I
can get the time translated correctly now. The type cast
problem does have some workarounds.
My major blocker right now is that the onTimer part is not
being triggered properly. I guess it is caused by failing to
configure the correct watermarks & timestamp assigners.
Please give me some insights.
1. If I don't chain the assignTimestampsAndWatermarks()
method together with the keyBy() and process() methods,
the context.timestamp() in my processElement()
function will be null. Is this expected behavior?
The Flink examples don't chain it together (see the
example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies).
2. If I use registerEventTimeTimer() in
processElement(), the onTimer method will not be
triggered. However, I can trigger the onTimer method if
I simply change it to registerProcessingTimeTimer(). I
am using the settings below in the stream env.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
My code for the process chain:
retractStream
    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
            Row rowData = booleanRowTuple2.f1;
            LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
            LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
            LocalDateTime latestDBUpdateTime = null;
            if (headerTime != null && linesTime != null) {
                latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
            } else {
                latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
            }
            if (latestDBUpdateTime != null) {
                return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
            }
            // In the worst case, we use system time instead, which should never be reached.
            return System.currentTimeMillis();
        }))
    // .assignTimestampsAndWatermarks(new MyWaterStrategy()) // second way to create watermark, doesn't work
    .keyBy(value -> {
        // There could be null fields for header invoice_id field
        String invoice_id_key = (String) value.f1.getField(0);
        if (invoice_id_key == null) {
            invoice_id_key = (String) value.f1.getField(4);
        }
        return invoice_id_key;
    })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
Best regards,
Fuyao
On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li
<fuyaoli2...@gmail.com> wrote:
Hi Flink Community,
I am doing some research work with the Flink DataStream
and Table APIs and I have run into two major problems. I am
using Flink 1.11.2, Scala version 2.11, and Java 8. My
use case looks like this: I plan to write a data
processing pipeline with two stages. My goal is to
construct a business object containing information
from several Kafka streams, keyed by a primary key, and
emit the complete business object if that primary
key doesn't appear in the pipeline for 10 seconds.
In the first stage, I consume three Kafka
streams and transform them into Flink DataStreams using a
deserialization schema that contains some type and date
format transformation; then I register these
data streams as Tables and full outer join them one
by one using the Table API. I also add a query
configuration for this to avoid excessive state. The
primary key is also the join key.
In the second stage, I transform the joined table into
a retract stream and feed it into a
KeyedProcessFunction that emits the business object
once its primary key has been inactive for 10 seconds.
Is this the suggested way of handling the data? (I
understand I can consume Kafka data directly with the
Table API. I haven't tried that yet; maybe that's
better?) Any suggestion is welcome. While
implementing this, I ran into two major problems and
several smaller questions under each.
1. Some type cast behavior of retract streams that I
can't explain.
(1) In the initial stage, I registered some fields as
*java.sql.Date* or *java.sql.Timestamp*, following
the examples at
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction).
After the join and the conversion to a retract stream,
they become *java.time.LocalDate* and
*java.time.LocalDateTime* instead.
For example, when first ingesting the Kafka streams,
I registered an attribute as java.sql.Timestamp:
@JsonAlias("ATTRIBUTE1")
private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo = java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
When I tried to cast the type back after
the retract stream, the code gives me the error below:
java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
Maybe I should use toAppendStream instead, since an
append stream can register type information but
toRetractStream can't?
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset)
My workaround is to cast it to LocalDateTime first
and extract the epoch time, but this doesn't seem like
a final solution.
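Roughly what my workaround looks like (a simplified sketch,
not the exact code; the field index and time zone are just
examples from my job):
```
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;

import org.apache.flink.types.Row;

public class RetractFieldConversion {

    // Cast the field coming out of the retract stream to LocalDateTime first,
    // then convert it into what the rest of the code expects.
    public static long toEpochMillis(Row rowData, int fieldIndex) {
        LocalDateTime localTime = (LocalDateTime) rowData.getField(fieldIndex);
        return localTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
    }

    public static Timestamp toSqlTimestamp(Row rowData, int fieldIndex) {
        LocalDateTime localTime = (LocalDateTime) rowData.getField(fieldIndex);
        return Timestamp.valueOf(localTime);
    }
}
```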
(2) During timestamp conversion, the conversion to a
retract stream seems to lose the AM/PM information
in the stream, causing a 12-hour difference when the
time is PM.
I use Joda-Time to do some timestamp conversion in
the first deserialization stage; my pattern looks
like this ("a" is the AM/PM marker):
DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
After the retract stream, the AM/PM information is
not preserved.
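One thing I still need to verify (purely a guess on my side):
in Joda-Time, "HH" is the 0-23 hour of day, while "hh" is the
1-12 clock hour that pairs with the "a" AM/PM marker, so the
pattern itself might be dropping the PM offset rather than the
retract stream. A hypothetical variant to test:
```
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class AmPmPatternCheck {
    public static void main(String[] args) {
        // Hypothetical fix to test: "hh" (clock hour 1-12) together with "a",
        // instead of "HH" (hour of day 0-23), when parsing 12-hour timestamps.
        DateTimeFormatter format3 = DateTimeFormat
                .forPattern("dd-MMM-yy hh.mm.ss.SSSSSS a")
                .withZone(DateTimeZone.getDefault());
        System.out.println(format3.parseDateTime("02-Nov-20 02.30.00.000000 PM"));
    }
}
```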
2. My onTimer method in KeyedProcessFunction is not
triggered when I schedule an event-time timer.
I am using event time in my code. I am new to
configuring watermarks and I might have missed
something needed to configure them correctly. I also
tried registering a processing-time timer instead,
and that does fire and produce some results.
I am trying to follow the example here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
My onTimer method looks like this, and the scheduled
event never fires.
In processElement():
context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
My onTimer function:
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<BusinessObject> collector) throws Exception {
    TestBusinessObjectState result = testBusinessObjectState.value();
    log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}",
            ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());

    // check if this is an outdated timer or the latest timer
    if (timestamp >= result.getLastModifiedTime() + 10000) {
        // emit the state on timeout
        log.info("Collecting a business object, {}", result.getBusinessObject().toString());
        collector.collect(result.getBusinessObject());
        cleanUp(ctx);
    }
}

private void cleanUp(Context ctx) throws Exception {
    Long timer = testBusinessObjectState.value().getLastModifiedTime();
    ctx.timerService().deleteEventTimeTimer(timer);
    testBusinessObjectState.clear();
}
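For completeness, the processElement side (beyond the single
line quoted above) follows the ProcessFunction example from the
docs; a simplified sketch of the idea (not the exact code;
buildBusinessObject() stands in for the real construction logic):
```
@Override
public void processElement(Tuple2<Boolean, Row> value, Context context, Collector<BusinessObject> collector) throws Exception {
    TestBusinessObjectState current = testBusinessObjectState.value();
    if (current == null) {
        current = new TestBusinessObjectState();
    }
    current.setBusinessObject(buildBusinessObject(value.f1));   // abbreviated helper
    current.setLastModifiedTime(context.timestamp());           // null when no timestamp assigner is in effect
    testBusinessObjectState.update(current);

    // Fires only once the event-time watermark passes this timestamp.
    context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
}
```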
(1) When I assign the timestamps and watermarks
outside the process() method chain, the
"context.timestamp()" will be null. If I put it
inside the chain, it won't be null. Is this the
expected behavior? In the null case, the strange
thing is that, surprisingly, I can collect the
business object immediately, without the intended
10-second waiting time... This shouldn't happen,
right...? The processing-time timer also seems to work;
the code can enter the onTimer method.
retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); // (This is a deprecated method)
retractStream
    .keyBy(<key selector>)
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
(2) For the watermark configuration, I use a field in
the retract stream as the event time. This time is
usually 15-20 seconds before the current time.
In my environment, I have done some settings for the
streaming env based on the information here (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
My events don't always come, so I think I need to
set the auto-watermark interval to let the event-time
timer in onTimer work correctly. I have added the code below.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
1> Which kind of watermark strategy should I use, the
generic BoundedOutOfOrderness one or a custom
WatermarkGenerator? I tried to write a WatermarkGenerator
and I just don't know how to apply it to the stream
correctly; the documentation doesn't explain this very
clearly. My code looks like below and it doesn't work.
The assign part:
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))
The watermark generator:
I just assign the event time attribute following the
example in the doc
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
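For reference, this is roughly the shape of the generator I
wrote following the doc's periodic example (a sketch, not the
exact code; the out-of-orderness bound is just an example value):
```
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class TableBoundOutofOrdernessGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {

    private static final long MAX_OUT_OF_ORDERNESS = 20_000L; // 20 seconds, example value

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS + 1;

    @Override
    public void onEvent(Tuple2<Boolean, Row> event, long eventTimestamp, WatermarkOutput output) {
        // eventTimestamp comes from the timestamp assigner of the WatermarkStrategy
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Called every autoWatermarkInterval; note that the operator's effective
        // watermark is the minimum across all parallel subtasks.
        output.emitWatermark(new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS - 1));
    }
}
```
One thing I am not sure about (my understanding only):
WatermarkStrategy.forGenerator(...) by itself does not set a
timestamp assigner, so unless .withTimestampAssigner(...) is
chained onto it as well, eventTimestamp in onEvent may not
contain my event time.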
2> I also tried to use the static method on
WatermarkStrategy. The syntax is correct, but I hit the
same problem as in 2.(1).
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
    .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
        <Select an event time attribute from the booleanRowTuple2>
    }))
(3) For the retract datastream, do I need to
explicitly attach it to the stream environment? I
think that's done by default, right? Just want to
confirm. I do have env.execute() at the end
of the code.
I understand this is a lot of questions; thanks a
lot for your patience in looking through my email! If
anything is unclear, please reach out to me.
Thanks!
Best regards,
Fuyao Li