cadonna commented on code in PR #13931: URL: https://github.com/apache/kafka/pull/13931#discussion_r1489430066
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -138,11 +150,30 @@ public class RecordCollectorTest { private StreamsProducer streamsProducer; private ProcessorTopology topology; private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>(); - private RecordCollectorImpl collector; + final Sensor mockSensor = Mockito.mock(Sensor.class); Review Comment: ```suggestion private final Sensor mockSensor = Mockito.mock(Sensor.class); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1279,49 +1268,50 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE @Test public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() { - final RecordCollector collector = new RecordCollectorImpl( - logContext, - taskId, - getExceptionalStreamsProducerOnSend(new Exception()), - new AlwaysContinueProductionExceptionHandler(), - streamsMetrics, - topology - ); + try (final MockedStatic<TaskMetrics> taskMetrics = mockStatic(TaskMetrics.class)) { + final Sensor droppedRecordsSensor = Mockito.mock(Sensor.class); + when(TaskMetrics.droppedRecordsSensor( + Mockito.anyString(), + eq(taskId.toString()), + eq(mockStreamsMetrics)) + ).thenReturn(droppedRecordsSensor); + + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + getExceptionalStreamsProducerOnSend(new Exception()), + new AlwaysContinueProductionExceptionHandler(), + mockStreamsMetrics, + topology + ); - try (final LogCaptureAppender logCaptureAppender = - LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) { - logCaptureAppender.setThreshold(Level.INFO); + try (final LogCaptureAppender logCaptureAppender = + LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) { + logCaptureAppender.setThreshold(Level.INFO); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); - collector.flush(); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.flush(); - final List<String> messages = logCaptureAppender.getMessages(); - final StringBuilder errorMessage = new StringBuilder("Messages received:"); - for (final String error : messages) { - errorMessage.append("\n - ").append(error); + final List<String> messages = logCaptureAppender.getMessages(); + final StringBuilder errorMessage = new StringBuilder("Messages received:"); + for (final String error : messages) { + errorMessage.append("\n - ").append(error); + } + assertTrue( + errorMessage.toString(), + messages.get(messages.size() - 1) + .endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.") + ); } - assertTrue( - errorMessage.toString(), - messages.get(messages.size() - 1) - .endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.") - ); - } - final Metric metric = streamsMetrics.metrics().get(new MetricName( - "dropped-records-total", - "stream-task-metrics", - "The total number of dropped records", - mkMap( - mkEntry("thread-id", Thread.currentThread().getName()), - mkEntry("task-id", taskId.toString()) - ) - )); - assertEquals(1.0, metric.metricValue()); + Mockito.verify(droppedRecordsSensor, Mockito.times(1)).record(); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); - collector.flush(); - collector.closeClean(); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.flush(); + collector.closeClean(); + } } + Review Comment: nit: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -1831,7 +1848,7 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { taskId, config, stateManager, - streamsMetrics, + this.mockStreamsMetrics, Review Comment: nit: We do not use `this` if it is not needed: ```suggestion mockStreamsMetrics, ``` Could you please adapt the remainder of this class accordingly? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1349,7 +1339,7 @@ public void abortTransaction() { Time.SYSTEM ), productionExceptionHandler, - streamsMetrics, + mockStreamsMetrics, Review Comment: Could please fix the indentation here and below? ########## streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java: ########## @@ -62,12 +62,16 @@ public abstract class AbstractKeyValueStoreTest { @Before public void before() { - driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + driver = this.createKeyValueStoreTestDriver(); context = (InternalMockProcessorContext) driver.context(); context.setTime(10); store = createKeyValueStore(context); } + protected KeyValueStoreTestDriver<Integer, String> createKeyValueStoreTestDriver() { + return KeyValueStoreTestDriver.create(Integer.class, String.class); + } Review Comment: Why do you not use the following: ``` abstract protected KeyValueStoreTestDriver<Integer, String> createKeyValueStoreTestDriver(); ``` Isn't this method overridden in all subclasses? ########## streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java: ########## @@ -83,12 +90,26 @@ public void setUp() { cacheFlushListener = new CacheFlushListenerStub<>(new StringDeserializer(), new StringDeserializer()); store = new CachingKeyValueStore(underlyingStore, false); store.setFlushListener(cacheFlushListener, false); - cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); + cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, this.mockStreamsMetrics); context = new InternalMockProcessorContext<>(null, null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders())); store.init((StateStoreContext) context, null); } + @Override + protected KeyValueStoreTestDriver<Integer, String> createKeyValueStoreTestDriver() { Review Comment: I am wondering if it might not be better to specify the mock and all its stubs for all sensor levels once in `AbstractKeyValueStoreTest`. That would avoid code duplication. What do you think? ########## streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java: ########## @@ -62,12 +62,16 @@ public abstract class AbstractKeyValueStoreTest { @Before public void before() { - driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + driver = this.createKeyValueStoreTestDriver(); Review Comment: ```suggestion driver = createKeyValueStoreTestDriver(); ``` ########## streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java: ########## @@ -115,7 +118,7 @@ public void setup() { new ThreadCache( new LogContext("testCache"), 0, - new MockStreamsMetrics(new Metrics()))); + this.mockStreamsMetrics)); Review Comment: ```suggestion mockStreamsMetrics)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1497,7 +1487,7 @@ private RecordCollector newRecordCollector(final ProductionExceptionHandler prod taskId, streamsProducer, productionExceptionHandler, - streamsMetrics, + mockStreamsMetrics, Review Comment: Could please fix the indentation here and below? ########## streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java: ########## @@ -83,12 +90,26 @@ public void setUp() { cacheFlushListener = new CacheFlushListenerStub<>(new StringDeserializer(), new StringDeserializer()); store = new CachingKeyValueStore(underlyingStore, false); store.setFlushListener(cacheFlushListener, false); - cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); + cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, this.mockStreamsMetrics); Review Comment: ```suggestion cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, mockStreamsMetrics); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -853,7 +842,7 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { taskId, streamsProducer, productionExceptionHandler, - streamsMetrics, + mockStreamsMetrics, Review Comment: Could please fix the indentation here and below? ########## streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java: ########## @@ -194,6 +200,13 @@ public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> ke private final InternalMockProcessorContext context; private final StateSerdes<K, V> stateSerdes; + @Mock + private StreamsMetricsImpl mockStreamsMetrics; + + public void setMockStreamsMetrics(final StreamsMetricsImpl mockStreamsMetrics) { + this.mockStreamsMetrics = mockStreamsMetrics; + } Review Comment: This method does not seem to be used anywhere. ########## streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java: ########## @@ -109,8 +112,7 @@ public void setUp() { recordCollector, new ThreadCache( new LogContext("testCache"), - 0, - new MockStreamsMetrics(new Metrics()))); + 0, mockStreamsMetrics)); Review Comment: ```suggestion 0, mockStreamsMetrics)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org