Hello,
I'm new to the Kafka ecosystem, so I apologize if this is a naive question.
What I'm looking to accomplish is the following:
- We receive heartbeat events from a source, and they are piped into a Kafka
  topic. These events have the form {"session_id": "foo", "total_time_spent": x},
  and we get one every 15 seconds or so. total_time_spent is the total time for a
  given session ID, so for each session ID it is cumulative and therefore
  monotonically increasing.
- We now need to transform this stream to emit the incremental time spent. For
  any event, incremental_time_spent is the current total_time_spent minus the
  previous total_time_spent, where "previous" is defined by time (the most
  recent earlier event for the same session ID).
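To pin down the intended semantics independently of Kafka, here is a minimal sketch of the transformation over an in-memory list. It assumes events for each session arrive in order and treats the first heartbeat of a session as an increment of its full total; the names IncrementalModel, Heartbeat and incrementalize are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free reference model: cumulative heartbeats in,
// per-session increments out. Assumes in-order events per session_id.
class IncrementalModel {

    // A heartbeat: session_id plus the cumulative total_time_spent so far.
    static final class Heartbeat {
        final String sessionId;
        final long totalTimeSpent;

        Heartbeat(String sessionId, long totalTimeSpent) {
            this.sessionId = sessionId;
            this.totalTimeSpent = totalTimeSpent;
        }
    }

    // Returns incremental_time_spent for each heartbeat, in input order.
    static List<Long> incrementalize(List<Heartbeat> events) {
        // Last cumulative total seen per session (what a state store would hold).
        Map<String, Long> lastTotal = new HashMap<>();
        List<Long> increments = new ArrayList<>();
        for (Heartbeat e : events) {
            // No earlier heartbeat for this session: treat the previous total as 0.
            long previous = lastTotal.getOrDefault(e.sessionId, 0L);
            // Clamp at 0 so a total that goes backwards never yields a negative increment.
            increments.add(Math.max(0L, e.totalTimeSpent - previous));
            lastTotal.put(e.sessionId, e.totalTimeSpent);
        }
        return increments;
    }

    public static void main(String[] args) {
        List<Heartbeat> events = List.of(
                new Heartbeat("foo", 15),
                new Heartbeat("bar", 10),
                new Heartbeat("foo", 30),
                new Heartbeat("foo", 46),
                new Heartbeat("bar", 25));
        System.out.println(incrementalize(events)); // [15, 10, 15, 16, 15]
    }
}
```

Sessions are tracked independently, so interleaved "foo" and "bar" heartbeats each diff only against their own previous total.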
I'm wondering how to achieve this using the Streams API.
My first attempt looked like this:
KStreamBuilder builder = new KStreamBuilder();
builder
    .stream(topic)
    .groupByKey()
    .aggregate(() -> null, Incrementalize::incrementalize)
    .toStream()
    .print();
with the incrementalize method being the following:
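import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Aggregator: folds each heartbeat for a session into a JSON aggregate that
// keeps every total seen so far ("values") plus the latest increment.
private static JsonNode incrementalize(String sessionId, JsonNode currentNode,
                                       JsonNode currentAggregate) {
    ObjectNode result = JsonNodeFactory.instance.objectNode();
    ArrayNode valuesArray = JsonNodeFactory.instance.arrayNode();
    long currentTimeSpent = currentNode.get("total_time_spent").asLong();
    if (currentAggregate == null) {
        // First heartbeat for this session: the whole total is the increment.
        result.put("incremental_time_spent", currentTimeSpent);
    } else {
        // Copy the previous totals and diff against the most recent one.
        ArrayNode values = (ArrayNode) currentAggregate.get("values");
        valuesArray.addAll(values);
        long previousTimeSpent = valuesArray.get(valuesArray.size() - 1).asLong();
        result.put("incremental_time_spent",
                Math.max(0, currentTimeSpent - previousTimeSpent));
    }
    // Append the current total so the next heartbeat can diff against it.
    valuesArray.add(currentTimeSpent);
    result.set("values", valuesArray);
    return result;
}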
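Since each increment only needs the immediately preceding total, the aggregate could in principle keep just that one value instead of an ever-growing "values" array. Below is a minimal, stdlib-only sketch of that idea under these assumptions: the caller has already extracted total_time_spent as a long, and SessionState and LeanIncrementalize are hypothetical names. Using a custom state type like this in Kafka Streams would also need a matching Serde, which is not shown.

```java
// Hypothetical constant-size aggregate: the previous cumulative total and the
// increment computed from it, replacing the ever-growing "values" array.
final class SessionState {
    final long lastTotal;
    final long incrementalTimeSpent;

    SessionState(long lastTotal, long incrementalTimeSpent) {
        this.lastTotal = lastTotal;
        this.incrementalTimeSpent = incrementalTimeSpent;
    }
}

class LeanIncrementalize {
    // Same (key, value, aggregate) -> aggregate shape as incrementalize, but the
    // value is just the cumulative total and only the last total is retained.
    static SessionState incrementalize(String sessionId, long currentTotal, SessionState previous) {
        if (previous == null) {
            // First heartbeat for this session: the whole total counts as the increment.
            return new SessionState(currentTotal, currentTotal);
        }
        // Clamp at 0, as the original aggregator does.
        long delta = Math.max(0L, currentTotal - previous.lastTotal);
        return new SessionState(currentTotal, delta);
    }

    public static void main(String[] args) {
        SessionState s = null;
        for (long total : new long[] {15, 30, 46}) {
            s = incrementalize("foo", total, s);
            System.out.println(s.incrementalTimeSpent); // prints 15, then 15, then 16
        }
    }
}
```

The state per session stays the same size no matter how long the session runs, whereas the array-based aggregate grows by one entry per heartbeat.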
The problem I'm having with this approach is that KTable#toStream doesn't emit
every change: it only seems to emit a change every 30 seconds, even though the
heartbeats arrive every 15 seconds and are being accumulated into the values
array correctly.
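One thing that may be worth checking, on the assumption that this is a Streams version with KTable record caching (0.10.1 or later): cached aggregate updates are typically forwarded downstream only when the cache is flushed on commit, and commit.interval.ms defaults to 30000 ms, which would match the 30-second cadence. A minimal sketch of properties that disable the cache and commit more often follows; the helper name, application id and broker address are hypothetical.

```java
import java.util.Properties;

class StreamsEmitConfig {
    // Hypothetical helper: Streams properties intended to surface every KTable update.
    static Properties everyUpdateProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Disable the record cache so each aggregate update is forwarded
        // rather than held (and deduplicated per key) until the next flush.
        props.put("cache.max.bytes.buffering", "0");
        // Commit, and therefore flush, more often than the 30 s default.
        props.put("commit.interval.ms", "1000");
        return props;
    }
}
```

Disabling the cache trades more downstream records and state-store writes for seeing every intermediate update.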
Is there a gap in how I've fundamentally modeled this problem with streams?
Could someone please point me in the right direction?
Thanks!
Puneet