This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 67a5cffb Kafka Client 4.1.0 (#358)
67a5cffb is described below

commit 67a5cffb9f41d254d356d44cb09b91e0e049fa04
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Sep 23 05:13:11 2025 +0100

    Kafka Client 4.1.0 (#358)
    
    * Update kafka-clients to 4.0.0
    
    * Update KafkaConsumerActor.scala
    
    * Update ConsumerResetProtection.scala
    
    * Update KafkaSpec.scala
    
    test compile issues
    
    * Create kafka-clients-4.0-upgrade.excludes
    
    * deprecation issues
    
    * kafka-client 4.1.0
    
    * Update KafkaConsumerActor.scala
    
    * Update ConsumerDummy.scala
    
    * Update KafkaConsumerActor.scala
    
    * Update ConsumerMock.scala
    
    * kafka-avro-serializer 8.0.0
    
    * compile issues
    
    ---------
    
    Co-authored-by: scala-steward-asf[bot] <147768647+scala-steward-asf[bot]@users.noreply.github.com>
---
 .../kafka-clients-4.0-upgrade.excludes             | 19 ++++++++++++
 .../kafka/internal/ConsumerResetProtection.scala   |  2 +-
 .../pekko/kafka/internal/KafkaConsumerActor.scala  |  1 +
 docs/src/main/paradox/serialization.md             | 10 +++----
 project/Versions.scala                             |  4 +--
 .../pekko/kafka/testkit/scaladsl/KafkaSpec.scala   |  2 ++
 .../scaladsl/SchemaRegistrySerializationSpec.scala |  6 ++--
 .../internal/CommittingProducerSinkSpec.scala      | 34 +++++++++++-----------
 .../pekko/kafka/internal/ConsumerDummy.scala       | 12 ++++++--
 .../apache/pekko/kafka/internal/ConsumerMock.scala |  8 +++--
 .../internal/ConsumerProgressTrackingSpec.scala    |  4 +--
 .../internal/ConsumerResetProtectionSpec.scala     |  8 +++--
 .../kafka/internal/PartitionedSourceSpec.scala     |  2 +-
 .../apache/pekko/kafka/internal/ProducerSpec.scala |  2 +-
 14 files changed, 74 insertions(+), 40 deletions(-)

diff --git 
a/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
 
b/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
new file mode 100644
index 00000000..451d90a3
--- /dev/null
+++ 
b/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Method that was removed from kafka-clients 4.0
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.RestrictedConsumer.committed")
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 55006009..8387fa95 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -68,7 +68,7 @@ object ConsumerResetProtection {
           .toMap
           .asJava
 
-      new ConsumerRecords[K, V](safe)
+      new ConsumerRecords[K, V](safe, java.util.Collections.emptyMap())
     }
 
     /**
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index b837dd42..848f5b36 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -445,6 +445,7 @@ import scala.util.control.NonFatal
     progressTracker
   }
 
+  @nowarn("msg=deprecated")
   override def postStop(): Unit = {
     // reply to outstanding requests is important if the actor is restarted
     requests.foreach {
diff --git a/docs/src/main/paradox/serialization.md 
b/docs/src/main/paradox/serialization.md
index 02b5b696..f6ca3079 100644
--- a/docs/src/main/paradox/serialization.md
+++ b/docs/src/main/paradox/serialization.md
@@ -66,7 +66,7 @@ Maven
         <dependency>
           <groupId>io.confluent</groupId>
           <artifactId>kafka-avro-serializer</artifactId>
-          <version>confluent.version (eg. 7.9.2)</version>
+          <version>confluent.version (eg. 8.0.0)</version>
         </dependency>
         ...
       </dependencies>
@@ -84,14 +84,14 @@ Maven
 
 sbt
 :   ```scala
-    libraryDependencies += "io.confluent" % "kafka-avro-serializer" % 
confluentAvroVersion, //  eg. 7.9.2
+    libraryDependencies += "io.confluent" % "kafka-avro-serializer" % 
confluentAvroVersion, //  eg. 8.0.0
     resolvers += "Confluent Maven Repository" at 
"https://packages.confluent.io/maven/",
     ```
 
 Gradle
 :   ```gradle
     dependencies {
-      compile group: 'io.confluent', name: 'kafka-avro-serializer', version: 
confluentAvroVersion // eg. 7.9.2
+      compile group: 'io.confluent', name: 'kafka-avro-serializer', version: 
confluentAvroVersion // eg. 8.0.0
     }
     repositories {
       maven {
@@ -103,7 +103,7 @@ Gradle
 
 ### Producer
 
-To create serializers that use the Schema Registry, its URL needs to be 
provided as configuration 
`AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer and 
that serializer is used in the @apidoc[ProducerSettings$].
+To create serializers that use the Schema Registry, its URL needs to be 
provided as configuration 
`AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer 
and that serializer is used in the @apidoc[ProducerSettings$].
 
 Scala
 : @@ snip 
[snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala)
 { #imports #serializer }
@@ -115,7 +115,7 @@ Java
 
 ### Consumer
 
-To create deserializers that use the Schema Registry, its URL needs to be 
provided as configuration  
`AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer 
and that deserializer is used in the @apidoc[ConsumerSettings$].
+To create deserializers that use the Schema Registry, its URL needs to be 
provided as configuration  
`AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer 
and that deserializer is used in the @apidoc[ConsumerSettings$].
 
 Scala
 : @@ snip 
[snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala)
 { #imports #de-serializer }
diff --git a/project/Versions.scala b/project/Versions.scala
index 24107652..e7abc322 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -22,7 +22,7 @@ object Versions {
   val pekkoConnectorsKafkaVersionForDocs = "current"
   val pekkoManagementVersionForDocs = "current"
 
-  val kafkaVersion = "3.9.1"
+  val kafkaVersion = "4.1.0"
   val KafkaVersionForDocs = "37"
 
   val scalaTestVersion = "3.2.19"
@@ -33,7 +33,7 @@ object Versions {
   // this depends on Kafka, and should be upgraded to such latest version
   // that depends on the same Kafka version, as is defined above
   // See 
https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
-  val confluentAvroSerializerVersion = "7.9.2"
+  val confluentAvroSerializerVersion = "8.0.0"
   val confluentLibsExclusionRules = Seq(
     ExclusionRule("log4j", "log4j"),
     ExclusionRule("org.slf4j", "slf4j-log4j12"),
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 3ae23870..1f77271b 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.producer.{ Producer => 
KProducer, ProducerRecord
 import org.apache.kafka.common.ConsumerGroupState
 import org.slf4j.{ Logger, LoggerFactory }
 
+import scala.annotation.nowarn
 import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -119,6 +120,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val 
zooKeeperPort: Int, actorSystem: A
    *
    * If the predicate does not hold after configured amount of time, throws an 
exception.
    */
+  @nowarn("cat=deprecation")
   def waitUntilConsumerSummary(groupId: String)(predicate: 
PartialFunction[List[MemberDescription], Boolean]): Unit =
     waitUntilConsumerGroup(groupId) { group =>
       group.state() == ConsumerGroupState.STABLE &&
diff --git 
a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala 
b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index e43ea558..5f643fb1 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -35,7 +35,7 @@ import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 // #imports
-import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, 
KafkaAvroDeserializer, KafkaAvroSerializer }
+import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, 
KafkaAvroDeserializer, KafkaAvroSerializer }
 import org.apache.avro.specific.SpecificRecord
 // #imports
 import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -59,7 +59,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase 
with TestcontainersKa
     // #serializer #de-serializer
 
     val kafkaAvroSerDeConfig = Map[String, Any](
-      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
schemaRegistryUrl,
+      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
schemaRegistryUrl,
       KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
     // #serializer #de-serializer
 
@@ -217,7 +217,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase 
with TestcontainersKa
 
   private def specificRecordConsumerSettings(group: String): 
ConsumerSettings[String, SpecificRecord] = {
     val kafkaAvroSerDeConfig = Map[String, Any] {
-      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
schemaRegistryUrl
+      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
schemaRegistryUrl
     }
     val kafkaAvroDeserializer = new KafkaAvroDeserializer()
     kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index f0d54d35..8ba35d38 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -81,7 +81,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val commitInterval = 200.millis
@@ -117,7 +117,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "skip"),
       consumer.message(partition, "send"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val commitInterval = 200.millis
@@ -157,7 +157,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = 
CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -191,7 +191,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = 
CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -226,7 +226,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
     val producerRecordsPerInput = 2
     val totalProducerRecords = elements.size * producerRecordsPerInput
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = 
CommitterSettings(system).withMaxBatch(elements.size.longValue())
@@ -259,7 +259,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
     val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)
     val message = consumer.message(partition, "increment the offset")
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1)
@@ -292,7 +292,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system)
@@ -328,7 +328,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     // choose a large commit interval so that completion happens before
@@ -364,7 +364,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val commitInterval = 5.seconds
@@ -404,7 +404,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val commitInterval = 5.seconds
@@ -441,7 +441,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 2"))
 
     // this producer does not auto complete messages
