Hi,
I read JSON-formatted events from a Kafka topic.
These events contain a handling time (a.k.a. event time) in epoch milliseconds,
a transaction_id, and a large nested JSON structure.
I need to group the events by transaction_id, order them by handling time,
and calculate the differences in handling time.
The events are updated with this calculated elapsed time and pushed
further downstream.
So every event that goes in should come out with the elapsed time added.
For testing I use old events (so the handling time is nowhere near the
wall-clock time).
Initially I used EventTimeSessionWindows, but somehow the processing did not
run as expected:
when I pushed 1000 events, only 800 or so would eventually appear at the output.
This was resolved by switching to ProcessingTimeSessionWindows.
My thought was then that I could remove the watermark strategies (watermarks
with timestamp assigners on the handling time) for the Kafka input
stream and for the data stream.
However, this was not the case.
Can anyone enlighten me as to why the watermark strategies are still needed?
Below is the code:
// Builds the streaming pipeline:
//   Kafka JSON source -> Splitter (Tuple4) -> keyBy(transaction_id)
//   -> processing-time session windows -> MyProcessWindowFunction -> Kafka sink.
// NOTE(review): these statements are repeated further down in this paste (differing only in
// line wrapping). Declaring `source`, `kafkaWmstrategy`, `ds`, `tuple4ds`, `wmStrategy`, ...
// twice in the same method would not compile, so presumably the real program has only one
// copy — confirm.
// Kafka source: the base kafkaProps plus SSL/security overrides. Each record is deserialized
// into an ObjectNode; the payload is read from its "value" member further down.
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type", trustStoreType)
.setProperty("ssl.truststore.password", trustStorePassword)
.setProperty("ssl.truststore.location", trustStoreLocation)
.setProperty("security.protocol", securityProtocol)
.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint)
.setGroupId(inputGroupId)
.setClientIdPrefix(clientId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(fetchMetadata)))
// Start from the consumer group's committed offsets. Presumably this falls back to EARLIEST
// when no committed offset exists (per Flink's OffsetsInitializer docs — confirm).
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();
/* A watermark is needed to prevent duplicates! */
// NOTE(review): the claim above is not demonstrated by the code shown. The windows below use
// ProcessingTimeSessionWindows, which (per the Flink windowing docs) are assigned and fired by
// the machine clock, not by record timestamps or watermarks. Verify why removing this
// strategy changed the output before relying on it.
// Event-time strategy: the timestamp of each record is value.handling_time. Watermarks trail
// the largest timestamp seen by `outOfOrderness` seconds, and a split that receives no records
// for `idleness` seconds is marked idle. Both config strings are parsed as whole seconds.
// The assigner ignores its second argument (the timestamp already attached to the record,
// presumably the Kafka record timestamp).
WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
.withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
.withTimestampAssigner(new
SerializableTimestampAssigner<ObjectNode>() {
@Override
public long extractTimestamp(ObjectNode element, long
eventTime) {
return
element.get("value").get("handling_time").asLong();
}
});
/* Use the watermark strategy to create a datastream */
DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy,
"Kafka Source");
/* Split the ObjectNode into a Tuple4:
   (handling_time, handling_time, transaction_id, original_event as pretty JSON) */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
