[jira] [Resolved] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.

2018-08-03 Thread Matt Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang resolved KAFKA-6662.
--
Resolution: Not A Problem

> Consumer use offsetsForTimes() get offset return None.
> --
>
> Key: KAFKA-6662
> URL: https://issues.apache.org/jira/browse/KAFKA-6662
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: Matt Wang
>Priority: Minor
>
> When we use the Consumer's offsetsForTimes() method to get the offset of a 
> topic-partition, it sometimes returns null. The client log shows:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop 
> interval 256 upload 0 retry 0 fail 0 
> (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. 
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: 
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for 
> org.matt_test2-0. Fetched offset -1, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.Fetcher){code}
> The log shows that the broker does return an offset, but its value is -1, and 
> Fetcher.handleListOffsetResponse() drops that value:
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> // An offset equal to UNKNOWN_OFFSET is skipped, so the partition gets no map entry
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }{code}
> We tested several situations and found that it returns null in the following 
> two cases:
>  # The topic-partition contains no messages. offsetsForTimes() then gets an 
> offset of -1.
>  # The target time used for the lookup is larger than the largestTimestamp of 
> the partition's active segment. The offset is then also -1.
> An offset of -1 is not returned to the consumer client. I think that in these 
> situations the latest offset should be returned, which is also what the 
> comment in kafka/core describes:
> {code:java}
> /**
>  * Search the message offset based on timestamp.
>  * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
>  * greater than or equals to the target timestamp.
>  *
>  * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
>  * timestamp will be max timestamp in the segment.
>  *
>  * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
>  * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
>  *
>  * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
>  * from the indexed position. This could happen if the log is truncated after we get the indexed position but
>  * before we scan the log from there. In this case we simply return None and the caller will need to check on
>  * the truncated log and maybe retry or even do the search on another log segment.
>  *
>  * @param timestamp The timestamp to search for.
>  * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
>  * target timestamp. None 

[jira] [Created] (KAFKA-7094) Variate should unify code style in one method, and use camel name

2018-06-25 Thread Matt Wang (JIRA)
Matt Wang created KAFKA-7094:


 Summary: Variate should unify code style in one method, and use  
camel name
 Key: KAFKA-7094
 URL: https://issues.apache.org/jira/browse/KAFKA-7094
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.1
Reporter: Matt Wang


One method contains two variables, partitionsTobeLeader and 
partitionsToBeFollower, whose names are cased inconsistently (Tobe vs. ToBe). 
They should follow one camel-case style, which will make the code easier to 
maintain.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.

2018-03-15 Thread Matt Wang (JIRA)
Matt Wang created KAFKA-6662:


 Summary: Consumer use offsetsForTimes() get offset return None.
 Key: KAFKA-6662
 URL: https://issues.apache.org/jira/browse/KAFKA-6662
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.2.0
Reporter: Matt Wang


When we use the Consumer's offsetsForTimes() method to get the offset of a 
topic-partition, it sometimes returns null. The client log shows:
{code:java}
[2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop 
interval 256 upload 0 retry 0 fail 0 
(com.meituan.mtrace.collector.sg.AbstractCollector)
[2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. 
(org.apache.kafka.clients.NetworkClient)
[2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 
0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 
1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], 
StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], 
ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], 
OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], 
JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
(org.apache.kafka.clients.NetworkClient)
[2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for 
org.matt_test2-0. Fetched offset -1, timestamp -1 
(org.apache.kafka.clients.consumer.internals.Fetcher){code}
The log shows that the broker does return an offset, but its value is -1, and 
Fetcher.handleListOffsetResponse() drops that value:
{code:java}
// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
        topicPartition, partitionData.offset, partitionData.timestamp);
// An offset equal to UNKNOWN_OFFSET is skipped, so the partition gets no map entry
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
    OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
    timestampOffsetMap.put(topicPartition, offsetData);
}{code}
We tested several situations and found that it returns null in the following 
two cases:
 # The topic-partition contains no messages. offsetsForTimes() then gets an 
offset of -1.
 # The target time used for the lookup is larger than the largestTimestamp of 
the partition's active segment. The offset is then also -1.

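The two cases above can be worked around on the caller's side. The sketch below 
is a minimal, self-contained model in plain Java, not Kafka code: partitions 
are plain strings, and filterUnknown() and resolve() are hypothetical helpers. 
filterUnknown() mimics the check in Fetcher.handleListOffsetResponse() that 
drops an offset of -1, and resolve() falls back to a log-end offset when the 
lookup has no entry. With a real consumer the end offsets would presumably come 
from KafkaConsumer.endOffsets(); that mapping is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetLookupFallback {
    // Mirrors the -1 value seen in the log ("Fetched offset -1").
    static final long UNKNOWN_OFFSET = -1L;

    // Models the client-side filter: a partition whose broker answer is -1
    // gets no entry, so a later get() on the result yields null.
    static Map<String, Long> filterUnknown(Map<String, Long> brokerAnswers) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : brokerAnswers.entrySet()) {
            if (e.getValue() != UNKNOWN_OFFSET) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    // Caller-side fallback: use the log-end offset when the timestamp
    // lookup has no entry for the partition.
    static long resolve(String partition,
                        Map<String, Long> byTimestamp,
                        Map<String, Long> endOffsets) {
        Long found = byTimestamp.get(partition);
        if (found != null) {
            return found;
        }
        Long end = endOffsets.get(partition);
        if (end == null) {
            throw new IllegalArgumentException("no end offset for " + partition);
        }
        return end;
    }

    public static void main(String[] args) {
        Map<String, Long> brokerAnswers = new HashMap<>();
        brokerAnswers.put("org.matt_test2-0", UNKNOWN_OFFSET); // as in the log
        brokerAnswers.put("org.matt_test2-1", 120L);           // hypothetical match

        Map<String, Long> endOffsets = new HashMap<>();        // hypothetical values
        endOffsets.put("org.matt_test2-0", 0L);
        endOffsets.put("org.matt_test2-1", 500L);

        Map<String, Long> byTimestamp = filterUnknown(brokerAnswers);
        for (String partition : endOffsets.keySet()) {
            System.out.println(partition + " -> "
                    + resolve(partition, byTimestamp, endOffsets));
        }
    }
}
```

Whether falling back to the end offset is right depends on the application: it 
means "start from new messages" when no message at or after the target time 
exists yet.
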
An offset of -1 is not returned to the consumer client. I think that in these 
situations the latest offset should be returned, which is also what the 
comment in kafka/core describes:
{code:java}
/**
 * Search the message offset based on timestamp.
 * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
 * greater than or equals to the target timestamp.
 *
 * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
 * timestamp will be max timestamp in the segment.
 *
 * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
 * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
 *
 * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
 * from the indexed position. This could happen if the log is truncated after we get the indexed position but
 * before we scan the log from there. In this case we simply return None and the caller will need to check on
 * the truncated log and maybe retry or even do the search on another log segment.
 *
 * @param timestamp The timestamp to search for.
 * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
 * target timestamp. None will be returned if there is no such message.
 */
def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
  // Get the index entry with a timestamp less than or equal to the target timestamp
  val timestampOffset = timeIndex.lookup(timestamp)
  val position = index.lookup(timestampOffset.offset).position
  // Search the timestamp
  log.searchForTimestamp(timestamp, 

[jira] [Resolved] (KAFKA-4328) The parameters for creating the ZkUtils object is reverse

2016-11-09 Thread Matt Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang resolved KAFKA-4328.
--
Resolution: Fixed

> The parameters for creating the ZkUtils object is reverse
> -
>
> Key: KAFKA-4328
> URL: https://issues.apache.org/jira/browse/KAFKA-4328
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1
> Environment: software platform
>Reporter: Matt Wang
>  Labels: patch
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When the ZkUtils object is created, the zkSessionTimeOutMs and 
> zkConnectionTimeoutMs arguments are passed in reversed order. Because both 
> parameters default to 6000, the swap goes unnoticed with the defaults, but it 
> causes problems as soon as these values are changed. 
> The pull request is at:
> https://github.com/apache/kafka/pull/1646



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed

2016-10-27 Thread Matt Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang resolved KAFKA-4329.
--
Resolution: Resolved
  Reviewer: Ismael Juma  (was: Guozhang Wang)

 Using named parameters to avoid these problems.
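
The note above presumably refers to Scala named arguments. As a rough Java 
analogue (Java has no named arguments), the sketch below uses a small builder 
so that each timeout is set by name and a swapped pair of int arguments cannot 
slip through. ZkTimeouts and its builder are hypothetical and are not part of 
Kafka's ZkUtils API; the value 30000 is only an illustration.

```java
class ZkTimeouts {
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;

    private ZkTimeouts(int sessionTimeoutMs, int connectionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private Integer sessionTimeoutMs;
        private Integer connectionTimeoutMs;

        // Each setter names the value it receives, so call order is irrelevant.
        Builder sessionTimeoutMs(int value) {
            this.sessionTimeoutMs = value;
            return this;
        }

        Builder connectionTimeoutMs(int value) {
            this.connectionTimeoutMs = value;
            return this;
        }

        ZkTimeouts build() {
            if (sessionTimeoutMs == null || connectionTimeoutMs == null) {
                throw new IllegalStateException("both timeouts must be set");
            }
            return new ZkTimeouts(sessionTimeoutMs, connectionTimeoutMs);
        }
    }

    public static void main(String[] args) {
        // The setters are called in the "wrong" order on purpose.
        ZkTimeouts timeouts = ZkTimeouts.builder()
                .connectionTimeoutMs(6000)
                .sessionTimeoutMs(30000)
                .build();
        System.out.println("session=" + timeouts.sessionTimeoutMs
                + " connection=" + timeouts.connectionTimeoutMs);
    }
}
```

With two positional int parameters, the compiler accepts either order, which is 
why equal defaults can hide such a swap until one value is changed.
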

> The order of the parameters for creating the ZkUtils object is reversed
> ---
>
> Key: KAFKA-4329
> URL: https://issues.apache.org/jira/browse/KAFKA-4329
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1
> Environment: software platform
>Reporter: Matt Wang
>Priority: Critical
>  Labels: patch
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When the ZkUtils object is created, the zkSessionTimeOutMs and 
> zkConnectionTimeoutMs arguments are passed in reversed order. Because both 
> parameters default to 6000, the swap goes unnoticed with the defaults, but it 
> causes problems as soon as these values are changed. 
> The pull request is at:
> https://github.com/apache/kafka/pull/1646





[jira] [Updated] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed

2016-10-21 Thread Matt Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated KAFKA-4329:
-
Summary: The order of the parameters for creating the ZkUtils object is 
reversed  (was: The parameters for creating the ZkUtils object is reverse)

> The order of the parameters for creating the ZkUtils object is reversed
> ---
>
> Key: KAFKA-4329
> URL: https://issues.apache.org/jira/browse/KAFKA-4329
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1
> Environment: software platform
>Reporter: Matt Wang
>Priority: Critical
>  Labels: patch
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When the ZkUtils object is created, the zkSessionTimeOutMs and 
> zkConnectionTimeoutMs arguments are passed in reversed order. Because both 
> parameters default to 6000, the swap goes unnoticed with the defaults, but it 
> causes problems as soon as these values are changed. 
> The pull request is at:
> https://github.com/apache/kafka/pull/1646





[jira] [Updated] (KAFKA-4329) The parameters for creating the ZkUtils object is reverse

2016-10-21 Thread Matt Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated KAFKA-4329:
-
Description: 
When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and 
zkConnectionTimeoutMs is reverse. Though the default values of these parameters 
are both 6000, it will have some problems, especially when we want to reset 
these values. 
The pull requests address is:
https://github.com/apache/kafka/pull/1646

  was:
When creating the ZkUtils object, the parameters of zkSessionTimeOutMs and 
zkConnectionTimeoutMs is reverse. Though the default values of these parameters 
are both 6000, it will have some problem, especially when we want to reset 
these values. 
The pull requests address is:
https://github.com/apache/kafka/pull/1646


> The parameters for creating the ZkUtils object is reverse
> -
>
> Key: KAFKA-4329
> URL: https://issues.apache.org/jira/browse/KAFKA-4329
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1
> Environment: software platform
>Reporter: Matt Wang
>Priority: Critical
>  Labels: patch
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When the ZkUtils object is created, the zkSessionTimeOutMs and 
> zkConnectionTimeoutMs arguments are passed in reversed order. Because both 
> parameters default to 6000, the swap goes unnoticed with the defaults, but it 
> causes problems as soon as these values are changed. 
> The pull request is at:
> https://github.com/apache/kafka/pull/1646





[jira] [Created] (KAFKA-4329) The parameters for creating the ZkUtils object is reverse

2016-10-21 Thread Matt Wang (JIRA)
Matt Wang created KAFKA-4329:


 Summary: The parameters for creating the ZkUtils object is reverse
 Key: KAFKA-4329
 URL: https://issues.apache.org/jira/browse/KAFKA-4329
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.1, 0.10.0.0
 Environment: software platform
Reporter: Matt Wang
Priority: Critical


When the ZkUtils object is created, the zkSessionTimeOutMs and 
zkConnectionTimeoutMs arguments are passed in reversed order. Because both 
parameters default to 6000, the swap goes unnoticed with the defaults, but it 
causes problems as soon as these values are changed. 
The pull request is at:
https://github.com/apache/kafka/pull/1646





[jira] [Created] (KAFKA-4328) The parameters for creating the ZkUtils object is reverse

2016-10-21 Thread Matt Wang (JIRA)
Matt Wang created KAFKA-4328:


 Summary: The parameters for creating the ZkUtils object is reverse
 Key: KAFKA-4328
 URL: https://issues.apache.org/jira/browse/KAFKA-4328
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.1, 0.10.0.0
 Environment: software platform
Reporter: Matt Wang


When the ZkUtils object is created, the zkSessionTimeOutMs and 
zkConnectionTimeoutMs arguments are passed in reversed order. Because both 
parameters default to 6000, the swap goes unnoticed with the defaults, but it 
causes problems as soon as these values are changed. 
The pull request is at:
https://github.com/apache/kafka/pull/1646


