yashmayya commented on code in PR #14153:
URL: https://github.com/apache/kafka/pull/14153#discussion_r1287038386


##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -103,7 +103,7 @@ public class KafkaBasedLog<K, V> {
     private Optional<Producer<K, V>> producer;
     private TopicAdmin admin;
 
-    private Thread thread;
+    Thread thread;

Review Comment:
   ```suggestion
       // Visible for testing
       Thread thread;
   ```
   nit



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -114,29 +113,49 @@ public class KafkaBasedLogTest {
     private static final String TP0_VALUE_NEW = "VAL0_NEW";
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-    private Time time = new MockTime();
-    private KafkaBasedLog<String, String> store;
+    private final Time time = new MockTime();
+    private MockedKafkaBasedLog store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;
     @Mock
     private KafkaProducer<String, String> producer;
-    private MockConsumer<String, String> consumer;
-    @Mock

Review Comment:
   I'm curious, why was this change required - 
https://github.com/apache/kafka/pull/14153/commits/2c7d2e380d02d0f7dc77944ca5775ddaa3540457?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -114,29 +113,49 @@ public class KafkaBasedLogTest {
     private static final String TP0_VALUE_NEW = "VAL0_NEW";
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-    private Time time = new MockTime();
-    private KafkaBasedLog<String, String> store;
+    private final Time time = new MockTime();
+    private MockedKafkaBasedLog store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;
     @Mock
     private KafkaProducer<String, String> producer;
-    private MockConsumer<String, String> consumer;
-    @Mock
     private TopicAdmin admin;
+    private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
+    private MockConsumer<String, String> consumer;
 
-    private Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
-    private Callback<ConsumerRecord<String, String>> consumedCallback = 
(error, record) -> {
+    private final Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
+    private final Callback<ConsumerRecord<String, String>> consumedCallback = 
(error, record) -> {
         TopicPartition partition = new TopicPartition(record.topic(), 
record.partition());
         List<ConsumerRecord<String, String>> records = 
consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
         records.add(record);
     };
 
-    @SuppressWarnings("unchecked")
+    private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> {
+        public MockedKafkaBasedLog(String topic,
+                                   Map<String, Object> producerConfigs,
+                                   Map<String, Object> consumerConfigs,
+                                   Supplier<TopicAdmin> topicAdminSupplier,
+                                   Callback<ConsumerRecord<String, String>> 
consumedCallback,
+                                   Time time,
+                                   Consumer<TopicAdmin> initializer) {
+            super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, 
consumedCallback, time, initializer);
+        }
+
+        @Override
+        protected KafkaProducer<String, String> createProducer() {
+            return producer;
+        }
+
+        @Override
+        protected MockConsumer<String, String> createConsumer() {
+            return consumer;
+        }
+    }

Review Comment:
   Can we use an anonymous inner class in the `setUp` method instead? I think 
that'll look a lot cleaner.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -376,22 +358,11 @@ public void testPollConsumerError() throws Exception {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
-        expectStart();
-
-        // Producer flushes when read to log end is called

Review Comment:
   Same comment as above 
(https://github.com/apache/kafka/pull/14153/files#r1286903819)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to