Hello Matthias,
I am still using ProcessingTimeSessionWindows.
But it turns out I was wrong: I had tested a couple of times and it did not
seem to work, but now it does, with both watermark strategies removed.
My apologies.
Regards Hans-Peter
This is the code:
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(Integer.parseInt(envMaxParallelism));
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(Integer.parseInt(envEnableCheckpointing));
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, inputBrokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, inputGroupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
kafkaProps.setProperty("ssl.truststore.type", inputTrustStoreType);
kafkaProps.setProperty("ssl.truststore.password", inputTrustStorePassword);
kafkaProps.setProperty("ssl.truststore.location", inputTrustStoreLocation);
kafkaProps.setProperty("security.protocol", inputSecurityProtocol);
kafkaProps.setProperty("ssl.enabled.protocols", inputSslEnabledProtocols);
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
/* No watermarks are needed: the pipeline only uses processing-time session windows */
DataStream<ObjectNode> ds = env.fromSource(source,
        WatermarkStrategy.noWatermarks(), "Kafka Source");
/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
        ds.flatMap(new Splitter());
DataStream<String> tuple4DsWmKeyedbytr = tuple4ds
        .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
            @Override
            public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                return value.f2; // transaction_id
            }
        })
        .window(ProcessingTimeSessionWindows.withGap(
                Time.seconds(Integer.parseInt(sessionWindowGap))))
        // allowed lateness only has an effect for event-time windows
        .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
        .process(new MyProcessWindowFunction());
Properties sinkkafkaProps = new Properties();
sinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("ssl.enabled.protocols", outputSslEnabledProtocols);
KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setKafkaProducerConfig(sinkkafkaProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);
// Splits the ObjectNode into a Tuple4
private static class Splitter implements FlatMapFunction<ObjectNode,
        Tuple4<Long, Long, String, String>> {
    @Override
    public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out)
            throws Exception {
        // handling_time is retrieved twice intentionally: one copy is meant for the
        // watermark strategy, the other for calculating the elapsed time
        out.collect(new Tuple4<>(
                json.get("value").get("handling_time").asLong(),
                json.get("value").get("handling_time").asLong(),
                json.get("value").get("transaction_id").asText(),
                json.get("value").get("original_event").toPrettyString()));
    }
}
// Comparator to sort the events that belong to the same transaction by handling time
public static class SortEventsHandlingTime implements
        Comparator<Tuple4<Long, Long, String, String>> {
    @Override
    public int compare(Tuple4<Long, Long, String, String> o1,
                       Tuple4<Long, Long, String, String> o2) {
        // f0 holds handling_time; Long.compare already returns -1, 0 or 1
        return Long.compare(o1.f0, o2.f0);
    }
}
// Sorts the events of one transaction and calculates the elapsed times
static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context,
                        Iterable<Tuple4<Long, Long, String, String>> input,
                        Collector<String> out) throws JsonProcessingException {
        long pHandlingTime = 0L; // handling_time of the previous event, 0 = no previous event
        long cumulativeElapsed = 0L;
        List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
        input.forEach(inputList::add);
        inputList.sort(new SortEventsHandlingTime());
        ObjectMapper mapper = new ObjectMapper();
        for (Tuple4<Long, Long, String, String> in : inputList) {
            long elapsed = (pHandlingTime == 0L) ? 0L : in.f0 - pHandlingTime;
            cumulativeElapsed += elapsed;
            pHandlingTime = in.f0;
            JsonNode originalEvent = mapper.readTree(in.f3);
            // Cast the first endpoint handler to ObjectNode so the new fields can be added to it
            ObjectNode o = (ObjectNode) originalEvent.get("Message").get("endpoints").get(0)
                    .get("endpoint_handlers").get(0);
            o.put("handling_time", in.f0.toString());
            o.put("elapsed_time", Long.toString(elapsed));
            o.put("cumulative_elapsed_time", Long.toString(cumulativeElapsed));
            out.collect(originalEvent.toString());
        }
    }
}
On Tue, 29 Mar 2022 at 15:23, Schwalbe Matthias <[email protected]> wrote:
> Hello Hans-Peter,
>
>
>
> I’m a little confused about which version of your code you are testing against:
>
> - ProcessingTimeSessionWindows or EventTimeSessionWindows?
> - Did you keep withIdleness()?
>
> As said before:
>
> - For ProcessingTimeSessionWindows, watermarks play no role.
> - If you keep withIdleness(), then the respective sparse DataStream is
>   event-time-less most of the time, i.e. no triggers fire to close a session
>   window.
> - withIdleness() only makes sense if you merge/union/connect multiple
>   DataStreams where at least one stream has its watermarks updated regularly
>   (i.e. it is not idle).
>   - This is not your case: your DAG is linear, with no union or connect.
> - In event-time mode processing time plays no role; watermarks alone
>   represent the progress of model (event) time and hence drive the
>   triggering of windows.
>   - In order to trigger a (session) window at time A, the window operator
>     needs to receive a watermark of at least time A.
> - The next catch concerns partitioning:
>   - Your first watermark strategy, kafkaWmstrategy, generates
>     per-Kafka-partition watermarks.
>   - A keyBy() reshuffles these partitions onto the subtasks
>     according to the hash of the key.
>   - This results in a per-subtask calculation of the *lowest*
>     watermark of all Kafka partitions that happen to be processed by that
>     subtask,
>     - i.e. if a single Kafka partition makes no watermark progress, the
>       subtask watermark makes no progress.
>     - This surfaces with sparse data, as in your case.
> - Your second watermark strategy, wmStrategy, makes things worse because
>   - it discards the correct watermarks of the first watermark strategy
>   - and replaces them with something arbitrary (at this point it is hard
>     to guess the correct max lateness, since the stream is a mixture of
>     events from multiple Kafka partitions).
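The partitioning and idleness points above can be illustrated with a toy model. The sketch below is plain Java, not Flink code: it assumes, as described above, that an operator forwards the minimum of its input watermarks, and that inputs marked idle are left out of that minimum. The partition count and watermark values are hypothetical.

```java
import java.util.OptionalLong;

/**
 * Toy model (not Flink code) of how an operator combines the watermarks of
 * its input channels: it takes the minimum over all channels that are not idle.
 */
public class MinWatermarkDemo {

    /**
     * Returns the combined watermark, or an empty result when every input is
     * idle (then no new watermark is emitted and event time stands still).
     */
    static OptionalLong combinedWatermark(long[] channelWatermarks, boolean[] idle) {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Three Kafka partitions feed one subtask; partition 2 has seen no new events lately.
        long[] watermarks = {10_000L, 12_000L, 1_000L};

        // Nothing idle: the stalled partition holds the whole subtask back.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false, false}).getAsLong()); // 1000

        // Partition 2 idle: it is ignored and event time advances with the others.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false, true}).getAsLong()); // 10000

        // Everything idle: no watermark at all, so no event-time window can fire.
        System.out.println(combinedWatermark(watermarks, new boolean[] {true, true, true}).isPresent()); // false
    }
}
```

The last case corresponds to the linear pipeline in this thread: once its only input goes idle, nothing is left to advance event time.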
>
> Conclusion:
>
> The only way to make the event-time session windows work for you in a
> timely manner is to make sure that watermarks on all involved partitions
> make progress, i.e. that new events arrive on all partitions regularly.
>
> Hope this helps
>
> Thias
>
> *From:* HG <[email protected]>
> *Sent:* Tuesday, March 29, 2022 1:07 PM
> *To:* Schwalbe Matthias <[email protected]>
> *Cc:* user <[email protected]>
> *Subject:* Re: Watermarks event time vs processing time
>
> Hello Matthias,
>
> When I remove all the watermark strategies, it does not process anything.
> For example, when I use WatermarkStrategy.noWatermarks() instead of
> the one I built, nothing seems to happen at all.
>
> The same happens when I skip the part where I apply wmStrategy to create
> tuple4dswm:
>
> DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
>         tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
> Nothing is processed.
>
> Regards Hans-Peter
>
> On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <[email protected]> wrote:
>
> Hi Hanspeter,
>
> Let me share some hints that might help you get the concepts clearer.
>
> From your description I make the following assumptions where you are not
> specific enough (please confirm or correct in your answer):
>
> a. You store incoming events in state per transaction_id, to be
> sorted/aggregated (min/max time) by event time later on.
>
> b. So far you used a session window to determine the point in time
> when to emit the stored/enriched/sorted events.
>
> c. Watermarks are generated with bounded out-of-orderness.
>
> d. You use session windows with a specific gap.
>
> e. In your experiment you only ever send 1000 events and then stop
> producing incoming events.
>
> Now to your questions:
>
> - For processing-time session windows, watermarks play no role
>   whatsoever: you simply assume that you’ve seen all events belonging to a
>   single transaction id once the last such event for that transaction id
>   was processed sessionWindowGap milliseconds ago.
> - Therefore you see all enriched incoming events at the latest
>   sessionWindowGap ms after the last incoming event (+ some latency).
> - In event-time mode, i.e. with event-time session windows, the situation
>   is exactly the same, except that processing time plays no role.
> - A watermark means (ideally) that no event older than the watermark
>   time ever follows the watermark (which itself is a meta-event that flows
>   with the proper events on the same channels).
> - In order for a session gap window to forward the enriched events, the
>   window operator needs to receive a watermark that is sessionWindowGap
>   milliseconds beyond the latest incoming event (in terms of the respective
>   event time).
> - To generate a new watermark that triggers this last session window, the
>   watermark generator needs to encounter an (any) event that has a timestamp
>   of at least (<latest event in session window> + outOfOrderness + sessionWindowGap + 1ms).
> - Remember, the watermark generator never generates watermarks based
>   on processing time, only based on the timestamps it has seen in events
>   actually encountered.
> - Coming back to your idleness configuration: it only means that the
>   incoming stream becomes idle, i.e. timeless, after a while; watermarks
>   won’t make progress from this stream, and the idle status is passed on to
>   all downstream operators.
> - An idleness specification is only useful if the respective operator has
>   another source of valid watermarks (e.g. after a union of two streams, one
>   active and one idle). This is not your case.
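The trigger condition above can be checked with a small plain-Java model (not Flink code). It assumes Flink's convention for bounded out-of-orderness (the emitted watermark is the largest timestamp seen minus outOfOrderness minus 1 ms) and that an event-time session window ending at lastEvent + sessionWindowGap fires once the watermark reaches its last millisecond. The numbers are hypothetical.

```java
/**
 * Toy model (not Flink code) of a bounded-out-of-orderness watermark generator
 * feeding an event-time session window.
 */
public class SessionTriggerDemo {

    /** Watermark generator: only event timestamps move it, never the wall clock. */
    static final class BoundedOutOfOrderness {
        private final long outOfOrdernessMs;
        private long maxTimestamp;

        BoundedOutOfOrderness(long outOfOrdernessMs) {
            this.outOfOrdernessMs = outOfOrdernessMs;
            // Start low enough that the first computed watermark does not underflow.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMs - 1;
        }
    }

    /** A session window ends sessionGapMs after its last event and fires once the watermark reaches its last millisecond. */
    static boolean sessionFires(long lastEventInSession, long sessionGapMs, long watermark) {
        long windowEnd = lastEventInSession + sessionGapMs; // exclusive end of the window
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long outOfOrderness = 5_000L; // hypothetical settings
        long gap = 10_000L;
        long lastEvent = 1_000_000L;

        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(outOfOrderness);
        generator.onEvent(lastEvent);
        // Watermark is 994_999 but the window needs 1_009_999: nothing fires.
        System.out.println(sessionFires(lastEvent, gap, generator.currentWatermark())); // false

        // A later event (of any transaction) far enough ahead in event time advances the watermark.
        generator.onEvent(lastEvent + outOfOrderness + gap + 1);
        System.out.println(sessionFires(lastEvent, gap, generator.currentWatermark())); // true
    }
}
```

If no such later event ever arrives, the generator never moves past 994_999 and the session stays open, however much wall-clock time passes.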
>
> I hope this clarifies your situation.
>
> Cheers
>
> Thias
>
> *From:* HG <[email protected]>
> *Sent:* Wednesday, March 16, 2022 10:06
> *To:* user <[email protected]>
> *Subject:* Watermarks event time vs processing time
>
> Hi,
>
> I read JSON-formatted events from a Kafka topic.
> These events contain a handling time (aka event time) in epoch
> milliseconds, a transaction_id and a large nested JSON structure.
> I need to group the events by transaction_id, order them by handling time
> and calculate the differences in handling time.
> The events are updated with this calculated elapsed time and pushed
> further.
> So all events that go in should come out with the elapsed time added.
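The per-transaction computation described above can be sketched in plain Java, outside Flink, to make the expected output concrete. The Event and Enriched classes are hypothetical stand-ins for the Tuple4 in the job. One design choice differs from the job's window function: the first event of a transaction is detected with an explicit flag rather than a 0L sentinel, so a handling_time of 0 would not be mistaken for "no previous event".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch: group by transaction, sort by handling time, add elapsed times. */
public class ElapsedTimeDemo {

    /** Hypothetical stand-in for the (transaction_id, handling_time) part of an event. */
    static final class Event {
        final String transactionId;
        final long handlingTime;

        Event(String transactionId, long handlingTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
        }
    }

    /** An event enriched with elapsed and cumulative elapsed time (ms). */
    static final class Enriched {
        final String transactionId;
        final long handlingTime;
        final long elapsed;
        final long cumulativeElapsed;

        Enriched(String transactionId, long handlingTime, long elapsed, long cumulativeElapsed) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsed = elapsed;
            this.cumulativeElapsed = cumulativeElapsed;
        }

        @Override
        public String toString() {
            return transactionId + " t=" + handlingTime + " elapsed=" + elapsed + " cumulative=" + cumulativeElapsed;
        }
    }

    static List<Enriched> enrich(List<Event> events) {
        // Group by transaction id, keeping transactions in first-seen order.
        Map<String, List<Event>> byTransaction = new LinkedHashMap<>();
        for (Event e : events) {
            byTransaction.computeIfAbsent(e.transactionId, k -> new ArrayList<>()).add(e);
        }
        List<Enriched> out = new ArrayList<>();
        for (List<Event> group : byTransaction.values()) {
            group.sort(Comparator.comparingLong(e -> e.handlingTime));
            long cumulative = 0L;
            long previous = 0L;
            boolean first = true;
            for (Event e : group) {
                // The first event of a transaction has no predecessor, so its elapsed time is 0.
                long elapsed = first ? 0L : e.handlingTime - previous;
                cumulative += elapsed;
                previous = e.handlingTime;
                first = false;
                out.add(new Enriched(e.transactionId, e.handlingTime, elapsed, cumulative));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event("tx-1", 1_300L),
                new Event("tx-2", 500L),
                new Event("tx-1", 1_000L),
                new Event("tx-1", 1_050L));
        enrich(input).forEach(System.out::println);
        // tx-1 t=1000 elapsed=0 cumulative=0
        // tx-1 t=1050 elapsed=50 cumulative=50
        // tx-1 t=1300 elapsed=250 cumulative=300
        // tx-2 t=500 elapsed=0 cumulative=0
    }
}
```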
>
> For testing I use old events (so the handling time is nowhere near the
> wall-clock time).
> Initially I used EventTimeSessionWindows, but somehow the processing did
> not run as expected: when I pushed 1000 events, eventually only 800 or so
> would appear at the output.
> This was resolved by switching to ProcessingTimeSessionWindows.
> My thought was then that I could remove the watermark strategies (with
> timestamp assigners based on handling time) for the Kafka input stream and
> the data stream.
> However, this was not the case.
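One plausible contributor to the "800 or so out of 1000" observation (not confirmed in this thread) is that, once the input stops, a bounded-out-of-orderness watermark freezes at the largest timestamp seen minus outOfOrderness minus 1 ms, so the most recent sessions never fire. The plain-Java toy model below (not Flink code) counts the sessions that would fire under that frozen watermark. It assumes each transaction forms a single session; the timestamps and settings are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): which sessions fire once the input stops and the watermark freezes. */
public class StalledSessionsDemo {

    static int firedSessions(long[] timestamps, String[] transactionIds, long outOfOrdernessMs, long gapMs) {
        if (timestamps.length == 0) {
            return 0;
        }
        long maxTimestamp = Long.MIN_VALUE;
        Map<String, Long> lastEventPerTransaction = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            maxTimestamp = Math.max(maxTimestamp, timestamps[i]);
            lastEventPerTransaction.merge(transactionIds[i], timestamps[i], Long::max);
        }
        // No further events arrive, so the watermark never gets past this value.
        long frozenWatermark = maxTimestamp - outOfOrdernessMs - 1;
        int fired = 0;
        for (long lastEvent : lastEventPerTransaction.values()) {
            // The session ends at lastEvent + gap and fires once the watermark reaches its last millisecond.
            if (frozenWatermark >= lastEvent + gapMs - 1) {
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Five hypothetical transactions, one event each, 20 s apart in event time.
        long[] ts = {0L, 20_000L, 40_000L, 60_000L, 80_000L};
        String[] ids = {"tx-0", "tx-1", "tx-2", "tx-3", "tx-4"};
        System.out.println(firedSessions(ts, ids, 5_000L, 10_000L)); // 4: tx-4 never fires
    }
}
```

Only the sessions whose end lies within outOfOrderness + gap of the newest event are affected, which is why part of the output, rather than all of it, goes missing.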
>
> Can anyone enlighten me as to why the watermark strategies are still
> needed?
>
> Below is the code:
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type", trustStoreType)
>         .setProperty("ssl.truststore.password", trustStorePassword)
>         .setProperty("ssl.truststore.location", trustStoreLocation)
>         .setProperty("security.protocol", securityProtocol)
>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>         .setGroupId(inputGroupId)
>         .setClientIdPrefix(clientId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(KafkaRecordDeserializationSchema.of(
>                 new JSONKeyValueDeserializationSchema(fetchMetadata)))
>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
>
> /* A watermark is needed to prevent duplicates! */
> WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
>         .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>         .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
>             @Override
>             public long extractTimestamp(ObjectNode element, long eventTime) {
>                 return element.get("value").get("handling_time").asLong();
>             }
>         });
>
> /* Use the watermark strategy to create a DataStream */
> DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");
>
> /* Split the ObjectNode into a Tuple4 */
> DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
>         ds.flatMap(new Splitter());
>
> WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
>         .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(
>                 Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
>             @Override
>             public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
>                 return element.f0;
>             }
>         });
>
> DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
>         tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
> DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
>         .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
>             @Override
>             public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
>                 return value.f2;
>             }
>         })
>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
>         .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
>         .process(new MyProcessWindowFunction());
>
>
> KafkaSink<String> kSink = KafkaSink.<String>builder()
>         .setBootstrapServers(outputBrokers)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic(kafkaOutputTopic)
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .build())
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .build();
>
> // Sink to the Kafka topic
> tuple4DsWmKeyedbytr.sinkTo(kSink);
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.