ds.flatMap(new Splitter());
// Second timestamp/watermark assignment, applied after the flatMap. It reads f0, which
// Splitter fills with the same value.handling_time that kafkaWmstrategy already extracted at
// the source. It therefore re-assigns identical timestamps and appears redundant with the
// source strategy.
WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy =
WatermarkStrategy
.<Tuple4<Long, Long, String,
String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
.withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
.withTimestampAssigner(new
SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
@Override
public long extractTimestamp(Tuple4<Long, Long, String,
String> element, long eventTime) {
return element.f0;
}
});
DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
// Key by transaction_id (f2). Each key's events are grouped into processing-time session
// windows that close after `sessionWindowGap` seconds without a new event. Each window is
// then sorted and annotated by MyProcessWindowFunction.
// NOTE(review): with an event-time session window, the last session of a key only fires once
// the watermark passes its end. With a finite batch of old test events, nothing may arrive
// afterwards to advance the watermark. That could explain the "~800 of 1000" symptom seen with
// EventTimeSessionWindows — confirm.
DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
.keyBy(new KeySelector<Tuple4<Long, Long, String, String>,
String>() {
@Override
public String getKey(Tuple4<Long, Long, String, String>
value) throws Exception {
return value.f2;
}
})
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
// NOTE(review): Flink consults allowedLateness only for event-time window assigners. With
// ProcessingTimeSessionWindows it appears to have no effect — confirm for the version used.
.allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
.process(new MyProcessWindowFunction());
// Kafka sink that writes each annotated JSON string to kafkaOutputTopic.
// AT_LEAST_ONCE: records may be written more than once after a failure/restart, so consumers
// of the output topic should tolerate duplicates.
// NOTE(review): newer Flink releases deprecate setDeliverGuarantee in favour of
// setDeliveryGuarantee — check the version in use.
KafkaSink<String> kSink = KafkaSink.<String>builder()
.setBootstrapServers(outputBrokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new
SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);
// Builds the streaming pipeline:
//   Kafka JSON source -> Splitter (Tuple4) -> keyBy(transaction_id)
//   -> processing-time session windows -> MyProcessWindowFunction -> Kafka sink,
// and then submits the job.
// NOTE(review): these statements repeat the ones earlier in this paste (differing only in
// line wrapping). Redeclaring `source`, `kafkaWmstrategy`, `ds`, ... in the same method would
// not compile. If two such pipelines really were built, both would write to the same output
// topic — confirm which copy is real.
// Kafka source: the base kafkaProps plus SSL/security overrides. Each record is deserialized
// into an ObjectNode; the payload is read from its "value" member further down.
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type", trustStoreType)
.setProperty("ssl.truststore.password", trustStorePassword)
.setProperty("ssl.truststore.location", trustStoreLocation)
.setProperty("security.protocol", securityProtocol)
.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint)
.setGroupId(inputGroupId)
.setClientIdPrefix(clientId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(fetchMetadata)))
// Start from the consumer group's committed offsets. Presumably this falls back to EARLIEST
// when no committed offset exists (per Flink's OffsetsInitializer docs — confirm).
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();
/* A watermark is needed to prevent duplicates! */
// NOTE(review): the claim above is not demonstrated by the code shown. The windows below use
// ProcessingTimeSessionWindows, which (per the Flink windowing docs) are driven by the machine
// clock rather than by record timestamps or watermarks — verify.
// Event-time strategy: the timestamp of each record is value.handling_time. Watermarks trail
// the largest timestamp seen by `outOfOrderness` seconds, and a split that receives no records
// for `idleness` seconds is marked idle. Both config strings are parsed as whole seconds.
WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
.withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
.withTimestampAssigner(new
SerializableTimestampAssigner<ObjectNode>() {
@Override
public long extractTimestamp(ObjectNode element, long
eventTime) {
return
element.get("value").get("handling_time").asLong();
}
});
/* Use the watermark strategy to create a datastream */
DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy,
"Kafka Source");
/* Split the ObjectNode into a Tuple4:
   (handling_time, handling_time, transaction_id, original_event as pretty JSON) */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
