Hi Team,

I am trying to build an audit-like system where I read messages from "n" Kafka queues, key them by a unique key, and then reduce them to a single message. If a message has passed through all "n" Kafka queues within a window of "m" hours/days, it has succeeded; otherwise it has expired.
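Conceptually, each audit message carries the set of queues it has been seen on, and the reduce step merges those sets. A simplified sketch of what I mean (AuditMessage and its fields are placeholder names, not the real schema):

    // Placeholder schema: the unique key plus the set of source queues
    // this message has been observed on so far.
    case class AuditMessage(key: String, seenOn: Set[String])

    // Merge two partial observations for the same key by unioning the
    // queue sets; a message has "passed through all n queues" once
    // seenOn covers all n of them.
    val reduceFunction = (a: AuditMessage, b: AuditMessage) =>
      AuditMessage(a.key, a.seenOn ++ b.seenOn)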
I can get it working in my test case, but not when there are millions of messages: very few messages reach the success stage in each iteration, and a huge number are sent back into the iteration. This creates back pressure, and the job stops reading from the Kafka queues altogether. Since no new messages are read, the messages inside the window can no longer succeed; they keep cycling through the iteration until they expire, even though they should have succeeded.

I read that the iteration has a buffer which, when full, creates back pressure and stops further reads. The system is supposed to be a lightweight audit system, and the audit messages it creates are very small. Is it possible to increase the size of that buffer to avoid back pressure? Is there an alternative solution to this issue?

The code looks like this:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)

    // Step function for the iteration: returns (feedback, output).
    def audit(input: DataStream[AuditMessage]) = {
      val reducedStream = input
        .keyBy(keyFunction)
        .window(TumblingProcessingTimeWindows.of(Time.hours(m)))
        .reduce(reduceFunction)

      val splitStreams = reducedStream.split(splitFunction)
      splitStreams.select("success").addSink(terminalSink)
      splitStreams.select("expire").addSink(expireSink)

      // "replay" is fed back into the iteration; "success" is the output.
      (splitStreams.select("replay"), splitStreams.select("success"))
    }

    unionInputStream.iterate(audit(_))

Thanks and Regards,
Mahesh
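P.S. For completeness, splitFunction is essentially an OutputSelector that tags each reduced message as success, expire, or replay. A rough sketch (n and the expired() check are placeholders):

    import org.apache.flink.streaming.api.collector.selector.OutputSelector
    import java.util.Collections

    val n = 4 // number of source Kafka queues (placeholder)

    // Placeholder predicate for the "m" hours/days deadline.
    def expired(msg: AuditMessage): Boolean = ???

    val splitFunction = new OutputSelector[AuditMessage] {
      override def select(msg: AuditMessage): java.lang.Iterable[String] = {
        val tag =
          if (msg.seenOn.size == n) "success" // seen on all n queues
          else if (expired(msg)) "expire"     // deadline passed
          else "replay"                       // send back into the iteration
        Collections.singletonList(tag)
      }
    }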