[ https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matt Wang resolved KAFKA-6662.
------------------------------
    Resolution: Not A Problem

> Consumer use offsetsForTimes() get offset return None.
> ------------------------------------------------------
>
>                 Key: KAFKA-6662
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6662
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: Matt Wang
>            Priority: Minor
>
> When we use the Consumer method offsetsForTimes() to look up a
> topic-partition offset, it sometimes returns null. The client log shows:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop
> interval 256 upload 0 retry 0 fail 0
> (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53.
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53:
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0],
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0],
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0],
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0],
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0])
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for
> org.matt_test2-0. Fetched offset -1, timestamp -1
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {code}
> The log shows that the broker does return an offset, but its value is -1,
> and Fetcher.handleListOffsetResponse() drops that value:
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }
> {code}
> We tested several situations and found that it returns null in the
> following two cases:
> # The topic-partition contains no messages; when we call offsetsForTimes(),
> the offset returned is -1.
> # The targetTime used for the lookup is larger than the largestTimestamp of
> the partition's active segment; the offset returned is again -1.
> If the offset is -1, it is not returned to the consumer client. I think that
> in these situations the latest offset should be returned, which is also what
> the comment in kafka/core describes.
> {code:java}
> /**
>  * Search the message offset based on timestamp.
>  * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
>  * greater than or equals to the target timestamp.
>  *
>  * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
>  * timestamp will be max timestamp in the segment.
>  *
>  * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
>  * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
>  *
>  * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
>  * from the indexed position. This could happen if the log is truncated after we get the indexed position but
>  * before we scan the log from there. In this case we simply return None and the caller will need to check on
>  * the truncated log and maybe retry or even do the search on another log segment.
>  *
>  * @param timestamp The timestamp to search for.
>  * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
>  *         target timestamp. None will be returned if there is no such message.
>  */
> def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
>   // Get the index entry with a timestamp less than or equal to the target timestamp
>   val timestampOffset = timeIndex.lookup(timestamp)
>   val position = index.lookup(timestampOffset.offset).position
>   // Search the timestamp
>   log.searchForTimestamp(timestamp, position)
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
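
The two cases listed in the report can be reproduced in a small model, sketched here in plain Java with no Kafka dependency. Everything in it is an assumption made for illustration: `TimestampLookupSketch`, `searchOffset`, and `offsetOrLogEnd` are hypothetical names, not Kafka APIs, and the partition is modeled as a list whose index is the record offset and whose value is the record timestamp. The sketch shows why a timestamp lookup has nothing to return for an empty partition or for a target time past the largest timestamp, and how a caller who wants the "latest offset" behavior requested above could apply that fallback on its own side. With the real client, the log end offset would presumably come from `KafkaConsumer.endOffsets()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TimestampLookupSketch {
    // Mirrors the -1 seen in the client log ("Fetched offset -1, timestamp -1").
    static final long UNKNOWN_OFFSET = -1L;

    // Hypothetical model: timestamps.get(i) is the timestamp of the record at offset i.
    // Returns the offset of the first record whose timestamp is >= targetTime,
    // or UNKNOWN_OFFSET when no such record exists.
    static long searchOffset(List<Long> timestamps, long targetTime) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetTime) {
                return offset;
            }
        }
        return UNKNOWN_OFFSET;
    }

    // Caller-side fallback: when the lookup finds nothing, use the log end
    // offset (last offset + 1, which is 0 for an empty partition).
    static long offsetOrLogEnd(List<Long> timestamps, long targetTime) {
        long found = searchOffset(timestamps, targetTime);
        return found == UNKNOWN_OFFSET ? timestamps.size() : found;
    }

    public static void main(String[] args) {
        List<Long> empty = Collections.emptyList();
        List<Long> partition = Arrays.asList(100L, 200L, 300L);

        System.out.println(searchOffset(empty, 150L));       // case 1: prints -1
        System.out.println(searchOffset(partition, 400L));   // case 2: prints -1
        System.out.println(searchOffset(partition, 150L));   // prints 1
        System.out.println(offsetOrLogEnd(partition, 400L)); // prints 3
    }
}
```

Keeping the fallback in the caller leaves the "no such message" result distinguishable from a real match, which is consistent with the issue being resolved as Not A Problem.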