ds.flatMap(new Splitter());
// Second timestamp/watermark assignment, applied after the flatMap. It reads f0, which holds
// the same value.handling_time extracted by kafkaWmstrategy at the source. It therefore
// re-assigns identical timestamps and appears redundant.
WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy =
WatermarkStrategy
.<Tuple4<Long, Long, String,
String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
.withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
.withTimestampAssigner(new
SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
@Override
public long extractTimestamp(Tuple4<Long, Long, String,
String> element, long eventTime) {
return element.f0;
}
});
DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
// Key by transaction_id (f2). Each key's events are grouped into processing-time session
// windows that close after `sessionWindowGap` seconds without a new event. Each window is
// then sorted and annotated by MyProcessWindowFunction.
DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
.keyBy(new KeySelector<Tuple4<Long, Long, String, String>,
String>() {
@Override
public String getKey(Tuple4<Long, Long, String, String> value)
throws Exception {
return value.f2;
}
})
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
// NOTE(review): Flink consults allowedLateness only for event-time window assigners. With
// ProcessingTimeSessionWindows it appears to have no effect — confirm for the version used.
.allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
.process(new MyProcessWindowFunction());
// Kafka sink that writes each annotated JSON string to kafkaOutputTopic.
// AT_LEAST_ONCE: records may be written more than once after a failure/restart.
// NOTE(review): newer Flink releases deprecate setDeliverGuarantee in favour of
// setDeliveryGuarantee — check the version in use.
KafkaSink<String> kSink = KafkaSink.<String>builder()
.setBootstrapServers(outputBrokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);
// Submit the job graph built above for execution under the name "Cag Events".
env.execute("Cag Events");
}
// Splits the Object node into a Tuple4
private static class Splitter implements FlatMapFunction<ObjectNode,
Tuple4<Long, Long, String, String>> {
@Override
public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long,
String, String>> out) throws Exception {
// retrieved handling_time twice intentionally one of which will be
used for the watermark strategy and the other for the calculation of the
elapsed time
out.collect(new Tuple4<Long, Long, String,
String>(json.get("value").get("handling_time").asLong(),
json.get("value").get("handling_time").asLong(),
json.get("value").get("transaction_id").asText(),
json.get("value").get("original_event").toPrettyString()));
}
}
// Orders the events of one transaction by handling time (field f0), earliest first.
public static class SortEventsHandlingTime
        implements Comparator<Tuple4<Long, Long, String, String>> {

    /**
     * Compares two events by their handling time (f0).
     *
     * @param o1 first event
     * @param o2 second event
     * @return a negative value, zero or a positive value as o1's handling time is less than,
     *     equal to or greater than o2's
     * @throws NullPointerException if either handling time is null
     */
    @Override
    public int compare(Tuple4<Long, Long, String, String> o1,
            Tuple4<Long, Long, String, String> o2) {
        // Long.compare already yields -1/0/1, so no re-normalisation is needed. Reading f0
        // directly avoids a Long -> String -> long round trip.
        return Long.compare(o1.f0, o2.f0);
    }
}
// Sorts the events and calculates the elapsed times
static class MyProcessWindowFunction extends
ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String,
TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple4<Long,
Long, String, String>> input, Collector<String> out) throws
JsonProcessingException {
Long elapsed = 0L;
Long pHandlingTime = 0L;
Long cumulativeElapsed = 0L;
List<Tuple4<Long, Long, String, String>> inputList = new
ArrayList<>();
input.forEach(inputList::add);
inputList.sort(new SortEventsHandlingTime());
ObjectMapper mapper = new ObjectMapper();
for (Tuple4<Long, Long, String, String> in: inputList){
if (pHandlingTime.equals(0L)) {
elapsed = 0L;
} else {
elapsed = Long.parseLong(in.getField(0).toString()) -
pHandlingTime;
}
cumulativeElapsed = cumulativeElapsed + elapsed;
pHandlingTime = Long.parseLong(in.getField(0).toString());
JsonNode originalEvent =
mapper.readTree(in.getField(3).toString());
// Cast
ObjectNode o = (ObjectNode)
originalEvent.get("Message").get("endpoints").get(0).get("endpoint_handlers").get(0);
o.put("handling_time", in.getField(0).toString());
o.put("elapsed_time", elapsed.toString());
o.put("cumulative_elapsed_time", cumulativeElapsed.toString());
out.collect(((ObjectNode) originalEvent).toString());
}
}
}