Github user askprasanna commented on a diff in the pull request:
https://github.com/apache/storm/pull/2156#discussion_r121341716
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
@@ -134,110 +130,86 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee
             Time.advanceTime(50);
             //No backoff for test retry service, just check that messages will retry immediately
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
             ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), retryMessageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyObject(), anyObject(), retryMessageIds.capture());
             //Verify that the poll started at the earliest retriable tuple offset
             List<Long> failedOffsets = new ArrayList<>();
-            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
                 failedOffsets.add(msgId.offset());
             }
             InOrder inOrder = inOrder(consumerMock);
             inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
             inOrder.verify(consumerMock).poll(anyLong());
         }
     }
-
+
+    private List<ConsumerRecord<String, String>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+        List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            recordsForPartition.add(new ConsumerRecord(topic.topic(), topic.partition(), startingOffset + i, "key", "value"));
+        }
+        return recordsForPartition;
+    }
+
     @Test
-    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
-        /*
-        The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
-        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
-        The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
-        This means that the latest offset a poll can start at for a retriable partition,
-        counting from the last committed offset, is maxUncommittedOffsets,
-        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
-        If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
-        The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
-
-        It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
-        so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
-
-        This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
-        */
-
-        //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+        //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
-
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            firstPollRecords.put(partition, firstPollRecordsForPartition);
-
-            int maxPollRecords = 5;
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
-            for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
-            }
-            secondPollRecords.put(partition, secondPollRecordsForPartition);
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            Set<TopicPartition> partitions = new HashSet<>();
+            partitions.add(partition);
+            partitions.add(partitionTwo);
+            setupSpout(partitions);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+            records.put(partitionTwo, createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets()));
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPollRecords))
-                .thenReturn(new ConsumerRecords(secondPollRecords));
+                .thenReturn(new ConsumerRecords(records));
--- End diff --
Nit: return a generic `ConsumerRecords` here to avoid the compiler warning about the raw type.
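Something along these lines would do it, I think (just a sketch of the same helper and poll stub from the diff above, assuming the mocked consumer is typed as `KafkaConsumer<String, String>`; only the type parameters change):

```java
// Sketch: same createRecords helper as in the diff, but using the typed
// ConsumerRecord<String, String> constructor instead of the raw type.
private List<ConsumerRecord<String, String>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
    List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    for (int i = 0; i < numRecords; i++) {
        // Diamond operator infers ConsumerRecord<String, String> from the list's element type
        recordsForPartition.add(new ConsumerRecord<>(topic.topic(), topic.partition(), startingOffset + i, "key", "value"));
    }
    return recordsForPartition;
}

// ...and the poll stub can likewise use the diamond operator, so the returned
// value is a ConsumerRecords<String, String> rather than a raw ConsumerRecords.
when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords<>(records));
```

With the records fully parameterized, the unchecked/raw-type warnings on this test should go away.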