[jira] [Resolved] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.
[ https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang resolved KAFKA-6662. -- Resolution: Not A Problem > Consumer use offsetsForTimes() get offset return None. > -- > > Key: KAFKA-6662 > URL: https://issues.apache.org/jira/browse/KAFKA-6662 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.2.0 >Reporter: Matt Wang >Priority: Minor > > When we use Consumer's method offsetsForTimes() to get the topic-partition > offset, sometimes it will return null. Print the client log > {code:java} > // 2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop > interval 256 upload 0 retry 0 fail 0 > (com.meituan.mtrace.collector.sg.AbstractCollector) > [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL > (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) > [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE > (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) > [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE > (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) > [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. > (org.apache.kafka.clients.NetworkClient) > [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: > (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 > to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 > [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 > [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 > [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 > [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], > LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], > DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], > SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], > CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) > (org.apache.kafka.clients.NetworkClient) > [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for > org.matt_test2-0. Fetched offset -1, timestamp -1 > (org.apache.kafka.clients.consumer.internals.Fetcher){code} > From the log, we find broker return the offset, but it's value is -1, this > value will be removed in Fetcher.handleListOffsetResponse(), > {code:java} > // // Handle v1 and later response > log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, > timestamp {}", > topicPartition, partitionData.offset, partitionData.timestamp); > if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) { > OffsetData offsetData = new OffsetData(partitionData.offset, > partitionData.timestamp); > timestampOffsetMap.put(topicPartition, offsetData); > }{code} > We test several situations, and we found that in the following two cases it > will return none. > # The topic-partition msg number is 0, when we use offsetsForTimes() to get > the offset, the offset will retuan -1; > # The targetTime we use to find offset is larger than the partition > active_segment's largestTimestamp, the offset will return -1; > If the offset is set -1, it will not be return to consumer client. I think in > these situation, it should be return the latest offset, and it's also defined > in kafka/core annotation. > {code:java} > // /** > * Search the message offset based on timestamp. > * This method returns an option of TimestampOffset. The offset is the offset > of the first message whose timestamp is > * greater than or equals to the target timestamp. > * > * If all the message in the segment have smaller timestamps, the returned > offset will be last offset + 1 and the > * timestamp will be max timestamp in the segment. > * > * If all the messages in the segment have larger timestamps, or no message > in the segment has a timestamp, > * the returned the offset will be the base offset of the segment and the > timestamp will be Message.NoTimestamp. > * > * This methods only returns None when the log is not empty but we did not > see any messages when scanning the log > * from the indexed position. This could happen if the log is truncated after > we get the indexed position but > * before we scan the log from there. In this case we simply return None and > the caller will need to check on > * the truncated log and maybe retry or even do the search on another log > segment. > * > * @param timestamp The timestamp to search for. > * @return the timestamp and offset of the first message whose timestamp is > larger than or equals to the > * target timestamp. None
[jira] [Created] (KAFKA-7094) Variate should unify code style in one method, and use camel name
Matt Wang created KAFKA-7094: Summary: Variate should unify code style in one method, and use camel name Key: KAFKA-7094 URL: https://issues.apache.org/jira/browse/KAFKA-7094 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.0.1 Reporter: Matt Wang In one method, there are two variates, partitionsTobeLeader and partitionsToBeFollower, which should use unify code style, that will be helpful to code maintenance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.
Matt Wang created KAFKA-6662: Summary: Consumer use offsetsForTimes() get offset return None. Key: KAFKA-6662 URL: https://issues.apache.org/jira/browse/KAFKA-6662 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.10.2.0 Reporter: Matt Wang When we use Consumer's method offsetsForTimes() to get the topic-partition offset, sometimes it will return null. Print the client log {code:java} // 2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector) [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient) [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient) [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher){code} >From the log, we find broker return the offset, but it's value is -1, this >value will be removed in Fetcher.handleListOffsetResponse(), {code:java} // // Handle v1 and later response log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", topicPartition, partitionData.offset, partitionData.timestamp); if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) { OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp); timestampOffsetMap.put(topicPartition, offsetData); }{code} We test several situations, and we found that in the following two cases it will return none. # The topic-partition msg number is 0, when we use offsetsForTimes() to get the offset, the offset will retuan -1; # The targetTime we use to find offset is larger than the partition active_segment's largestTimestamp, the offset will return -1; If the offset is set -1, it will not be return to consumer client. I think in these situation, it should be return the latest offset, and it's also defined in kafka/core annotation. {code:java} // /** * Search the message offset based on timestamp. * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is * greater than or equals to the target timestamp. * * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the * timestamp will be max timestamp in the segment. * * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp, * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp. * * This methods only returns None when the log is not empty but we did not see any messages when scanning the log * from the indexed position. This could happen if the log is truncated after we get the indexed position but * before we scan the log from there. In this case we simply return None and the caller will need to check on * the truncated log and maybe retry or even do the search on another log segment. * * @param timestamp The timestamp to search for. * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the * target timestamp. None will be returned if there is no such message. */ def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = { // Get the index entry with a timestamp less than or equal to the target timestamp val timestampOffset = timeIndex.lookup(timestamp) val position = index.lookup(timestampOffset.offset).position // Search the timestamp log.searchForTimestamp(timestamp,
[jira] [Resolved] (KAFKA-4328) The parameters for creating the ZkUtils object is reverse
[ https://issues.apache.org/jira/browse/KAFKA-4328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang resolved KAFKA-4328. -- Resolution: Fixed > The parameters for creating the ZkUtils object is reverse > - > > Key: KAFKA-4328 > URL: https://issues.apache.org/jira/browse/KAFKA-4328 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.10.0.1 > Environment: software platform >Reporter: Matt Wang > Labels: patch > Original Estimate: 24h > Remaining Estimate: 24h > > When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and > zkConnectionTimeoutMs is reverse. Though the default values of these > parameters are both 6000, it will have some problem, especially when we want > to reset these values. > The pull requests address is: > https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed
[ https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang resolved KAFKA-4329. -- Resolution: Resolved Reviewer: Ismael Juma (was: Guozhang Wang) Using named parameters to avoid these problems. > The order of the parameters for creating the ZkUtils object is reversed > --- > > Key: KAFKA-4329 > URL: https://issues.apache.org/jira/browse/KAFKA-4329 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.10.0.1 > Environment: software platform >Reporter: Matt Wang >Priority: Critical > Labels: patch > Original Estimate: 24h > Remaining Estimate: 24h > > When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and > zkConnectionTimeoutMs is reverse. Though the default values of these > parameters are both 6000, it will have some problems, especially when we want > to reset these values. > The pull requests address is: > https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed
[ https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang updated KAFKA-4329: - Summary: The order of the parameters for creating the ZkUtils object is reversed (was: The parameters for creating the ZkUtils object is reverse) > The order of the parameters for creating the ZkUtils object is reversed > --- > > Key: KAFKA-4329 > URL: https://issues.apache.org/jira/browse/KAFKA-4329 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.10.0.1 > Environment: software platform >Reporter: Matt Wang >Priority: Critical > Labels: patch > Original Estimate: 24h > Remaining Estimate: 24h > > When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and > zkConnectionTimeoutMs is reverse. Though the default values of these > parameters are both 6000, it will have some problems, especially when we want > to reset these values. > The pull requests address is: > https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4329) The parameters for creating the ZkUtils object is reverse
[ https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang updated KAFKA-4329: - Description: When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and zkConnectionTimeoutMs is reverse. Though the default values of these parameters are both 6000, it will have some problems, especially when we want to reset these values. The pull requests address is: https://github.com/apache/kafka/pull/1646 was: When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and zkConnectionTimeoutMs is reverse. Though the default values of these parameters are both 6000, it will have some problem, especially when we want to reset these values. The pull requests address is: https://github.com/apache/kafka/pull/1646 > The parameters for creating the ZkUtils object is reverse > - > > Key: KAFKA-4329 > URL: https://issues.apache.org/jira/browse/KAFKA-4329 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0, 0.10.0.1 > Environment: software platform >Reporter: Matt Wang >Priority: Critical > Labels: patch > Original Estimate: 24h > Remaining Estimate: 24h > > When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and > zkConnectionTimeoutMs is reverse. Though the default values of these > parameters are both 6000, it will have some problems, especially when we want > to reset these values. > The pull requests address is: > https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4329) The parameters for creating the ZkUtils object is reverse
Matt Wang created KAFKA-4329: Summary: The parameters for creating the ZkUtils object is reverse Key: KAFKA-4329 URL: https://issues.apache.org/jira/browse/KAFKA-4329 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.10.0.1, 0.10.0.0 Environment: software platform Reporter: Matt Wang Priority: Critical When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and zkConnectionTimeoutMs is reverse. Though the default values of these parameters are both 6000, it will have some problem, especially when we want to reset these values. The pull requests address is: https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4328) The parameters for creating the ZkUtils object is reverse
Matt Wang created KAFKA-4328: Summary: The parameters for creating the ZkUtils object is reverse Key: KAFKA-4328 URL: https://issues.apache.org/jira/browse/KAFKA-4328 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.10.0.1, 0.10.0.0 Environment: software platform Reporter: Matt Wang When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and zkConnectionTimeoutMs is reverse. Though the default values of these parameters are both 6000, it will have some problem, especially when we want to reset these values. The pull requests address is: https://github.com/apache/kafka/pull/1646 -- This message was sent by Atlassian JIRA (v6.3.4#6332)