cadonna commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1489430066


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -138,11 +150,30 @@ public class RecordCollectorTest {
     private StreamsProducer streamsProducer;
     private ProcessorTopology topology;
     private final InternalProcessorContext<Void, Void> context = new 
InternalMockProcessorContext<>();
-
     private RecordCollectorImpl collector;
+    final Sensor mockSensor = Mockito.mock(Sensor.class);

Review Comment:
   ```suggestion
       private final Sensor mockSensor = Mockito.mock(Sensor.class);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1279,49 +1268,50 @@ public void 
shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE
 
     @Test
     public void 
shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler()
 {
-        final RecordCollector collector = new RecordCollectorImpl(
-            logContext,
-            taskId,
-            getExceptionalStreamsProducerOnSend(new Exception()),
-            new AlwaysContinueProductionExceptionHandler(),
-            streamsMetrics,
-            topology
-        );
+        try (final MockedStatic<TaskMetrics> taskMetrics = 
mockStatic(TaskMetrics.class)) {
+            final Sensor droppedRecordsSensor = Mockito.mock(Sensor.class);
+            when(TaskMetrics.droppedRecordsSensor(
+                    Mockito.anyString(),
+                    eq(taskId.toString()),
+                    eq(mockStreamsMetrics))
+            ).thenReturn(droppedRecordsSensor);
+
+            final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                getExceptionalStreamsProducerOnSend(new Exception()),
+                new AlwaysContinueProductionExceptionHandler(),
+                mockStreamsMetrics,
+                topology
+            );
 
-        try (final LogCaptureAppender logCaptureAppender =
-                 
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
-            logCaptureAppender.setThreshold(Level.INFO);
+            try (final LogCaptureAppender logCaptureAppender =
+                     
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
+                logCaptureAppender.setThreshold(Level.INFO);
 
-            collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, null, null, streamPartitioner);
-            collector.flush();
+                collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, null, null, streamPartitioner);
+                collector.flush();
 
-            final List<String> messages = logCaptureAppender.getMessages();
-            final StringBuilder errorMessage = new StringBuilder("Messages 
received:");
-            for (final String error : messages) {
-                errorMessage.append("\n - ").append(error);
+                final List<String> messages = logCaptureAppender.getMessages();
+                final StringBuilder errorMessage = new StringBuilder("Messages 
received:");
+                for (final String error : messages) {
+                    errorMessage.append("\n - ").append(error);
+                }
+                assertTrue(
+                    errorMessage.toString(),
+                    messages.get(messages.size() - 1)
+                            .endsWith("Exception handler choose to CONTINUE 
processing in spite of this error but written offsets would not be recorded.")
+                );
             }
-            assertTrue(
-                errorMessage.toString(),
-                messages.get(messages.size() - 1)
-                    .endsWith("Exception handler choose to CONTINUE processing 
in spite of this error but written offsets would not be recorded.")
-            );
-        }
 
-        final Metric metric = streamsMetrics.metrics().get(new MetricName(
-            "dropped-records-total",
-            "stream-task-metrics",
-            "The total number of dropped records",
-            mkMap(
-                mkEntry("thread-id", Thread.currentThread().getName()),
-                mkEntry("task-id", taskId.toString())
-            )
-        ));
-        assertEquals(1.0, metric.metricValue());
+            Mockito.verify(droppedRecordsSensor, Mockito.times(1)).record();
 
-        collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, null, null, streamPartitioner);
-        collector.flush();
-        collector.closeClean();
+            collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, null, null, streamPartitioner);
+            collector.flush();
+            collector.closeClean();
+        }
     }
+    

Review Comment:
   nit:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1831,7 +1848,7 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
             taskId,
             config,
             stateManager,
-            streamsMetrics,
+            this.mockStreamsMetrics,

Review Comment:
   nit: We do not use `this` if it is not needed:
    ```suggestion
               mockStreamsMetrics,
   ```
   Could you please adapt the remainder of this class accordingly?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1349,7 +1339,7 @@ public void abortTransaction() {
                 Time.SYSTEM
             ),
             productionExceptionHandler,
-            streamsMetrics,
+                mockStreamsMetrics,

Review Comment:
   Could you please fix the indentation here and below?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java:
##########
@@ -62,12 +62,16 @@ public abstract class AbstractKeyValueStoreTest {
 
     @Before
     public void before() {
-        driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        driver = this.createKeyValueStoreTestDriver();
         context = (InternalMockProcessorContext) driver.context();
         context.setTime(10);
         store = createKeyValueStore(context);
     }
 
+    protected KeyValueStoreTestDriver<Integer, String> 
createKeyValueStoreTestDriver() {
+        return KeyValueStoreTestDriver.create(Integer.class, String.class);
+    }

Review Comment:
   Why do you not use the following: 
   ```
       abstract protected KeyValueStoreTestDriver<Integer, String> 
createKeyValueStoreTestDriver();
   ```
   
   Isn't this method overridden in all subclasses?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java:
##########
@@ -83,12 +90,26 @@ public void setUp() {
         cacheFlushListener = new CacheFlushListenerStub<>(new 
StringDeserializer(), new StringDeserializer());
         store = new CachingKeyValueStore(underlyingStore, false);
         store.setFlushListener(cacheFlushListener, false);
-        cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, this.mockStreamsMetrics);
         context = new InternalMockProcessorContext<>(null, null, null, null, 
cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, 
new RecordHeaders()));
         store.init((StateStoreContext) context, null);
     }
 
+    @Override
+    protected KeyValueStoreTestDriver<Integer, String> 
createKeyValueStoreTestDriver() {

Review Comment:
   I am wondering if it might not be better to specify the mock and all its 
stubs for all sensor levels once in `AbstractKeyValueStoreTest`. That would 
avoid code duplication. What do you think?  



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java:
##########
@@ -62,12 +62,16 @@ public abstract class AbstractKeyValueStoreTest {
 
     @Before
     public void before() {
-        driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        driver = this.createKeyValueStoreTestDriver();

Review Comment:
   ```suggestion
           driver = createKeyValueStoreTestDriver();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java:
##########
@@ -115,7 +118,7 @@ public void setup() {
             new ThreadCache(
                 new LogContext("testCache"),
                 0,
-                new MockStreamsMetrics(new Metrics())));
+                this.mockStreamsMetrics));

