[ 
https://issues.apache.org/jira/browse/KAFKA-7187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590409#comment-16590409
 ] 

ASF GitHub Bot commented on KAFKA-7187:
---------------------------------------

nucatus closed pull request #5560: KAFKA-7187 MockConsumer can fetch offsets 
based on timestamps
URL: https://github.com/apache/kafka/pull/5560
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index cf1b07fabe2..9069c42e5b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -380,7 +380,23 @@ public synchronized void resume(Collection<TopicPartition> 
partitions) {
 
     @Override
     public synchronized Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
+        for (TopicPartition tp : timestampsToSearch.keySet()) {
+            Long timestamp = timestampsToSearch.get(tp);
+            List<ConsumerRecord<K, V>> topicRecords = records.get(tp);
+            if (topicRecords == null) {
+                continue;
+            }
+            OffsetAndTimestamp ot = null;
+            for (ConsumerRecord record : topicRecords) {
+                if (record.timestamp() >= timestamp) {
+                    ot = new OffsetAndTimestamp(record.offset(), 
record.timestamp());
+                    break;
+                }
+            }
+            result.put(tp, ot);
+        }
+        return result;
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 1d01eb6d0b2..011f96ade3d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -45,9 +46,17 @@ public void testSimpleMock() {
         consumer.updateBeginningOffsets(beginningOffsets);
         consumer.seek(new TopicPartition("test", 0), 0);
         ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 
1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 
1, 100L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
         consumer.addRecord(rec1);
         consumer.addRecord(rec2);
+        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+        TopicPartition test_0 = new TopicPartition("test", 0);
+        timestampsToSearch.put(test_0, 0L);
+        assertEquals(0, 
consumer.offsetsForTimes(timestampsToSearch).get(test_0).offset());
+        timestampsToSearch.put(test_0, 50L);
+        assertEquals(1, 
consumer.offsetsForTimes(timestampsToSearch).get(test_0).offset());
+        timestampsToSearch.put(test_0, 150L);
+        assertEquals(null, 
consumer.offsetsForTimes(timestampsToSearch).get(test_0));
         ConsumerRecords<String, String> recs = 
consumer.poll(Duration.ofMillis(1));
         Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
         assertEquals(rec1, iter.next());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> offsetsForTimes in MockConsumer implementation
> ----------------------------------------------
>
>                 Key: KAFKA-7187
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7187
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Jing Chen
>            Priority: Minor
>
> The implementation for offsetsForTimes in MockConsumer is missing, it simply 
> throws UnsupportedOperationException, can anyone help to provide the 
> implementation of the method?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to