-    val producer = new MockProducer[String, String](false, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -480,7 +480,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](false, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -523,7 +523,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](false, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     // choose a large commit interval so that completion happens before
@@ -569,7 +569,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -602,7 +602,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -640,7 +640,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system)
@@ -674,7 +674,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
   }
 
   it should "shut down without elements" in assertAllStagesStopped {
-    val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new 
StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new 
StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxInterval(1.second)
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
index 93073673..9d38fa7f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.pekko.Done
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, 
TopicPartition }
+import org.apache.kafka.common.metrics.KafkaMetric
 import org.slf4j.{ Logger, LoggerFactory }
 
 import scala.concurrent.Promise
@@ -50,7 +51,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def subscribe(pattern: java.util.regex.Pattern, callback: 
ConsumerRebalanceListener): Unit = ???
   override def subscribe(pattern: java.util.regex.Pattern): Unit = ???
   override def unsubscribe(): Unit = ???
-  override def poll(timeout: Long): ConsumerRecords[K, V] = ???
   override def commitSync(): Unit = ???
   override def commitSync(offsets: java.util.Map[TopicPartition, 
OffsetAndMetadata]): Unit = ???
   override def commitAsync(): Unit = ???
@@ -63,7 +63,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def seekToEnd(partitions: java.util.Collection[TopicPartition]): 
Unit = ???
   override def position(partition: TopicPartition): Long = ???
   override def position(partition: TopicPartition, timeout: 
java.time.Duration): Long = ???
-  override def committed(partition: TopicPartition): OffsetAndMetadata = ???
   override def metrics(): java.util.Map[MetricName, _ <: Metric] = ???
   override def partitionsFor(topic: String): java.util.List[PartitionInfo] = 
