[ https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16408054#comment-16408054 ]

ASF GitHub Bot commented on KAFKA-6662:
---------------------------------------

wangzzu closed pull request #4717: KAFKA-6662: Consumer use offsetsForTimes() get offset return None.
URL: https://github.com/apache/kafka/pull/4717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f0050f54aef..33035f9a9c9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1120,7 +1120,10 @@ class Log(@volatile var dir: File,
           None
       }
 
-      targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
+      targetSeg match {
+        case None => Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, this.logEndOffset))
+        case _ => targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5970f42f6d9..52740d49c79 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -479,8 +479,12 @@ class LogSegment private[log] (val log: FileRecords,
     val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
-    Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
-      TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
+    if (position == 0 && timestampOffset.timestamp == -1){
+      Option(timestampOffset)
+    } else {
+      Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
+        TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c45ed0d2986..946ef91c3de 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -263,7 +263,7 @@ class LogSegmentTest {
     assertEquals(43, seg.findOffsetByTimestamp(430).get.offset)
     assertEquals(44, seg.findOffsetByTimestamp(431).get.offset)
     // Search beyond the last timestamp
-    assertEquals(None, seg.findOffsetByTimestamp(491))
+    assertEquals(49, seg.findOffsetByTimestamp(491).get.offset)
     // Search before the first indexed timestamp
     assertEquals(41, seg.findOffsetByTimestamp(401).get.offset)
     // Search before the first timestamp

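The patch above makes the timestamp lookup fall back to the log end offset (paired with NO_TIMESTAMP) instead of returning None when the target timestamp is beyond every record in the log. A minimal sketch of that fallback behavior under simplified assumptions — MiniLog and TimestampOffset here are illustrative stand-ins, not Kafka's actual classes, and offsets are modeled as array indices:

```java
import java.util.Optional;

// Simplified stand-in for Kafka's TimestampOffset, for illustration only.
class TimestampOffset {
    static final long NO_TIMESTAMP = -1L;
    final long timestamp;
    final long offset;
    TimestampOffset(long timestamp, long offset) {
        this.timestamp = timestamp;
        this.offset = offset;
    }
}

// Toy log: one monotonically timestamped record per offset.
class MiniLog {
    final long[] timestamps;  // timestamp of each record; offsets are array indices

    MiniLog(long[] timestamps) { this.timestamps = timestamps; }

    long logEndOffset() { return timestamps.length; }

    // Patched behavior: if no record has timestamp >= target (including the
    // empty-log case), return (NO_TIMESTAMP, logEndOffset) rather than an
    // empty Optional, so the broker never answers with offset -1.
    Optional<TimestampOffset> fetchOffsetByTimestamp(long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target)
                return Optional.of(new TimestampOffset(timestamps[offset], offset));
        }
        return Optional.of(new TimestampOffset(TimestampOffset.NO_TIMESTAMP, logEndOffset()));
    }
}
```

With this fallback, both problem cases from the issue (empty partition; target beyond the active segment's largest timestamp) yield the latest offset instead of nothing.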

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer use offsetsForTimes() get offset return None.
> ------------------------------------------------------
>
>                 Key: KAFKA-6662
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6662
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: Matt Wang
>            Priority: Minor
>
> When we use the consumer's offsetsForTimes() method to get a topic-partition's offset, it sometimes returns null. The client log shows:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher){code}
> From the log we can see that the broker does return an offset, but its value is -1; this entry is then dropped in Fetcher.handleListOffsetResponse():
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }{code}
> We tested several situations and found that it returns none in the following two cases:
>  # The topic-partition contains no messages; when we use offsetsForTimes() to get the offset, the broker returns -1.
>  # The targetTime used to look up the offset is larger than the largestTimestamp of the partition's active segment; the broker returns -1.
> If the offset is -1, it is never returned to the consumer client. I think in these situations the latest offset should be returned, which is also what the documentation in kafka/core says:
> {code:java}
> /**
>  * Search the message offset based on timestamp.
>  * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
>  * greater than or equals to the target timestamp.
>  *
>  * If all the messages in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
>  * timestamp will be max timestamp in the segment.
>  *
>  * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
>  * the returned offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
>  *
>  * This method only returns None when the log is not empty but we did not see any messages when scanning the log
>  * from the indexed position. This could happen if the log is truncated after we get the indexed position but
>  * before we scan the log from there. In this case we simply return None and the caller will need to check on
>  * the truncated log and maybe retry or even do the search on another log segment.
>  *
>  * @param timestamp The timestamp to search for.
>  * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
>  *         target timestamp. None will be returned if there is no such message.
>  */
> def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
>   // Get the index entry with a timestamp less than or equal to the target timestamp
>   val timestampOffset = timeIndex.lookup(timestamp)
>   val position = index.lookup(timestampOffset.offset).position
>   // Search the timestamp
>   log.searchForTimestamp(timestamp, position)
> }
> {code}
>  
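The two cases quoted above reduce to one client-side behavior: the broker answers with offset -1 (ListOffsetResponse.UNKNOWN_OFFSET in the real client), the Fetcher silently skips that entry, and offsetsForTimes() hands the caller null for the partition. A stripped-down sketch of that filter — the class and map types here are simplified stand-ins for illustration, not the actual consumer internals:

```java
import java.util.HashMap;
import java.util.Map;

class OffsetFilterSketch {
    // ListOffsetResponse.UNKNOWN_OFFSET is -1 in the real client.
    static final long UNKNOWN_OFFSET = -1L;

    // Mirrors the check in Fetcher.handleListOffsetResponse(): entries whose
    // offset equals UNKNOWN_OFFSET are never put into the result map, so the
    // consumer's offsetsForTimes() result contains null for those partitions.
    static Map<String, Long> handleResponse(Map<String, Long> brokerOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : brokerOffsets.entrySet()) {
            if (e.getValue() != UNKNOWN_OFFSET)
                result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```

This is why fixing the broker to return the log end offset (rather than -1) is enough to make the client-visible null disappear: the filter above then keeps the entry.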



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
