This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b351ad6e25 MINOR: Remove unnecessary dependencies from coordinator-common (follow up to pr#20089) (#20194)
1b351ad6e25 is described below

commit 1b351ad6e25094d9b38cc8cbe53a7a8caeec99e9
Author: Logan Zhu <[email protected]>
AuthorDate: Sat Jul 19 19:08:33 2025 +0800

    MINOR: Remove unnecessary dependencies from coordinator-common (follow up to pr#20089) (#20194)
    
    This PR removes the dependencies on `core` and `scala-library` from the
    `coordinator-common` module, as a follow-up to
    https://github.com/apache/kafka/pull/20089.
    
    These dependencies have been removed from tests, and the previously
    added import-control relaxations have been reverted accordingly.
    
    Reviewers: TengYao Chi <[email protected]>, Ken Huang
    <[email protected]>
---
 build.gradle                                       |   1 -
 checkstyle/import-control-coordinator-common.xml   |   4 +-
 .../common/runtime/CoordinatorLoaderImplTest.java  | 111 +++++++--------------
 3 files changed, 38 insertions(+), 78 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5bb893f4fa4..f68b130f2de 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1664,7 +1664,6 @@ project(':coordinator-common') {
 
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
-    testImplementation project(':core')
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation testLog4j2Libs
diff --git a/checkstyle/import-control-coordinator-common.xml 
b/checkstyle/import-control-coordinator-common.xml
index 09589d87901..7841697cf89 100644
--- a/checkstyle/import-control-coordinator-common.xml
+++ b/checkstyle/import-control-coordinator-common.xml
@@ -27,6 +27,8 @@
     <allow pkg="org.slf4j" />
     <allow pkg="org.junit" />
     <allow pkg="org.mockito" />
+    <!-- no one depends on the server -->
+    <disallow pkg="kafka" />
 
     <!-- anyone can use public classes -->
     <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
@@ -41,7 +43,6 @@
         <subpackage name="common">
             <subpackage name="runtime">
                 <allow pkg="com.yammer.metrics.core" />
-                <allow pkg="kafka.server" />
                 <allow pkg="org.apache.kafka.clients.consumer" />
                 <allow pkg="org.apache.kafka.common.annotation" />
                 <allow pkg="org.apache.kafka.common.compress" />
@@ -67,7 +68,6 @@
                 <allow pkg="org.apache.kafka.test" />
                 <allow pkg="org.apache.kafka.timeline" />
                 <allow pkg="org.HdrHistogram" />
-                <allow pkg="scala" />
             </subpackage>
         </subpackage>
     </subpackage>
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 8d741fa16e0..61c5d52c8a9 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
-import kafka.server.ReplicaManager;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
@@ -51,8 +49,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import scala.Option;
-
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -65,10 +61,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static scala.jdk.javaapi.OptionConverters.toJava;
-import static scala.jdk.javaapi.OptionConverters.toScala;
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "resource"})
 @Timeout(60)
 class CoordinatorLoaderImplTest {
 
@@ -89,9 +83,8 @@ class CoordinatorLoaderImplTest {
     @Test
     void testNonexistentPartition() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.empty();
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.empty();
         Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
@@ -102,8 +95,6 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            when(replicaManager.getLog(tp)).thenReturn(Option.empty());
-
             assertFutureThrows(NotLeaderOrFollowerException.class, 
loader.load(tp, coordinator));
         }
     }
@@ -111,9 +102,8 @@ class CoordinatorLoaderImplTest {
     @Test
     void testLoadingIsRejectedWhenClosed() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(mock(UnifiedLog.class));
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.empty();
         Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
@@ -132,11 +122,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testLoading() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        Deserializer<Tuple<String, String>> serde = new 
StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(9L);
+        Deserializer<Tuple<String, String>> serde = new 
StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -146,9 +135,7 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(9L)));
             when(log.highWatermark()).thenReturn(0L);
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
@@ -218,11 +205,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testLoadingStoppedWhenClosed() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        Deserializer<Tuple<String, String>> serde = new 
StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(100L);
+        Deserializer<Tuple<String, String>> serde = new 
StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -232,9 +218,7 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(100L)));
 
             FetchDataInfo readResult = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -266,11 +250,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testUnknownRecordTypeAreIgnored() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(2L);
+        StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -280,9 +263,7 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
 
             FetchDataInfo readResult = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -305,11 +286,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testDeserializationErrorFailsTheLoading() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(2L);
+        StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -319,9 +299,7 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
 
             FetchDataInfo readResult = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -347,11 +325,10 @@ class CoordinatorLoaderImplTest {
         // when all the records are expired and the active segment is 
truncated or when the partition
         // is accidentally corrupted.
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(10L);
+        StringKeyValueDeserializer serde = 
mock(StringKeyValueDeserializer.class);
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -361,9 +338,7 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(10L)));
 
             FetchDataInfo readResult = logReadResult(0, List.of());
 
@@ -377,11 +352,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testLoadSummary() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(5L);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
         MockTime time = new MockTime();
 
@@ -393,9 +367,7 @@ class CoordinatorLoaderImplTest {
                 1000
         )) {
             long startTimeMs = time.milliseconds();
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L)));
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -428,11 +400,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testUpdateLastWrittenOffsetOnBatchLoaded() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(7L);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -442,10 +413,8 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L, 0L, 2L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -494,11 +463,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void 
testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead() throws 
Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(0L);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -508,10 +476,8 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(0L)));
 
             assertNotNull(loader.load(tp, coordinator).get(10, 
TimeUnit.SECONDS));
 
@@ -523,11 +489,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead() 
throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.of(7L);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -537,10 +502,8 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(5L, 7L, 7L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -590,11 +553,10 @@ class CoordinatorLoaderImplTest {
     @Test
     void testPartitionGoesOfflineDuringLoad() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
-        ReplicaManager replicaManager = mock(ReplicaManager.class);
-        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> toJava(replicaManager.getLog(partition));
-        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= partition -> Optional.ofNullable((Long) 
replicaManager.getLogEndOffset(partition).get());
-        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = 
partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier 
= mock(Function.class);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
         CoordinatorPlayback<Tuple<String, String>> coordinator = 
mock(CoordinatorPlayback.class);
 
         try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new 
CoordinatorLoaderImpl<>(
@@ -604,10 +566,9 @@ class CoordinatorLoaderImplTest {
                 serde,
                 1000
         )) {
-            
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
-            
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))).thenReturn(toScala(Optional.of(-1L)));
+            
when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L));
 
             FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                     new SimpleRecord("k1".getBytes(), "v1".getBytes()),

Reply via email to