???
   override def listTopics(): java.util.Map[String, 
java.util.List[PartitionInfo]] = ???
@@ -81,13 +80,13 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def endOffsets(
       partitions: java.util.Collection[TopicPartition]): 
java.util.Map[TopicPartition, java.lang.Long] = ???
   override def close(): Unit = {}
+  override def close(options: CloseOptions): Unit = {}
   override def close(timeout: java.time.Duration): Unit = {}
   override def wakeup(): Unit = ???
 
   override def commitSync(timeout: java.time.Duration): Unit = ???
   override def commitSync(offsets: java.util.Map[TopicPartition, 
OffsetAndMetadata],
       timeout: java.time.Duration): Unit = ???
-  override def committed(partition: TopicPartition, timeout: 
java.time.Duration): OffsetAndMetadata = ???
   override def committed(partitions: util.Set[TopicPartition]): 
util.Map[TopicPartition, OffsetAndMetadata] = ???
   override def committed(partitions: util.Set[TopicPartition],
       timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???
@@ -102,4 +101,11 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def groupMetadata(): ConsumerGroupMetadata = ???
   override def enforceRebalance(): Unit = ???
   override def currentLag(partition: TopicPartition): java.util.OptionalLong = 
???
+
+  override def subscribe(sp: SubscriptionPattern): Unit = ???
+  override def subscribe(sp: SubscriptionPattern, listener: 
ConsumerRebalanceListener): Unit = ???
+
+  override def registerMetricForSubscription(metric: KafkaMetric): Unit = ???
+  override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit = 
???
+
 }
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index e922a8cd..05fa441e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -26,6 +26,7 @@ import org.mockito.stubbing.Answer
 import org.mockito.verification.VerificationMode
 import org.mockito.{ ArgumentMatchers, Mockito }
 
+import scala.annotation.nowarn
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -113,7 +114,7 @@ class ConsumerMock[K, V](handler: 
ConsumerMock.CommitHandler = new ConsumerMock.
           if (releaseCommitCallbacks.get()) {
             handler.onComplete()
           }
-          new ConsumerRecords[K, V](records.asJava)
+          new ConsumerRecords[K, V](records.asJava, 
java.util.Collections.emptyMap())
         }
       })
     Mockito
@@ -168,6 +169,7 @@ class ConsumerMock[K, V](handler: 
ConsumerMock.CommitHandler = new ConsumerMock.
       responses :+= records
     }
 