Review Comment:
   ```suggestion
                   mockStreamsMetrics));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1497,7 +1487,7 @@ private RecordCollector newRecordCollector(final 
ProductionExceptionHandler prod
             taskId,
             streamsProducer,
             productionExceptionHandler,
-            streamsMetrics,
+                mockStreamsMetrics,

Review Comment:
   Could you please fix the indentation here and below?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java:
##########
@@ -83,12 +90,26 @@ public void setUp() {
         cacheFlushListener = new CacheFlushListenerStub<>(new 
StringDeserializer(), new StringDeserializer());
         store = new CachingKeyValueStore(underlyingStore, false);
         store.setFlushListener(cacheFlushListener, false);
-        cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, this.mockStreamsMetrics);

Review Comment:
   ```suggestion
           cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, mockStreamsMetrics);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -853,7 +842,7 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
             taskId,
             streamsProducer,
             productionExceptionHandler,
-            streamsMetrics,
+                mockStreamsMetrics,

Review Comment:
   Could you please fix the indentation here and below?



##########
streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java:
##########
@@ -194,6 +200,13 @@ public static <K, V> KeyValueStoreTestDriver<K, V> 
create(final Serializer<K> ke
     private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;
 
+    @Mock
+    private StreamsMetricsImpl mockStreamsMetrics;
+
+    public void setMockStreamsMetrics(final StreamsMetricsImpl 
mockStreamsMetrics) {
+        this.mockStreamsMetrics = mockStreamsMetrics;
+    }

Review Comment:
   This method does not seem to be used anywhere.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java:
##########
@@ -109,8 +112,7 @@ public void setUp() {
             recordCollector,
             new ThreadCache(
                 new LogContext("testCache"),
-                0,
-                new MockStreamsMetrics(new Metrics())));
+                0, mockStreamsMetrics));

Review Comment:
   ```suggestion
                   0, 
                   mockStreamsMetrics));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to