cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267009990
##########
File path:
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
##########
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
consumer.stop();
}
+ @Test
+ public void
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy =
Mockito.mock(KafkaConsumerProxy.class);
+ KafkaSystemConsumer kafkaSystemConsumer = new
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID,
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new
TestClock(), REGISTRATION_HANDLER);
+ KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+ final StartpointSpecific testStartpointSpecific = new
StartpointSpecific(TEST_OFFSET);
+
+ // Mock the consumer interactions.
+ Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION,
Long.valueOf(TEST_OFFSET));
+
+ // Invoke the consumer with startpoint.
+ kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION,
testStartpointSpecific);
+
+ // Mock verifications.
+ Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION,
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
Mockito.anyLong());
+ }
+
+ @Test
+ public void
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+ // Define dummy variables for testing.
+ final Long testTimeStamp = 10L;
+
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy =
Mockito.mock(KafkaConsumerProxy.class);
+ KafkaSystemConsumer kafkaSystemConsumer = new
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID,
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new
TestClock(), REGISTRATION_HANDLER);
+
+ KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+ final StartpointTimestamp startpointTimestamp = new
StartpointTimestamp(testTimeStamp);
+ final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult =
ImmutableMap.of(
+ TEST_TOPIC_PARTITION, new
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+ // Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
Review comment:
We should be able to check a specific argument instead of using `anyMap`,
right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services