Hello Hans-Peter,
I’m a little confused about which version of your code you are testing against:
* ProcessingTimeSessionWindows or EventTimeSessionWindows?
* Did you keep withIdleness()?
As said before:
* for ProcessingTimeSessionWindows, watermarks play no role
* if you keep withIdleness(), then the respective sparse DataStream is
event-time-less most of the time, i.e. no triggers fire to close a session
window
* withIdleness() only makes sense if you merge/union/connect multiple
DataStreams where at least one stream has its watermarks updated regularly
(i.e. it is not withIdleness())
* this is not your case, your DAG is linear, no union nor connects
* in event-time mode, processing time plays no role; watermarks alone
represent the progress of model (event) time and hence drive the triggering of
windows
* in order to trigger a (session-)window at time A the window operator
needs to receive a watermark of at least time A
* the next catch concerns partitioning
* your first watermark strategy kafkaWmstrategy generates
per-Kafka-partition watermarks
* a keyBy() reshuffles these partitions onto the number of subtasks
according to the hash of the key
* this results in a per subtask calculation of the lowest watermark of
all Kafka partitions that happen to be processed by that subtask
* i.e. if a single Kafka partition makes no watermark progress the
subtask watermark makes no progress
* this surfaces in sparse data as in your case
* your second watermark strategy wmStrategy makes things worse because
* it discards the correct watermarks of the first watermark strategy
* and replaces it with something that is arbitrary (at this point it is
hard to guess the correct max lateness that is a mixture of the events from
multiple Kafka partitions)
Conclusion:
The only way to make the event time session windows work for you in a timely
manner is to make sure watermarks on all involved partitions make progress,
i.e. new events arrive on all partitions in a regular manner.
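To make the partition argument concrete, here is a minimal sketch in plain Java (not Flink code) of how an operator with several inputs combines their watermarks. The names `CombinedWatermark` and `combine` are hypothetical, and the model is deliberately simplified: it takes the minimum over all inputs that are not marked idle and reports no watermark at all when every input is idle. Flink's real implementation has more detail than this.

```java
import java.util.OptionalLong;

/** Simplified model of watermark combination; hypothetical names, not Flink's implementation. */
final class CombinedWatermark {

    /**
     * Returns the minimum watermark over all non-idle inputs,
     * or empty if every input is idle (nothing drives event time then).
     */
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        if (watermarks.length != idle.length) {
            throw new IllegalArgumentException("one idle flag per input is required");
        }
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                // Only active inputs take part in the minimum.
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With watermarks 5000, 1000 and 9000 and no idle input, the combined watermark is 1000: the slowest partition holds everything back. Marking the second input idle lifts the result to 5000. With a single input that is idle, as in a linear DAG, the result is empty, so no session window can be triggered.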
Hope this helps
Thias
From: HG <[email protected]>
Sent: Tuesday, March 29, 2022 1:07 PM
To: Schwalbe Matthias <[email protected]>
Cc: user <[email protected]>
Subject: Re: Watermarks event time vs processing time
Hello Matthias,
When I remove all the watermark strategies, it does not process anything.
For example, when I use WatermarkStrategy.noWatermarks() instead of the one I
built, nothing seems to happen at all.
Also when I skip the part where I add wmStrategy to create tuple4dswm:
DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
Nothing is processed.
Regards Hans-Peter
On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias
<[email protected]> wrote:
Hi Hanspeter,
Let me share some hints that might help you get the concepts clearer.
From your description I make the following assumptions where you are not specific
enough (please confirm or correct in your answer):
a. You store incoming events in state per transaction_id to be
sorted/aggregated(min/max time) by event time later on
b. So far you used a session window to determine the point in time when
to emit the stored/enriched/sorted events
c. Watermarks are generated with bounded out-of-orderness
d. You use session windows with a specific gap
e. In your experiment you only ever send 1000 events and then stop
producing incoming events
Now to your questions:
* For processing time session windows, watermarks play no role whatsoever,
you simply assume that you’ve seen all events belonging to a single transaction
id if the last such event for a specific transaction id was processed
sessionWindowGap milliseconds ago
* Therefore you see all enriched incoming events at the latest
sessionWindowGap ms after the last incoming event (+ some latency)
* In event-time mode, i.e. with event-time session windows, the situation is
exactly the same, except that processing time plays no role
* A watermark means (ideally) that no event older than the watermark time
ever follows the watermark (which itself is a meta-event that flows with the
proper events on the same channels)
* In order for a session gap window to forward the enriched events the
window operator needs to receive a watermark that is sessionWindowGap
milliseconds beyond the latest incoming event (in terms of the respective event
time)
* The watermark generator in order to generate a new watermark that
triggers this last session window above needs to encounter an (any) event that
has a timestamp of (<latest event in session window> + outOfOrderness +
sessionWindowGap + 1ms)
* Remember, the watermark generator never generates watermarks based on
processing time, but only based on the timestamps it has seen in events
actually encountered
* Coming back to your idleness configuration: it only means that the
incoming stream becomes idle == timeless after a while … i.e. watermarks won’t
make progress from this stream, and it tells all downstream operators
* Idleness specification is only useful if a respective operator has
another source of valid watermarks (i.e. after a union of two streams, one
active/one idle …). This is not your case
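The arithmetic in the bullets above can be sketched in plain Java (not Flink code). The names `SessionCloseModel`, `watermark`, `sessionCanClose` and `timestampNeededToClose` are hypothetical, and the model is an assumption: the watermark is taken as the highest timestamp seen minus the out-of-orderness bound, and a session closes once the watermark lies beyond the last event plus the gap. The exact millisecond at the boundary depends on Flink's internal conventions, which this sketch does not try to reproduce.

```java
/** Simplified model of event-time session closing; hypothetical names, not Flink's implementation. */
final class SessionCloseModel {

    /** Watermark derived purely from event timestamps, never from the wall clock. */
    static long watermark(long maxSeenTimestampMs, long outOfOrdernessMs) {
        return maxSeenTimestampMs - outOfOrdernessMs;
    }

    /** A session can close once the watermark lies beyond its last event plus the gap. */
    static boolean sessionCanClose(long lastEventMs, long gapMs, long watermarkMs) {
        return watermarkMs > lastEventMs + gapMs;
    }

    /** Smallest timestamp some later event must carry for the session to close. */
    static long timestampNeededToClose(long lastEventMs, long gapMs, long outOfOrdernessMs) {
        return lastEventMs + outOfOrdernessMs + gapMs + 1;
    }
}
```

For a session whose last event is at 10000 ms, with a 5000 ms gap and 2000 ms out-of-orderness, some later event with a timestamp of at least 17001 ms has to arrive. If the producer stops right after the last event, the watermark stays at 8000 ms and the session stays open, which matches the missing output in the 1000-event experiment.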
I hope this clarifies your situation.
Cheers
Thias
From: HG <[email protected]>
Sent: Wednesday, March 16, 2022 10:06 AM
To: user <[email protected]>
Subject: Watermarks event time vs processing time
Hi,
I read events in JSON format from a Kafka topic.
These events contain a handling time (aka event time) in epoch milliseconds, a
transaction_id and a large nested JSON structure.
I need to group the events by transaction_id, order them by handling time and
calculate the differences in handling time.
The events are updated with this calculated elapsed time and pushed further.
So all events that go in should come out with the elapsed time added.
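As an illustration of the calculation described here, a minimal plain-Java sketch for the events of one transaction_id follows. The names `ElapsedTimes` and `elapsed` are hypothetical, and it is an assumption that "elapsed time" means the difference to the previous event in handling-time order, with 0 for the first event; the real job works on full JSON events rather than bare timestamps.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: elapsed time between consecutive events of one transaction. */
final class ElapsedTimes {

    /**
     * Sorts the handling times ascending and returns, for each event in that
     * order, the milliseconds elapsed since the previous event (0 for the first).
     */
    static List<Long> elapsed(List<Long> handlingTimesMs) {
        List<Long> sorted = new ArrayList<>(handlingTimesMs);
        sorted.sort(Comparator.naturalOrder());
        List<Long> result = new ArrayList<>(sorted.size());
        for (int i = 0; i < sorted.size(); i++) {
            result.add(i == 0 ? 0L : sorted.get(i) - sorted.get(i - 1));
        }
        return result;
    }
}
```

For handling times 300, 100 and 250 the sorted order is 100, 250, 300, so the elapsed times are 0, 150 and 50. Every input event yields exactly one output value, which is the "all events that go in should come out" requirement.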
For testing I use events that are old (so the handling time is nowhere near the
wall-clock time).
Initially I used EventTimeSessionWindows but somehow the processing did not run
as expected.
When I pushed 1000 events eventually 800 or so would appear at the output.
This was resolved by switching to ProcessingTimeSessionWindows.
My thought was then that I could remove the watermark strategies (with their
timestamp assigners based on handling time) from the Kafka input stream and the
data stream.
However, this was not the case.
Can anyone enlighten me as to why the watermark strategies are still needed?
Below the code
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
/* A watermark is needed to prevent duplicates! */
WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
        .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
            @Override
            public long extractTimestamp(ObjectNode element, long eventTime) {
                return element.get("value").get("handling_time").asLong();
            }
        });
/* Use the watermark strategy to create a datastream */
DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");
/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
ds.flatMap(new Splitter());
WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
        .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(
                Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
            @Override
            public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
                return element.f0;
            }
        });
DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
        tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
        .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
            @Override
            public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                return value.f2;
            }
        })
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
        .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
        .process(new MyProcessWindowFunction());
KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);
This message is intended only for the named recipient and may contain
confidential or privileged information. As the confidentiality of email
communication cannot be guaranteed, we do not accept any responsibility for the
confidentiality and the intactness of this message. If you have received it in
error, please advise the sender by return e-mail and delete this message and
any attachments. Any unauthorised use or dissemination of this information is
strictly prohibited.