MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579519988
##########
File path:
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
assertEquals(3, records.count());
}
+ @Test
+ public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+ final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+ final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(topicPartition, 0L);
+ emptyConsumer.updateEndOffsets(endOffsets);
+
+ final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(topicPartition, 0L);
+ emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+ streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+ final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+ assertEquals(0, records.count());
Review comment:
@mjsax I think I got your point. In the CUT, the call
```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);```
makes the consumer client seek to that offset (if available) without committing it.
The commit itself happens in another section of the code that is not
under test here (line 407 of StreamsResetter.java).
I will change the test logic so that the "given" condition is
```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);```
and the "when" condition is the call to ```client.commitSync();```.
This way, the "then" condition can assert on the committed
offsets. The same applies to the other test.
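To illustrate the seek-versus-commit distinction behind this restructuring, here is a minimal, dependency-free sketch. `SketchConsumer` and this `resetOffsetsTo` are hypothetical stand-ins written only for illustration; they are not Kafka's `MockConsumer` or the real `StreamsResetter`. Clamping the requested offset into the partition's range is an assumption about what "if available" means.

```java
public class SeekThenCommitSketch {

    /** Hypothetical single-partition consumer; NOT Kafka's MockConsumer API. */
    static final class SketchConsumer {
        private final long beginningOffset;
        private final long endOffset;
        private long position;
        private Long committed; // stays null until commitSync() is called

        SketchConsumer(final long beginningOffset, final long endOffset) {
            this.beginningOffset = beginningOffset;
            this.endOffset = endOffset;
            this.position = beginningOffset;
        }

        // Moves the in-memory position only; nothing is committed.
        void seek(final long offset) {
            position = offset;
        }

        long position() {
            return position;
        }

        // Records the current position as the committed offset.
        void commitSync() {
            committed = position;
        }

        Long committed() {
            return committed;
        }
    }

    /**
     * Assumed reset behaviour for this sketch: clamp the requested offset
     * into [beginning, end] and seek there, without committing.
     */
    static void resetOffsetsTo(final SketchConsumer consumer, final long requested) {
        final long target =
            Math.max(consumer.beginningOffset, Math.min(requested, consumer.endOffset));
        consumer.seek(target);
    }

    public static void main(final String[] args) {
        // given: an empty partition (beginning == end == 0) reset to offset 2
        final SketchConsumer emptyConsumer = new SketchConsumer(0L, 0L);
        resetOffsetsTo(emptyConsumer, 2L);
        System.out.println("committed before commitSync: " + emptyConsumer.committed());

        // when: the offset is committed explicitly
        emptyConsumer.commitSync();

        // then: the committed offset reflects the position chosen by the reset
        System.out.println("committed after commitSync: " + emptyConsumer.committed());
    }
}
```

In the real test, the same given/when/then shape would presumably assert on the consumer's committed offsets rather than on `records.count()`.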