cadonna commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1489430066
##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -138,11 +150,30 @@ public class RecordCollectorTest {
     private StreamsProducer streamsProducer;
     private ProcessorTopology topology;
     private final InternalProcessorContext context = new InternalMockProcessorContext<>();
-
     private RecordCollectorImpl collector;
+    final Sensor mockSensor = Mockito.mock(Sensor.class);

Review Comment:
```suggestion
    private final Sensor mockSensor = Mockito.mock(Sensor.class);
```
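
For reference, a minimal self-contained sketch of the fixture style the suggestion asks for: a `private final` Mockito mock created once per test instance. The class name `SensorFieldExample` and the test body are illustrative only, not from the PR; it assumes Mockito's inline mock maker is enabled (needed because `Sensor` is a final class), and uses the JUnit 4 style seen elsewhere in this test.

```java
import org.apache.kafka.common.metrics.Sensor;
import org.junit.Test;
import org.mockito.Mockito;

public class SensorFieldExample {

    // As the suggestion proposes: a private final mock field, created once
    // per test instance, so every test method can stub or verify it without
    // repeating the mock() call. Mocking Sensor requires the inline mock
    // maker, since Sensor is final.
    private final Sensor mockSensor = Mockito.mock(Sensor.class);

    @Test
    public void shouldVerifyRecordOnMockSensor() {
        // Code under test would normally receive mockSensor and call
        // record() on it; we call it directly to keep the sketch short.
        mockSensor.record();

        Mockito.verify(mockSensor).record();
    }
}
```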
##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -1279,49 +1268,50 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE
     @Test
     public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
-        final RecordCollector collector = new RecordCollectorImpl(
-            logContext,
-            taskId,
-            getExceptionalStreamsProducerOnSend(new Exception()),
-            new AlwaysContinueProductionExceptionHandler(),
-            streamsMetrics,
-            topology
-        );
+        try (final MockedStatic<TaskMetrics> taskMetrics = mockStatic(TaskMetrics.class)) {
+            final Sensor droppedRecordsSensor = Mockito.mock(Sensor.class);
+            when(TaskMetrics.droppedRecordsSensor(
+                Mockito.anyString(),
+                eq(taskId.toString()),
+                eq(mockStreamsMetrics))
+            ).thenReturn(droppedRecordsSensor);
+
+            final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                getExceptionalStreamsProducerOnSend(new Exception()),
+                new AlwaysContinueProductionExceptionHandler(),
+                mockStreamsMetrics,
+                topology
+            );

-        try (final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
-            logCaptureAppender.setThreshold(Level.INFO);
+            try (final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
+                logCaptureAppender.setThreshold(Level.INFO);

-            collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
-            collector.flush();
+                collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
+                collector.flush();

-            final List<String> messages = logCaptureAppender.getMessages();
-            final StringBuilder errorMessage = new StringBuilder("Messages received:");
-            for (final String error : messages) {
-                errorMessage.append("\n - ").append(error);
+                final List<String> messages = logCaptureAppender.getMessages();
+                final StringBuilder errorMessage = new StringBuilder("Messages received:");
+                for (final String error : messages) {
+                    errorMessage.append("\n - ").append(error);
+                }
+                assertTrue(
+                    errorMessage.toString(),
+                    messages.get(messages.size() - 1)
+                        .endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.")
+                );
             }
-            assertTrue(
-                errorMessage.toString(),
-                messages.get(messages.size() - 1)
-                    .endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.")
-            );
-        }
-        final Metric metric = streamsMetrics.metrics().get(new MetricName(
-            "dropped-records-total",
-            "stream-task-metrics",
-            "The total number of dropped records",
-            mkMap(
-                mkEntry("thread-id", Thread.currentThread().getName()),
-                mkEntry("task-id", taskId.toString())
-            )
-        ));
-        assertEquals(1.0, metric.metricValue());
+            Mockito.verify(droppedRecordsSensor, Mockito.times(1)).record();

-        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
-        collector.flush();
-        collector.closeClean();
+            collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
+            collector.flush();
+            collector.closeClean();
+        }
     }
+

Review Comment:
nit:
```suggestion
```
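
For readers skimming the thread, here is a minimal self-contained sketch of the `MockedStatic` pattern the hunk above switches to: stub a static sensor factory, then assert the interaction on the returned mock instead of reading a metric value back from a registry. The nested `Metrics` class and the test names are hypothetical stand-ins for `TaskMetrics`; it assumes Mockito 3.4+ with the inline mock maker (also needed because `Sensor` is final).

```java
import org.apache.kafka.common.metrics.Sensor;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class MockedStaticSketch {

    // Hypothetical stand-in for TaskMetrics: a static factory that would
    // look up (or create) a sensor in some metrics registry.
    static class Metrics {
        static Sensor droppedRecordsSensor(final String threadId, final String taskId) {
            throw new IllegalStateException("real metrics registry not wired up in this sketch");
        }
    }

    @Test
    public void shouldStubStaticFactoryAndVerifySensor() {
        // mockStatic() replaces the static method only within this try block;
        // the real implementation is restored when the block closes.
        try (final MockedStatic<Metrics> mocked = Mockito.mockStatic(Metrics.class)) {
            final Sensor droppedRecordsSensor = Mockito.mock(Sensor.class);
            mocked.when(() -> Metrics.droppedRecordsSensor(Mockito.anyString(), Mockito.anyString()))
                  .thenReturn(droppedRecordsSensor);

            // Code under test would fetch the sensor through the static
            // factory and record on it; done directly here for brevity.
            Metrics.droppedRecordsSensor("thread-1", "0_0").record();

            // The assertion is an interaction check on the mock, mirroring
            // the Mockito.verify(droppedRecordsSensor, times(1)).record()
            // call in the hunk above.
            Mockito.verify(droppedRecordsSensor, Mockito.times(1)).record();
        }
    }
}
```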
##
streams/src/te