[ 
https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399980#comment-16399980
 ] 

ASF GitHub Bot commented on KAFKA-6662:
---------------------------------------

wangzzu opened a new pull request #4717: KAFKA-6662: Consumer use 
offsetsForTimes() get offset return None.
URL: https://github.com/apache/kafka/pull/4717
 
 
   When we use the Consumer's `offsetsForTimes()` method to get a topic-partition's offset, it sometimes returns null for that partition. The client log shows:
   
   ```
   [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector)
   [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
   [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
   [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
   [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient)
   [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient)
   [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher)
   ```
   From the log, we can see that the broker does return an offset, but its value is -1 (`ListOffsetResponse.UNKNOWN_OFFSET`), and `Fetcher.handleListOffsetResponse()` discards such entries:
   
   ```java
   // Handle v1 and later response
   log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
           topicPartition, partitionData.offset, partitionData.timestamp);
   // Entries whose offset is UNKNOWN_OFFSET (-1) are skipped, so the partition
   // never reaches timestampOffsetMap and the consumer gets no offset for it.
   if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
       OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
       timestampOffsetMap.put(topicPartition, offsetData);
   }
   ```
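
To make the client-side effect concrete, here is a minimal, self-contained Java sketch of the filtering above. It is a hypothetical model, not Kafka code: partitions are plain strings, `OffsetData` is a stand-in class, and `offsetOrEnd()` is an assumed caller-side workaround (not part of this PR). When a partition has no entry, the workaround falls back to the partition's end offset, which a real application could obtain from `KafkaConsumer.endOffsets()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the client-side filtering. None of these
// types are the real Kafka classes; partitions are identified by plain strings.
final class ListOffsetFilterSketch {
    // Stand-in for ListOffsetResponse.UNKNOWN_OFFSET.
    static final long UNKNOWN_OFFSET = -1L;

    // Stand-in for the Fetcher's OffsetData holder.
    static final class OffsetData {
        final long offset;
        final long timestamp;

        OffsetData(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Keeps only partitions whose broker-reported offset is not UNKNOWN_OFFSET,
    // mirroring the check in Fetcher.handleListOffsetResponse().
    static Map<String, OffsetData> filter(Map<String, OffsetData> brokerResponse) {
        Map<String, OffsetData> result = new HashMap<>();
        for (Map.Entry<String, OffsetData> e : brokerResponse.entrySet()) {
            if (e.getValue().offset != UNKNOWN_OFFSET) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    // Assumed caller-side workaround: if no offset was found for the partition,
    // use the supplied end offset instead of failing on a null entry.
    static long offsetOrEnd(Map<String, OffsetData> found, String partition, long endOffset) {
        OffsetData data = found.get(partition);
        return data == null ? endOffset : data.offset;
    }
}
```

With a broker response of offset -1 for `org.matt_test2-0`, `filter()` drops the partition, and `offsetOrEnd()` then returns the end offset it was given.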
   
   We tested several scenarios and found that no offset is returned in the following two cases:
   
   1. The topic-partition contains no messages: `offsetsForTimes()` gets an offset of -1.
   2. The target timestamp is larger than the `largestTimestamp` of the partition's active segment: the offset is again -1.
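
Both cases can be reproduced with a deliberately simplified model of the broker-side lookup. This sketch makes one assumption: records are scanned in offset order, and the first offset whose timestamp is at least the target wins. The real broker consults per-segment time and offset indexes instead, so `TimestampLookupSketch` (a hypothetical name) only illustrates why -1 comes back.

```java
import java.util.List;

// Hypothetical, simplified model of the broker-side timestamp lookup.
// Real brokers use per-segment time and offset indexes; this scans linearly.
final class TimestampLookupSketch {
    // Same value the broker sends back when no offset is found.
    static final long UNKNOWN_OFFSET = -1L;

    // records: {offset, timestamp} pairs in offset order.
    static long lookup(List<long[]> records, long targetTimestamp) {
        for (long[] r : records) {
            if (r[1] >= targetTimestamp) {
                return r[0]; // first offset whose timestamp is >= target
            }
        }
        // Case 1: the partition is empty. Case 2: every timestamp is < target.
        return UNKNOWN_OFFSET;
    }
}
```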
   
   Because -1 is filtered out, no offset is returned to the consumer client at all. I think that in these situations the latest offset should be returned instead, and this is also how the doc comment on `findOffsetByTimestamp()` in kafka/core defines it:
   
   ```scala
   /**
    * Search the message offset based on timestamp.
    * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
    * greater than or equal to the target timestamp.
    *
    * If all the messages in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
    * timestamp will be max timestamp in the segment.
    *
    * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
    * the returned offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
    *
    * This method only returns None when the log is not empty but we did not see any messages when scanning the log
    * from the indexed position. This could happen if the log is truncated after we get the indexed position but
    * before we scan the log from there. In this case we simply return None and the caller will need to check on
    * the truncated log and maybe retry or even do the search on another log segment.
    *
    * @param timestamp The timestamp to search for.
    * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
    *         target timestamp. None will be returned if there is no such message.
    */
   def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
     val position = index.lookup(timestampOffset.offset).position
     // Search the timestamp
     log.searchForTimestamp(timestamp, position)
   }
   ```
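
The behaviour the doc comment describes, extended with the PR's proposal for an empty partition, could be sketched roughly as follows. This is a hypothetical single-segment model, not Kafka code:

- `ProposedLookupSketch` is an illustrative name.
- `NO_TIMESTAMP` stands in for `Message.NoTimestamp`.
- Only the first two rules of the doc comment are covered. The "all larger / no timestamp" rule and the index lookups are omitted.
- Treating an empty segment's base offset as its latest offset is an assumption.

```java
import java.util.List;

// Hypothetical sketch of the proposed lookup: never report "unknown";
// fall back to the latest offset instead. Not the actual Kafka implementation.
final class ProposedLookupSketch {
    // Stand-in for Message.NoTimestamp.
    static final long NO_TIMESTAMP = -1L;

    // Stand-in for kafka's TimestampOffset result type.
    static final class TimestampOffset {
        final long timestamp;
        final long offset;

        TimestampOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    // records: {offset, timestamp} pairs in offset order for one segment.
    static TimestampOffset find(long baseOffset, List<long[]> records, long target) {
        if (records.isEmpty()) {
            // Assumption: an empty segment's latest offset is its base offset.
            return new TimestampOffset(NO_TIMESTAMP, baseOffset);
        }
        long maxTimestamp = NO_TIMESTAMP;
        for (long[] r : records) {
            if (r[1] >= target) {
                // First message whose timestamp is >= the target.
                return new TimestampOffset(r[1], r[0]);
            }
            maxTimestamp = Math.max(maxTimestamp, r[1]);
        }
        // All timestamps are smaller: last offset + 1 with the max timestamp.
        long lastOffset = records.get(records.size() - 1)[0];
        return new TimestampOffset(maxTimestamp, lastOffset + 1);
    }
}
```

Under this model, a target beyond the newest timestamp yields last offset + 1 (the position of the next message to be written) instead of -1, so the client would always receive an offset.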



> Consumer use offsetsForTimes() get offset return None.
> ------------------------------------------------------
>
>                 Key: KAFKA-6662
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6662
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: Matt Wang
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
