MarcoLotz opened a new pull request #10042:
URL: https://github.com/apache/kafka/pull/10042


   **Description:**
   An NPE occurs when the StreamsResetter tool is run with the arguments 
"to-datetime" or "by-duration" on empty partitions.
   
   **Cause:**
   This happens because 
[fetcher](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516)
 returns a null value in the map for empty partitions.
   
   **Solution:**
   The null values need to be handled correctly with Optional. For an empty 
partition, consumer.seek() should not be called, letting it fall back to the 
"auto.offset.reset" behaviour. The user should be notified of this.
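
A minimal sketch of the handling described above, not the code from this PR. To stay self-contained it uses plain Java stand-ins instead of the Kafka classes: `RecordingSeeker` is a hypothetical replacement for the consumer, partitions are plain strings, and `resetOffsets` is an illustrative name. Only the Java 8 `Optional` API is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class EmptyPartitionResetSketch {

    /** Hypothetical stand-in for the consumer: records the seek() calls it receives. */
    static final class RecordingSeeker {
        final Map<String, Long> seeks = new HashMap<>();

        void seek(String partition, long offset) {
            seeks.put(partition, offset);
        }
    }

    /**
     * Seeks every partition that has a looked-up offset. Partitions whose
     * lookup returned null (assumed here to mean "empty partition") are not
     * seeked; they are reported to the user and returned to the caller.
     */
    static List<String> resetOffsets(Map<String, Long> lookedUpOffsets, RecordingSeeker seeker) {
        List<String> skipped = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lookedUpOffsets.entrySet()) {
            Optional<Long> offset = Optional.ofNullable(entry.getValue());
            if (offset.isPresent()) {
                seeker.seek(entry.getKey(), offset.get());
            } else {
                // No seek: the configured auto.offset.reset policy applies instead.
                System.out.println("Partition " + entry.getKey()
                        + " is empty; falling back to auto.offset.reset");
                skipped.add(entry.getKey());
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        offsets.put("topic-1", null); // empty partition: lookup yielded null
        RecordingSeeker seeker = new RecordingSeeker();
        List<String> skipped = resetOffsets(offsets, seeker);
        System.out.println("seeks=" + seeker.seeks + " skipped=" + skipped);
    }
}
```

Wrapping the map value in `Optional.ofNullable` makes the empty-partition branch explicit rather than relying on an unboxing that would throw the NPE.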
   
   **Implementation comments:**
   
   - System.out.println was used instead of log.warn, since the whole class 
uses System.out for interacting with the user.
   - There was no test for an empty partition when resetting to an offset 
[streamsResetter.resetOffsetsTo()]. A test was added.
   - Added a test for an empty partition when resetToDatetime() is called - 
that was the cause of the bug.
   - MockConsumer doesn't have an implementation of the offsetsForTimes() 
method - it throws an exception when called, breaching the Liskov Substitution 
Principle. This method is used by resetToDatetime(). Since I couldn't find 
Mockito in the project dependencies to make a Spy, I decided to extend the 
MockConsumer class as a nested class and specialize the method.
   - Looking at Gradle, I saw that the min. Java version is 1.8 - thus I 
decided to use the Java 8 API of Optional instead of the Java 9 one - which 
would be a lot cleaner, especially with Optional.ifPresentOrElse(). 
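
The nested-subclass approach to the mock can be illustrated with a small sketch. It does not use the real MockConsumer: `BaseMock` and `EmptyPartitionMock` are hypothetical stand-ins, partitions are plain strings, and the exception type thrown by the base class is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class MockOverrideSketch {

    /** Hypothetical stand-in for a mock whose time-based lookup is not implemented. */
    static class BaseMock {
        Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch) {
            // Assumed exception type; the point is only that the call fails.
            throw new UnsupportedOperationException("Not implemented");
        }
    }

    /** Nested specialization that mimics empty partitions by mapping each one to null. */
    static class EmptyPartitionMock extends BaseMock {
        @Override
        Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch) {
            Map<String, Long> result = new HashMap<>();
            for (String partition : timestampsToSearch.keySet()) {
                result.put(partition, null); // no offset found for this timestamp
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> query = new HashMap<>();
        query.put("topic-0", 1_600_000_000_000L);
        BaseMock mock = new EmptyPartitionMock();
        System.out.println(mock.offsetsForTimes(query));
    }
}
```

Overriding only the one method keeps the rest of the mock's behaviour intact, which is roughly what a Mockito spy would have provided.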
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



