This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 67a5cffb Kafka Client 4.1.0 (#358)
67a5cffb is described below
commit 67a5cffb9f41d254d356d44cb09b91e0e049fa04
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Sep 23 05:13:11 2025 +0100
Kafka Client 4.1.0 (#358)
* Update kafka-clients to 4.0.0
* Update KafkaConsumerActor.scala
* Update ConsumerResetProtection.scala
* Update KafkaSpec.scala
test compile issues
* Create kafka-clients-4.0-upgrade.excludes
* deprecation issues
* kafka-client 4.1.0
* Update KafkaConsumerActor.scala
* Update ConsumerDummy.scala
* Update KafkaConsumerActor.scala
* Update ConsumerMock.scala
* kafka-avro-serializer 8.0.0
* compile issues
---------
Co-authored-by: scala-steward-asf[bot]
<147768647+scala-steward-asf[bot]@users.noreply.github.com>
---
.../kafka-clients-4.0-upgrade.excludes | 19 ++++++++++++
.../kafka/internal/ConsumerResetProtection.scala | 2 +-
.../pekko/kafka/internal/KafkaConsumerActor.scala | 1 +
docs/src/main/paradox/serialization.md | 10 +++----
project/Versions.scala | 4 +--
.../pekko/kafka/testkit/scaladsl/KafkaSpec.scala | 2 ++
.../scaladsl/SchemaRegistrySerializationSpec.scala | 6 ++--
.../internal/CommittingProducerSinkSpec.scala | 34 +++++++++++-----------
.../pekko/kafka/internal/ConsumerDummy.scala | 12 ++++++--
.../apache/pekko/kafka/internal/ConsumerMock.scala | 8 +++--
.../internal/ConsumerProgressTrackingSpec.scala | 4 +--
.../internal/ConsumerResetProtectionSpec.scala | 8 +++--
.../kafka/internal/PartitionedSourceSpec.scala | 2 +-
.../apache/pekko/kafka/internal/ProducerSpec.scala | 2 +-
14 files changed, 74 insertions(+), 40 deletions(-)
diff --git
a/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
b/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
new file mode 100644
index 00000000..451d90a3
--- /dev/null
+++
b/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Method that was removed from kafka-clients 4.0
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.RestrictedConsumer.committed")
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 55006009..8387fa95 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -68,7 +68,7 @@ object ConsumerResetProtection {
.toMap
.asJava
- new ConsumerRecords[K, V](safe)
+ new ConsumerRecords[K, V](safe, java.util.Collections.emptyMap())
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index b837dd42..848f5b36 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -445,6 +445,7 @@ import scala.util.control.NonFatal
progressTracker
}
+ @nowarn("msg=deprecated")
override def postStop(): Unit = {
// reply to outstanding requests is important if the actor is restarted
requests.foreach {
diff --git a/docs/src/main/paradox/serialization.md
b/docs/src/main/paradox/serialization.md
index 02b5b696..f6ca3079 100644
--- a/docs/src/main/paradox/serialization.md
+++ b/docs/src/main/paradox/serialization.md
@@ -66,7 +66,7 @@ Maven
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
- <version>confluent.version (eg. 7.9.2)</version>
+ <version>confluent.version (eg. 8.0.0)</version>
</dependency>
...
</dependencies>
@@ -84,14 +84,14 @@ Maven
sbt
: ```scala
- libraryDependencies += "io.confluent" % "kafka-avro-serializer" %
confluentAvroVersion, // eg. 7.9.2
+ libraryDependencies += "io.confluent" % "kafka-avro-serializer" %
confluentAvroVersion, // eg. 8.0.0
resolvers += "Confluent Maven Repository" at
"https://packages.confluent.io/maven/",
```
Gradle
: ```gradle
dependencies {
- compile group: 'io.confluent', name: 'kafka-avro-serializer', version:
confluentAvroVersion // eg. 7.9.2
+ compile group: 'io.confluent', name: 'kafka-avro-serializer', version:
confluentAvroVersion // eg. 8.0.0
}
repositories {
maven {
@@ -103,7 +103,7 @@ Gradle
### Producer
-To create serializers that use the Schema Registry, its URL needs to be
provided as configuration
`AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer and
that serializer is used in the @apidoc[ProducerSettings$].
+To create serializers that use the Schema Registry, its URL needs to be
provided as configuration
`AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer
and that serializer is used in the @apidoc[ProducerSettings$].
Scala
: @@ snip
[snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala)
{ #imports #serializer }
@@ -115,7 +115,7 @@ Java
### Consumer
-To create deserializers that use the Schema Registry, its URL needs to be
provided as configuration
`AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer
and that deserializer is used in the @apidoc[ConsumerSettings$].
+To create deserializers that use the Schema Registry, its URL needs to be
provided as configuration
`AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer
and that deserializer is used in the @apidoc[ConsumerSettings$].
Scala
: @@ snip
[snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala)
{ #imports #de-serializer }
diff --git a/project/Versions.scala b/project/Versions.scala
index 24107652..e7abc322 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -22,7 +22,7 @@ object Versions {
val pekkoConnectorsKafkaVersionForDocs = "current"
val pekkoManagementVersionForDocs = "current"
- val kafkaVersion = "3.9.1"
+ val kafkaVersion = "4.1.0"
val KafkaVersionForDocs = "37"
val scalaTestVersion = "3.2.19"
@@ -33,7 +33,7 @@ object Versions {
// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
// See
https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
- val confluentAvroSerializerVersion = "7.9.2"
+ val confluentAvroSerializerVersion = "8.0.0"
val confluentLibsExclusionRules = Seq(
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 3ae23870..1f77271b 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.producer.{ Producer =>
KProducer, ProducerRecord
import org.apache.kafka.common.ConsumerGroupState
import org.slf4j.{ Logger, LoggerFactory }
+import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -119,6 +120,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val
zooKeeperPort: Int, actorSystem: A
*
* If the predicate does not hold after configured amount of time, throws an
exception.
*/
+ @nowarn("cat=deprecation")
def waitUntilConsumerSummary(groupId: String)(predicate:
PartialFunction[List[MemberDescription], Boolean]): Unit =
waitUntilConsumerGroup(groupId) { group =>
group.state() == ConsumerGroupState.STABLE &&
diff --git
a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index e43ea558..5f643fb1 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -35,7 +35,7 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
// #imports
-import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig,
KafkaAvroDeserializer, KafkaAvroSerializer }
+import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig,
KafkaAvroDeserializer, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
// #imports
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -59,7 +59,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase
with TestcontainersKa
// #serializer #de-serializer
val kafkaAvroSerDeConfig = Map[String, Any](
- AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
schemaRegistryUrl,
+ AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
schemaRegistryUrl,
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
// #serializer #de-serializer
@@ -217,7 +217,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase
with TestcontainersKa
private def specificRecordConsumerSettings(group: String):
ConsumerSettings[String, SpecificRecord] = {
val kafkaAvroSerDeConfig = Map[String, Any] {
- AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
schemaRegistryUrl
+ AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
schemaRegistryUrl
}
val kafkaAvroDeserializer = new KafkaAvroDeserializer()
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index f0d54d35..8ba35d38 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -81,7 +81,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val commitInterval = 200.millis
@@ -117,7 +117,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "skip"),
consumer.message(partition, "send"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val commitInterval = 200.millis
@@ -157,7 +157,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings =
CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -191,7 +191,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings =
CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -226,7 +226,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
val producerRecordsPerInput = 2
val totalProducerRecords = elements.size * producerRecordsPerInput
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings =
CommitterSettings(system).withMaxBatch(elements.size.longValue())
@@ -259,7 +259,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)
val message = consumer.message(partition, "increment the offset")
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1)
@@ -292,7 +292,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system)
@@ -328,7 +328,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
@@ -364,7 +364,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
@@ -404,7 +404,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
@@ -441,7 +441,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2"))
// this producer does not auto complete messages
- val producer = new MockProducer[String, String](false, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](false, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -480,7 +480,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](false, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](false, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -523,7 +523,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](false, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](false, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
@@ -569,7 +569,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -602,7 +602,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -640,7 +640,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system)
@@ -674,7 +674,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
}
it should "shut down without elements" in assertAllStagesStopped {
- val producer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val producer = new MockProducer[String, String](true, None.orNull, new
StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new
StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxInterval(1.second)
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
index 93073673..9d38fa7f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko.Done
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo,
TopicPartition }
+import org.apache.kafka.common.metrics.KafkaMetric
import org.slf4j.{ Logger, LoggerFactory }
import scala.concurrent.Promise
@@ -50,7 +51,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def subscribe(pattern: java.util.regex.Pattern, callback:
ConsumerRebalanceListener): Unit = ???
override def subscribe(pattern: java.util.regex.Pattern): Unit = ???
override def unsubscribe(): Unit = ???
- override def poll(timeout: Long): ConsumerRecords[K, V] = ???
override def commitSync(): Unit = ???
override def commitSync(offsets: java.util.Map[TopicPartition,
OffsetAndMetadata]): Unit = ???
override def commitAsync(): Unit = ???
@@ -63,7 +63,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def seekToEnd(partitions: java.util.Collection[TopicPartition]):
Unit = ???
override def position(partition: TopicPartition): Long = ???
override def position(partition: TopicPartition, timeout:
java.time.Duration): Long = ???
- override def committed(partition: TopicPartition): OffsetAndMetadata = ???
override def metrics(): java.util.Map[MetricName, _ <: Metric] = ???
override def partitionsFor(topic: String): java.util.List[PartitionInfo] =
???
override def listTopics(): java.util.Map[String,
java.util.List[PartitionInfo]] = ???
@@ -81,13 +80,13 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def endOffsets(
partitions: java.util.Collection[TopicPartition]):
java.util.Map[TopicPartition, java.lang.Long] = ???
override def close(): Unit = {}
+ override def close(options: CloseOptions): Unit = {}
override def close(timeout: java.time.Duration): Unit = {}
override def wakeup(): Unit = ???
override def commitSync(timeout: java.time.Duration): Unit = ???
override def commitSync(offsets: java.util.Map[TopicPartition,
OffsetAndMetadata],
timeout: java.time.Duration): Unit = ???
- override def committed(partition: TopicPartition, timeout:
java.time.Duration): OffsetAndMetadata = ???
override def committed(partitions: util.Set[TopicPartition]):
util.Map[TopicPartition, OffsetAndMetadata] = ???
override def committed(partitions: util.Set[TopicPartition],
timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???
@@ -102,4 +101,11 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def groupMetadata(): ConsumerGroupMetadata = ???
override def enforceRebalance(): Unit = ???
override def currentLag(partition: TopicPartition): java.util.OptionalLong =
???
+
+ override def subscribe(sp: SubscriptionPattern): Unit = ???
+ override def subscribe(sp: SubscriptionPattern, listener:
ConsumerRebalanceListener): Unit = ???
+
+ override def registerMetricForSubscription(metric: KafkaMetric): Unit = ???
+ override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit =
???
+
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index e922a8cd..05fa441e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -26,6 +26,7 @@ import org.mockito.stubbing.Answer
import org.mockito.verification.VerificationMode
import org.mockito.{ ArgumentMatchers, Mockito }
+import scala.annotation.nowarn
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -113,7 +114,7 @@ class ConsumerMock[K, V](handler:
ConsumerMock.CommitHandler = new ConsumerMock.
if (releaseCommitCallbacks.get()) {
handler.onComplete()
}
- new ConsumerRecords[K, V](records.asJava)
+ new ConsumerRecords[K, V](records.asJava,
java.util.Collections.emptyMap())
}
})
Mockito
@@ -168,6 +169,7 @@ class ConsumerMock[K, V](handler:
ConsumerMock.CommitHandler = new ConsumerMock.
responses :+= records
}
+ @nowarn("msg=deprecated")
def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
@@ -207,7 +209,9 @@ class FailingConsumerMock[K, V](throwable: Throwable,
failOnCallNumber: Int*) ex
callNumber = callNumber + 1
if (failOnCallNumber.contains(callNumber))
throw throwable
- else new ConsumerRecords[K, V](Map.empty[TopicPartition,
java.util.List[ConsumerRecord[K, V]]].asJava)
+ else new ConsumerRecords[K, V](
+ Map.empty[TopicPartition, java.util.List[ConsumerRecord[K,
V]]].asJava,
+ java.util.Collections.emptyMap())
}
})
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index eca227ad..48a68b92 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -30,7 +30,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike
with Matchers with Lo
private val tp = new TopicPartition("t", 0)
private val m1 = new ConsumerRecord[String, String](tp.topic(),
tp.partition(), 10L, "k1", "kv")
def asConsumerRecords[K, V](tp: TopicPartition, records: ConsumerRecord[K,
V]*): ConsumerRecords[K, V] = {
- new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava)
+ new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava,
java.util.Collections.emptyMap())
}
private val records = asConsumerRecords(tp, m1)
@@ -86,7 +86,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike
with Matchers with Lo
new ConsumerRecords[String, String](
Map(
tp2 -> List(new ConsumerRecord[String, String](tp2.topic(),
tp2.partition(), 10L, "k1",
- "kv")).asJava).asJava))
+ "kv")).asJava).asJava, java.util.Collections.emptyMap()))
tracker.receivedMessages.map(extractOffsetFromSafe) should be(Map(tp ->
10L))
// no change to the committing
tracker.commitRequested.map(extractOffset) should be(Map(tp -> 0L))
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
index 8da09266..0cc90fd2 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
@@ -51,7 +51,7 @@ class ConsumerResetProtectionSpec
val m1 = new ConsumerRecord(tp.topic(), tp.partition(), 10L, "k1", "kv")
def asConsumerRecords[K, V](records: ConsumerRecord[K, V]*):
ConsumerRecords[K, V] = {
- new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava)
+ new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava,
java.util.Collections.emptyMap())
}
val records = asConsumerRecords(m1)
@@ -147,7 +147,8 @@ class ConsumerResetProtectionSpec
new ConsumerRecords(
Map(
tp -> List(m1).asJava,
- tp1 -> List(new ConsumerRecord(tp1.topic(), tp1.partition(),
10L, "k1", "kv")).asJava).asJava))
+ tp1 -> List(new ConsumerRecord(tp1.topic(), tp1.partition(),
10L, "k1", "kv")).asJava).asJava,
+ java.util.Collections.emptyMap()))
shouldHaveEqualRecords(records, protectedRecords)
}
@@ -169,7 +170,8 @@ class ConsumerResetProtectionSpec
tp -> List(
new ConsumerRecord(tp.topic(), tp.partition(), 101L, "k1", "kv"),
new ConsumerRecord(tp.topic(), tp.partition(), 1L, "k2", "kv"),
- new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1",
"kv")).asJava).asJava))
+ new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1",
"kv")).asJava).asJava,
+ java.util.Collections.emptyMap()))
records.count() should be(3)
records.records(tp).asScala.map(_.offset()) should be(Seq(101L, 1L,
102L))
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 36158400..24d014f9 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -815,7 +815,7 @@ object PartitionedSourceSpec {
if (data2.nonEmpty) {
log.debug(s"poll result $data2")
}
- new ConsumerRecords[K, V](data2.asJava)
+ new ConsumerRecords[K, V](data2.asJava, java.util.Collections.emptyMap())
}
override def position(partition: TopicPartition): Long = 0
override def position(partition: TopicPartition, timeout:
java.time.Duration): Long = 0
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index d6a0ec04..12e93360 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -158,7 +158,7 @@ class ProducerSpec(_system: ActorSystem)
assertAllStagesStopped {
val input = (1 to 10).map { recordAndMetadata(_)._1 }
- val mockProducer = new MockProducer[String, String](true, new
StringSerializer, new StringSerializer)
+ val mockProducer = new MockProducer[String, String](true, None.orNull,
new StringSerializer, new StringSerializer)
val fut: Future[Done] =
Source(input).runWith(Producer.plainSink(settings.withProducer(mockProducer)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]