Github user askprasanna commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2156#discussion_r121341716
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
    @@ -134,110 +130,86 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee
     
                 Time.advanceTime(50);
                 //No backoff for test retry service, just check that messages will retry immediately
    -            for (int i = 0; i < recordsForPartition.size(); i++) {
    +            for (int i = 0; i < numRecords; i++) {
                     spout.nextTuple();
                 }
     
                 ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    -            verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), retryMessageIds.capture());
    +            verify(collectorMock, times(numRecords)).emit(anyObject(), anyObject(), retryMessageIds.capture());
     
                 //Verify that the poll started at the earliest retriable tuple offset
                 List<Long> failedOffsets = new ArrayList<>();
    -            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
    +            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
                     failedOffsets.add(msgId.offset());
                 }
                 InOrder inOrder = inOrder(consumerMock);
                 inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
                 inOrder.verify(consumerMock).poll(anyLong());
             }
         }
    -    
    +
    +    private List<ConsumerRecord<String, String>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
    +        List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    +        for (int i = 0; i < numRecords; i++) {
    +            recordsForPartition.add(new ConsumerRecord(topic.topic(), topic.partition(), startingOffset + i, "key", "value"));
    +        }
    +        return recordsForPartition;
    +    }
    +
         @Test
    -    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
    -        /*
    -        The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
    -        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
    -        The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
    -        This means that the latest offset a poll can start at for a retriable partition,
    -        counting from the last committed offset, is maxUncommittedOffsets,
    -        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
    -        If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
    -        The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
    -        
    -        It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
    -        so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
    -        
    -        This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
    -         */
    -        
    -        //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
    +    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
    +        //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
             try (SimulatedTime simulatedTime = new SimulatedTime()) {
    -            setupSpout(Collections.singleton(partition));
    -            
    -            Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
    -            List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
    -            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
    -                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
    -                firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
    -            }
    -            firstPollRecords.put(partition, firstPollRecordsForPartition);
    -            
    -            int maxPollRecords = 5;
    -            Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
    -            List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
    -            for(int i = 0; i < maxPollRecords; i++) {
    -                secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
    -            }
    -            secondPollRecords.put(partition, secondPollRecordsForPartition);
    +            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
    +            Set<TopicPartition> partitions = new HashSet<>();
    +            partitions.add(partition);
    +            partitions.add(partitionTwo);
    +            setupSpout(partitions);
    +            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
    +            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
    +            records.put(partition, createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
    +            records.put(partitionTwo, createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets()));
     
                 when(consumerMock.poll(anyLong()))
    -                .thenReturn(new ConsumerRecords(firstPollRecords))
    -                .thenReturn(new ConsumerRecords(secondPollRecords));
    +                .thenReturn(new ConsumerRecords(records));
    --- End diff --
    
    Nit: return a generic ConsumerRecords to avoid the warning about the raw type.
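    
    For illustration, a self-contained sketch of the typed construction this nit is pointing at (the class name, topic name, and record count below are made up; only the kafka-clients generics usage matters):
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;
    
    public class TypedConsumerRecordsExample {
    
        // Typed counterpart of the createRecords helper in the diff: the diamond
        // operator keeps every element a ConsumerRecord<String, String>.
        static List<ConsumerRecord<String, String>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
            for (int i = 0; i < numRecords; i++) {
                recordsForPartition.add(new ConsumerRecord<>(topic.topic(), topic.partition(), startingOffset + i, "key", "value"));
            }
            return recordsForPartition;
        }
    
        public static void main(String[] args) {
            // Hypothetical topic/partition, just to exercise the helper
            TopicPartition partition = new TopicPartition("test-topic", 0);
    
            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
            records.put(partition, createRecords(partition, 0, 10));
    
            // Fully generic ConsumerRecords<String, String>; handing this to
            // Mockito's thenReturn(...) would not produce a raw-type warning.
            ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(records);
            System.out.println("Built " + consumerRecords.count() + " records for " + partition);
        }
    }
    ```
    
    In the test itself that would just mean `new ConsumerRecord<>(...)` inside createRecords and `.thenReturn(new ConsumerRecords<>(records))` in the stub.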

