[ https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399981#comment-16399981 ]
Matt Wang commented on KAFKA-6662:
--
The PR is: https://github.com/apache/kafka/pull/4717
> Consumer use offsetsForTimes() get offset return None.
> --
>
> Key: KAFKA-6662
> URL: https://issues.apache.org/jira/browse/KAFKA-6662
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.2.0
> Reporter: Matt Wang
> Priority: Minor
>
> When we use the consumer's offsetsForTimes() method to get a topic-partition's
> offset, it sometimes returns null. Here is the client log:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher){code}
> From the log, we can see that the broker does return an offset, but its value
> is -1, and Fetcher.handleListOffsetResponse() discards this value:
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }{code}
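The effect of this check can be reproduced with a minimal, self-contained model. The sketch below is hypothetical: `ListOffsetFilterSketch` and its simplified `OffsetData` are not Kafka classes, partitions are plain strings rather than `TopicPartition`, and `UNKNOWN_OFFSET` is assumed to be -1, as in the log above. It only illustrates how a -1 answer from the broker becomes a missing (null) entry on the client side.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the filtering step quoted above. It is NOT the real
// Fetcher; partitions are plain strings instead of TopicPartition.
final class ListOffsetFilterSketch {
    // Stands in for ListOffsetResponse.UNKNOWN_OFFSET (-1, as seen in the log).
    static final long UNKNOWN_OFFSET = -1L;

    // Simplified stand-in for the Fetcher's OffsetData holder.
    static final class OffsetData {
        final long offset;
        final long timestamp;

        OffsetData(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    private ListOffsetFilterSketch() {}

    // Copies only entries whose offset is known. An entry reported as -1 is
    // skipped, so a later get() for that partition on the result returns null.
    static Map<String, OffsetData> filter(Map<String, OffsetData> brokerResponse) {
        Map<String, OffsetData> timestampOffsetMap = new HashMap<>();
        for (Map.Entry<String, OffsetData> entry : brokerResponse.entrySet()) {
            if (entry.getValue().offset != UNKNOWN_OFFSET) {
                timestampOffsetMap.put(entry.getKey(), entry.getValue());
            }
        }
        return timestampOffsetMap;
    }
}
```

For example, passing in the offset -1 / timestamp -1 entry from the log for org.matt_test2-0 yields a result map with no entry for that partition.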
> We tested several scenarios and found that offsetsForTimes() returns null for
> the partition in the following two cases:
> # The topic-partition contains no messages. When we call offsetsForTimes(),
> the broker returns an offset of -1.
> # The target timestamp is larger than the largestTimestamp of the partition's
> active segment. The broker again returns an offset of -1.
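The two cases can be illustrated with a deliberately simplified model of a single partition. This is a sketch under stated assumptions, not broker code: `TimestampLookupSketch`, `currentLookup`, and `proposedLookup` are hypothetical names, message i is assumed to have offset baseOffset + i, and a linear scan replaces Kafka's time-index search. `currentLookup` returns -1 in both reported cases, while `proposedLookup` returns the log end offset (last offset + 1) instead, in line with the reporter's suggestion that the latest offset be returned.

```java
// Hypothetical, simplified model of one partition: message i has offset
// baseOffset + i and timestamp timestamps[i]. Real brokers search a time
// index per segment; this linear scan only illustrates the outcomes.
final class TimestampLookupSketch {
    static final long UNKNOWN_OFFSET = -1L;

    private TimestampLookupSketch() {}

    // Offset of the first message whose timestamp >= target, or -1 when there
    // is none (empty partition, or target newer than every message).
    static long currentLookup(long[] timestamps, long baseOffset, long target) {
        int i = firstAtOrAfter(timestamps, target);
        return i < 0 ? UNKNOWN_OFFSET : baseOffset + i;
    }

    // Proposed alternative: when nothing matches, return last offset + 1,
    // i.e. the log end offset, instead of -1.
    static long proposedLookup(long[] timestamps, long baseOffset, long target) {
        int i = firstAtOrAfter(timestamps, target);
        return i < 0 ? baseOffset + timestamps.length : baseOffset + i;
    }

    // Index of the first timestamp >= target, or -1 when there is none.
    private static int firstAtOrAfter(long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return i;
            }
        }
        return -1;
    }
}
```

With three messages at offsets 10 to 12 and timestamps 100, 200 and 300, a target of 400 gives -1 from currentLookup and 13 from proposedLookup, while a target of 150 gives 11 from both.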
> When the offset is -1, it is not returned to the consumer client. I think that
> in these situations the latest offset should be returned instead, which is also
> the behavior described in the doc comment in kafka/core:
> {code:java}
> /**
> * Search the message offset based on timestamp.
> * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
> * greater than or equals to the target timestamp.
> *
> * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
> * timestamp will be max timestamp in the segment.
> *
> * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
> * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
> *
> * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
> * from the indexed position. This could happen if the log is truncated after we get the indexed position but
> * before we scan the log from there. In this case we simply return None and the caller will need to check on
> * the truncated log and maybe retry or even do the search on another log segment.
> *
> * @param timestamp The timestamp to search for.
> * @return the timestamp and offset of the first message whose timestamp is larger