This is an automated email from the ASF dual-hosted git repository. engelen pushed a commit to branch update/kafka-clients-4.0.0 in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
commit 0fca3707ac0eba4d1adec2a3b61bf471cfbd3ad5 Author: PJ Fanning <[email protected]> AuthorDate: Wed Mar 26 22:42:44 2025 +0100 deprecation issues --- .../test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala | 6 ++++-- .../pekko/kafka/internal/ConsumerProgressTrackingSpec.scala | 4 ++-- .../apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala | 8 +++++--- .../org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala | 2 +- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala index b6fd6d83..c6c4569f 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala @@ -113,7 +113,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock. if (releaseCommitCallbacks.get()) { handler.onComplete() } - new ConsumerRecords[K, V](records.asJava) + new ConsumerRecords[K, V](records.asJava, java.util.Collections.emptyMap()) } }) Mockito @@ -207,7 +207,9 @@ class FailingConsumerMock[K, V](throwable: Throwable, failOnCallNumber: Int*) ex callNumber = callNumber + 1 if (failOnCallNumber.contains(callNumber)) throw throwable - else new ConsumerRecords[K, V](Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, V]]].asJava) + else new ConsumerRecords[K, V]( + Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, V]]].asJava, + java.util.Collections.emptyMap()) } }) } diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala index 7bd3b16a..09e22f15 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala @@ -30,7 +30,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with Lo private val tp = new TopicPartition("t", 0) private val m1 = new ConsumerRecord[String, String](tp.topic(), tp.partition(), 10L, "k1", "kv") def asConsumerRecords[K, V](tp: TopicPartition, records: ConsumerRecord[K, V]*): ConsumerRecords[K, V] = { - new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava) + new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava, java.util.Collections.emptyMap()) } private val records = asConsumerRecords(tp, m1) @@ -86,7 +86,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with Lo new ConsumerRecords[String, String]( Map( tp2 -> List(new ConsumerRecord[String, String](tp2.topic(), tp2.partition(), 10L, "k1", - "kv")).asJava).asJava)) + "kv")).asJava).asJava, java.util.Collections.emptyMap())) tracker.receivedMessages.map(extractOffsetFromSafe) should be(Map(tp -> 10L)) // no change to the committing tracker.commitRequested.map(extractOffset) should be(Map(tp -> 0L)) diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala index 6840827c..6dfe8d75 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala @@ -51,7 +51,7 @@ class ConsumerResetProtectionSpec val m1 = new ConsumerRecord(tp.topic(), tp.partition(), 10L, "k1", "kv") def asConsumerRecords[K, V](records: ConsumerRecord[K, V]*): ConsumerRecords[K, V] = { - new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava) + new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava, java.util.Collections.emptyMap()) } val records = asConsumerRecords(m1) @@ -147,7 +147,8 @@ class ConsumerResetProtectionSpec new ConsumerRecords( Map( tp -> List(m1).asJava, - tp1 -> List(new ConsumerRecord(tp1.topic(), tp1.partition(), 10L, "k1", "kv")).asJava).asJava)) + tp1 -> List(new ConsumerRecord(tp1.topic(), tp1.partition(), 10L, "k1", "kv")).asJava).asJava, + java.util.Collections.emptyMap())) shouldHaveEqualRecords(records, protectedRecords) } @@ -169,7 +170,8 @@ class ConsumerResetProtectionSpec tp -> List( new ConsumerRecord(tp.topic(), tp.partition(), 101L, "k1", "kv"), new ConsumerRecord(tp.topic(), tp.partition(), 1L, "k2", "kv"), - new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1", "kv")).asJava).asJava)) + new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1", "kv")).asJava).asJava, + java.util.Collections.emptyMap())) records.count() should be(3) records.records(tp).asScala.map(_.offset()) should be(Seq(101L, 1L, 102L)) } diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala index 8d998210..947e3f42 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala @@ -815,7 +815,7 @@ object PartitionedSourceSpec { if (data2.nonEmpty) { log.debug(s"poll result $data2") } - new ConsumerRecords[K, V](data2.asJava) + new ConsumerRecords[K, V](data2.asJava, java.util.Collections.emptyMap()) } override def position(partition: TopicPartition): Long = 0 override def position(partition: TopicPartition, timeout: java.time.Duration): Long = 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
