Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2480#discussion_r159063226
  
    --- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
 ---
    @@ -343,4 +344,48 @@ public void 
testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
             spout.nextTuple();
             verify(collectorMock).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
         }
    +
    +    @Test
    +    public void testOffsetMetrics() throws Exception {
    +        final int messageCount = 10;
    +        prepareSpout(messageCount);
    +
    +        Map<String, Long> offsetMetric  = (Map<String, Long>) 
spout.getKafkaOffsetMetric().getValueAndReset();
    +        assertEquals(0, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
    +        // the offset of the last available message + 1.
    +        assertEquals(10, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
    +        assertEquals(0, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
    +        assertEquals(0, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
    +        //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
    +        assertEquals(10, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
    +
    +        //Emit and Ack all messages
    +        for(int i = 0; i < messageCount; i++) {
    --- End diff --
    
    I think you can use nextTuple_verifyEmitted_ack_resetCollector(int offset) 
from the superclass to do everything in this block.


---

Reply via email to