Hey folks, I have a process (written in Scala) that aims to measure time lag.
When the process bootstraps, it looks at the history of offsets and collects the offset that existed at different timestamps (7 days ago, 6 days ago, etc., with increasing frequency as it gets closer to *now*). To do that it uses the "consumer.offsetsForTimes" method and keeps that information in memory.

I observed that in some cases, mainly in *some* partitions of compacted topics (seen on a few partitions of __consumer_offsets and __transaction_state), the resulting offsets aren't monotonically increasing as the times get closer to now. In these specific cases (again, not all partitions) I see, for example (a few data points out of many):

    time (in seconds)    offset
    1691512025           16101908
    1691550724           15121538
    1691645078           15473125
    1691789229           15473125
    1692104539           16078952
    1692116770           16101908
    1692116809           16101908
    1692116833           16101908
    ..
    ..

Code looks like:

    BootstrapTimes.reverse // bootstrap from old to new time
      .foreach { duration =>
        val time = now - duration
        // Ask for the offset at `time` on every partition
        val lookup = partitions
          .map { _ -> (time.inMillis: java.lang.Long) }
          .toMap
          .asJava
        val offsets =
          try consumer.offsetsForTimes(lookup).asScala.toMap
          catch KafkaConsumerException.rethrow
        // offsetsForTimes maps a partition to null when it has no matching offset
        val partitionToOffset = offsets.collect {
          case (partition, value) if value != null => partition -> value.offset
        }
      }

So none of the later data points has an offset beyond the one returned for the earliest time, which seems odd. What could the reason be? Can you help me understand why this would mostly be observed with compacted topics (but once in a while in other topics too)?
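
To make the symptom concrete, here is a minimal sketch of a check that flags the non-monotonic steps in a series of (time, offset) samples. It is not part of the actual process: `OffsetCheck`, `Sample`, and `nonMonotonic` are hypothetical names used only for illustration, and the data is the sample quoted above.

```scala
// Hypothetical helper, not taken from the real process.
object OffsetCheck {
  final case class Sample(timeSeconds: Long, offset: Long)

  // Returns each consecutive (earlier, later) pair, ordered by time,
  // where the later sample has a smaller offset than the earlier one.
  def nonMonotonic(samples: Seq[Sample]): Seq[(Sample, Sample)] = {
    val sorted = samples.sortBy(_.timeSeconds)
    sorted.zip(sorted.drop(1)).filter { case (prev, next) =>
      next.offset < prev.offset
    }
  }

  // The data points quoted in this post.
  val observed: Seq[Sample] = Seq(
    Sample(1691512025L, 16101908L),
    Sample(1691550724L, 15121538L),
    Sample(1691645078L, 15473125L),
    Sample(1691789229L, 15473125L),
    Sample(1692104539L, 16078952L),
    Sample(1692116770L, 16101908L),
    Sample(1692116809L, 16101908L),
    Sample(1692116833L, 16101908L)
  )

  def main(args: Array[String]): Unit =
    nonMonotonic(observed).foreach { case (prev, next) =>
      println(s"offset went back: $prev -> $next")
    }
}
```

On the sample above this flags only the first step (1691512025 to 1691550724): the oldest timestamp already maps to the highest offset in the series.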