Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157346946
--- Diff:
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
---
@@ -181,16 +181,16 @@ public void
spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws
TopicPartition partitionThatWillBeRevoked = new
TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
-
when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
- .thenReturn(new
KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
- .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+
when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class),
contextMock))
+ .thenReturn(new
KafkaSpoutMessageId(partitionThatWillBeRevoked, 0, contextMock))
+ .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0,
contextMock));
//Emit a message on each partition and revoke the first partition
List<KafkaSpoutMessageId> emittedMessageIds =
emitOneMessagePerPartitionThenRevokeOnePartition(
spout, partitionThatWillBeRevoked, assignedPartition,
rebalanceListenerCapture);
//Check that only two message ids were generated
- verify(retryServiceMock,
times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
+ verify(retryServiceMock,
times(2)).getMessageId(Mockito.any(ConsumerRecord.class), contextMock);
--- End diff --
You need to put eq around contextmock here or you'll get an error
---