This is an automated email from the ASF dual-hosted git repository. engelen pushed a commit to branch update/kafka-clients-4.0.0 in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
commit fab3db74122f13d2104db5f91220da0506a022e4 Author: PJ Fanning <[email protected]> AuthorDate: Wed Mar 26 13:08:41 2025 +0100 Update KafkaConsumerActor.scala --- .../apache/pekko/kafka/internal/KafkaConsumerActor.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala index 87a56426..f720b3a8 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala @@ -716,13 +716,16 @@ import scala.util.control.NonFatal }) case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") => - @nowarn("cat=deprecation") val resp = Metadata.CommittedOffset( + Metadata.CommittedOffsets( Try { - @nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout) - offset - }, - req.partition) - resp + consumer + .committed( + java.util.Collections.singleton(req.partition), + settings.getMetadataRequestTimeout) + .asScala + .filterNot(_._2 == null) + .toMap + }) } private def stopFromMessage(msg: StopLike) = msg match { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