+  @nowarn("msg=deprecated")
   def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
     verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
 
@@ -207,7 +209,9 @@ class FailingConsumerMock[K, V](throwable: Throwable, 
failOnCallNumber: Int*) ex
         callNumber = callNumber + 1
         if (failOnCallNumber.contains(callNumber))
           throw throwable
-        else new ConsumerRecords[K, V](Map.empty[TopicPartition, 
java.util.List[ConsumerRecord[K, V]]].asJava)
+        else new ConsumerRecords[K, V](
+          Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, 
V]]].asJava,
+          java.util.Collections.emptyMap())
       }
     })
 }
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index eca227ad..48a68b92 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -30,7 +30,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike 
with Matchers with Lo
   private val tp = new TopicPartition("t", 0)
   private val m1 = new ConsumerRecord[String, String](tp.topic(), 
tp.partition(), 10L, "k1", "kv")
   def asConsumerRecords[K, V](tp: TopicPartition, records: ConsumerRecord[K, 
V]*): ConsumerRecords[K, V] = {
-    new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava)
+    new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava, 
java.util.Collections.emptyMap())
   }
   private val records = asConsumerRecords(tp, m1)
 
@@ -86,7 +86,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike 
with Matchers with Lo
       new ConsumerRecords[String, String](
         Map(
           tp2 -> List(new ConsumerRecord[String, String](tp2.topic(), 
tp2.partition(), 10L, "k1",
-            "kv")).asJava).asJava))
+            "kv")).asJava).asJava, java.util.Collections.emptyMap()))
     tracker.receivedMessages.map(extractOffsetFromSafe) should be(Map(tp -> 
10L))
     // no change to the committing
     tracker.commitRequested.map(extractOffset) should be(Map(tp -> 0L))
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
index 8da09266..0cc90fd2 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
@@ -51,7 +51,7 @@ class ConsumerResetProtectionSpec
     val m1 = new ConsumerRecord(tp.topic(), tp.partition(), 10L, "k1", "kv")
 
     def asConsumerRecords[K, V](records: ConsumerRecord[K, V]*): 
ConsumerRecords[K, V] = {
-      new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava)
+      new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava, 
java.util.Collections.emptyMap())
     }
 
     val records = asConsumerRecords(m1)
@@ -147,7 +147,8 @@ class ConsumerResetProtectionSpec
           new ConsumerRecords(
             Map(
               tp -> List(m1).asJava,
-              tp1 -> List(new ConsumerRecord(tp1.topic(), tp1.partition(), 
10L, "k1", "kv")).asJava).asJava))
+              tp1 -> List(new ConsumerRecord(tp1.topic(), tp1.partition(), 
10L, "k1", "kv")).asJava).asJava,
+            java.util.Collections.emptyMap()))
       shouldHaveEqualRecords(records, protectedRecords)
     }
 
@@ -169,7 +170,8 @@ class ConsumerResetProtectionSpec
             tp -> List(
               new ConsumerRecord(tp.topic(), tp.partition(), 101L, "k1", "kv"),
               new ConsumerRecord(tp.topic(), tp.partition(), 1L, "k2", "kv"),
-              new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1", 
"kv")).asJava).asJava))
+              new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1", 
"kv")).asJava).asJava,
+          java.util.Collections.emptyMap()))
       records.count() should be(3)
       records.records(tp).asScala.map(_.offset()) should be(Seq(101L, 1L, 
102L))
     }
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 36158400..24d014f9 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -815,7 +815,7 @@ object PartitionedSourceSpec {
       if (data2.nonEmpty) {
         log.debug(s"poll result $data2")
       }
-      new ConsumerRecords[K, V](data2.asJava)
+      new ConsumerRecords[K, V](data2.asJava, java.util.Collections.emptyMap())
     }
     override def position(partition: TopicPartition): Long = 0
     override def position(partition: TopicPartition, timeout: 
java.time.Duration): Long = 0
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index d6a0ec04..12e93360 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -158,7 +158,7 @@ class ProducerSpec(_system: ActorSystem)
     assertAllStagesStopped {
       val input = (1 to 10).map { recordAndMetadata(_)._1 }
 
-      val mockProducer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer)
+      val mockProducer = new MockProducer[String, String](true, None.orNull, 
new StringSerializer, new StringSerializer)
 
       val fut: Future[Done] = 
Source(input).runWith(Producer.plainSink(settings.withProducer(mockProducer)))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to