[jira] [Commented] (KAFKA-4299) Consumer offsets reset for all topics after increasing partitions for one topic

2016-10-13 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574324#comment-15574324
 ] 

Juho Autio commented on KAFKA-4299:
---

Thanks for testing [~vahid]. What about offsets of the pre-existing partitions 
in the topic that you altered? Were those reset?

I updated the issue description with some consumer parameters that we have set.

> Consumer offsets reset for all topics after increasing partitions for one 
> topic
> ---
>
> Key: KAFKA-4299
> URL: https://issues.apache.org/jira/browse/KAFKA-4299
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Juho Autio
>
> I increased partitions for one existing topic (2->10), but was surprised to 
> see that it entirely reset the committed offsets of my consumer group.
> All topics & partitions were reset to the earliest offset available, and the 
> consumer read everything again.
> Documentation doesn't mention anything like this. Is this how it's supposed 
> to work, or a bug?
> I would've expected the consumer offsets to not decrease at all, especially 
> for the topics that I didn't even touch.
> For the altered topic I would've expected consumption of the previously 
> existing partitions 0 and 1 to continue from where it was, and the newly 
> added partitions to be read from offset 0.
> I added partitions according to the "Modifying topics" section of Kafka 
> 0.10.0 Documentation:
> {quote}
> To add partitions you can do
> {code}
>  > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic altered_topic --partitions 10
> {code}
> {quote}
> Previously this topic had 2 partitions.
> For the consumer I'm using 
> {{kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()}}.
> And version is:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.11</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
> {code}
> Kafka cluster itself is {{kafka_2.11-0.10.0.1}}.
> This is quite problematic because we can't afford waiting for consumers to 
> read the full buffer from the beginning (for all topics!) when increasing 
> partitions for a topic.
> Some possibly relevant settings we have for the consumer:
> {code}
> kafka.partition.assignment.strategy = "range"
> kafka.auto.offset.reset = "smallest"
> kafka.auto.commit.enable = "false"
> kafka.offsets.storage = "kafka"
> kafka.dual.commit.enabled = false
> kafka.consumer.timeout.ms = "2000"
> kafka.auto.create.topics.enable = true
> {code}
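
A minimal sketch of the behaviour the report expects, assuming that a partition with a committed offset resumes from it and that a partition without one falls back to the earliest available offset (as auto.offset.reset = "smallest" suggests). The class and method names are hypothetical and this is not Kafka's implementation; it only models the expectation for the altered topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model, not Kafka code: computes where a consumer group would
// be expected to start reading each partition after partitions were added.
final class OffsetResolutionSketch {

    /**
     * For every partition listed in {@code earliest}, returns the committed
     * offset if one exists, otherwise the earliest available offset.
     */
    static Map<Integer, Long> startOffsets(Map<Integer, Long> committed,
                                           Map<Integer, Long> earliest) {
        Map<Integer, Long> start = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> entry : earliest.entrySet()) {
            Long committedOffset = committed.get(entry.getKey());
            start.put(entry.getKey(),
                      committedOffset != null ? committedOffset : entry.getValue());
        }
        return start;
    }

    public static void main(String[] args) {
        // Illustrative values: partitions 0 and 1 existed before the change.
        Map<Integer, Long> committed = new LinkedHashMap<>();
        committed.put(0, 1500L);
        committed.put(1, 1720L);

        // After the alter command the topic has 10 partitions.
        Map<Integer, Long> earliest = new LinkedHashMap<>();
        for (int partition = 0; partition < 10; partition++) {
            earliest.put(partition, 0L);
        }

        System.out.println(startOffsets(committed, earliest));
    }
}
```

Under this model, topics that were not altered have a committed offset for every partition, so none of their positions would move.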



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4299) Consumer offsets reset for all topics after increasing partitions for one topic

2016-10-13 Thread Juho Autio (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juho Autio updated KAFKA-4299:
--
Description: 
I increased partitions for one existing topic (2->10), but was surprised to see 
that it entirely reset the committed offsets of my consumer group.

All topics & partitions were reset to the earliest offset available, and the 
consumer read everything again.

Documentation doesn't mention anything like this. Is this how it's supposed to 
work, or a bug?

I would've expected the consumer offsets to not decrease at all, especially for 
the topics that I didn't even touch.

For the altered topic I would've expected consumption of the previously 
existing partitions 0 and 1 to continue from where it was, and the newly 
added partitions to be read from offset 0.

I added partitions according to the "Modifying topics" section of Kafka 0.10.0 
Documentation:

{quote}
To add partitions you can do
{code}
 > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic altered_topic --partitions 10
{code}
{quote}

Previously this topic had 2 partitions.

For the consumer I'm using 
{{kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()}}.

And version is:

{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
</dependency>
{code}

Kafka cluster itself is {{kafka_2.11-0.10.0.1}}.

This is quite problematic because we can't afford waiting for consumers to read 
the full buffer from the beginning (for all topics!) when increasing partitions 
for a topic.


Some possibly relevant settings we have for the consumer:

{code}
kafka.partition.assignment.strategy = "range"
kafka.auto.offset.reset = "smallest"
kafka.auto.commit.enable = "false"
kafka.offsets.storage = "kafka"
kafka.dual.commit.enabled = false
kafka.consumer.timeout.ms = "2000"
kafka.auto.create.topics.enable = true
{code}

  was:
I increased partitions for one existing topic (2->10), but was surprised to see 
that it entirely reset the committed offsets of my consumer group.

All topics & partitions were reset to the earliest offset available, and the 
consumer read everything again.

Documentation doesn't mention anything like this. Is this how it's supposed to 
work, or a bug?

I would've expected the consumer offsets to not decrease at all, especially for 
the topics that I didn't even touch.

For the altered topic I would've expected consumption of the previously 
existing partitions 0 and 1 to continue from where it was, and the newly 
added partitions to be read from offset 0.

I added partitions according to the "Modifying topics" section of Kafka 0.10.0 
Documentation:

{quote}
To add partitions you can do
{code}
 > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic altered_topic --partitions 10
{code}
{quote}

Previously this topic had 2 partitions.

For the consumer I'm using 
{{kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()}}.

And version is:

{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
</dependency>
{code}

Kafka cluster itself is {{kafka_2.11-0.10.0.1}}.

This is quite problematic because we can't afford waiting for consumers to read 
the full buffer from the beginning (for all topics!) when increasing partitions 
for a topic.



Build failed in Jenkins: kafka-trunk-jdk7 #1629

2016-10-13 Thread Apache Jenkins Server
See 

Changes:

[jason] KAFKA-4254; Update producer's metadata before failing on non-existent

--
[...truncated 3691 lines...]
kafka.api.PlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.ProducerBounceTest > testBrokerFailure STARTED

kafka.api.ProducerBounceTest > testBrokerFailure PASSED

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic STARTED

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic PASSED

kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne STARTED

kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne PASSED

kafka.api.ProducerFailureHandlingTest > testWrongBrokerList STARTED

kafka.api.ProducerFailureHandlingTest > testWrongBrokerList PASSED

kafka.api.ProducerFailureHandlingTest > testNotEnoughReplicas STARTED

kafka.api.ProducerFailureHandlingTest > testNotEnoughReplicas PASSED

kafka.api.ProducerFailureHandlingTest > 
testResponseTooLargeForReplicationWithAckAll STARTED

kafka.api.ProducerFailureHandlingTest > 
testResponseTooLargeForReplicationWithAckAll PASSED

kafka.api.ProducerFailureHandlingTest > testNonExistentTopic STARTED

kafka.api.ProducerFailureHandlingTest > testNonExistentTopic PASSED

kafka.api.ProducerFailureHandlingTest > testInvalidPartition STARTED

kafka.api.ProducerFailureHandlingTest > testInvalidPartition PASSED

kafka.api.ProducerFailureHandlingTest > testSendAfterClosed STARTED

kafka.api.ProducerFailureHandlingTest > testSendAfterClosed PASSED

kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckZero STARTED

kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckZero PASSED

kafka.api.ProducerFailureHandlingTest > 
testPartitionTooLargeForReplicationWithAckAll STARTED

kafka.api.ProducerFailureHandlingTest > 
testPartitionTooLargeForReplicationWithAckAll PASSED

kafka.api.ProducerFailureHandlingTest > 
testNotEnoughReplicasAfterBrokerShutdown STARTED

kafka.api.ProducerFailureHandlingTest > 
testNotEnoughReplicasAfterBrokerShutdown PASSED

kafka.api.RackAwareAutoTopicCreationTest > testAutoCreateTopic STARTED

kafka.api.RackAwareAutoTopicCreationTest > testAutoCreateTopic PASSED

kafka.api.SaslPlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslPlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption STARTED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.SslConsumerTest > testCoordinatorFailover STARTED

kafka.api.SslConsumerTest > testCoordinatorFailover PASSED

kafka.api.SslConsumerTest > testSimpleConsumption STARTED

kafka.api.SslConsumerTest > testSimpleConsumption PASSED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures PASSED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > testPatternSubscriptionWithNoTopicAccess 
STARTED

kafka.api.AuthorizerIntegrationTest > testPatternSubscriptionWithNoTopicAccess 
PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > testDeleteWithWildCardAuth STARTED

kafka.api.AuthorizerIntegrationTest > testDeleteWithWildCardAuth PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicRead STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicRead PASSED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededForWritingToNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededForWritingToNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission STARTED

kafka.api.AuthorizerIntegrationTest > 

[jira] [Resolved] (KAFKA-4298) LogCleaner writes inconsistent compressed message set if topic message format != message format

2016-10-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-4298.

   Resolution: Fixed
Fix Version/s: (was: 0.10.0.2)

Issue resolved by pull request 2019
[https://github.com/apache/kafka/pull/2019]

> LogCleaner writes inconsistent compressed message set if topic message format 
> != message format
> ---
>
> Key: KAFKA-4298
> URL: https://issues.apache.org/jira/browse/KAFKA-4298
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> When cleaning the log, we don't want to convert messages to the format 
> configured for the topic due to KAFKA-3915. However, the cleaner logic for 
> writing compressed messages (in case some messages in the message set were 
> not retained) writes the topic message format version in the magic field of 
> the outer message instead of the actual message format. The choice of the 
> absolute/relative offset for the inner messages will also be based on the 
> topic message format version.
> For example, if there is an old compressed message set with magic=0 in the 
> log and the topic is configured for magic=1, then after cleaning, the new 
> message set will have a wrapper with magic=1, the nested messages will still 
> have magic=0, but the message offsets will be relative. If this happens, 
> there does not seem to be an easy way to recover without manually fixing up 
> the log.
> The offsets still work correctly as both the clients and broker use the outer 
> message format version to decide if the relative offset needs to be converted 
> to an absolute offset. So the main problem turns out to be that 
> `ByteBufferMessageSet.deepIterator` throws an exception if there is a 
> mismatch between outer and inner message format version.
> {code}
> if (newMessage.magic != wrapperMessage.magic)
>   throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
>     s"but inner message has magic value ${newMessage.magic}")
> {code}
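
A minimal sketch of the failure mode described above, written in Java rather than the broker's Scala. The `Message` type and `checkInner` method are hypothetical stand-ins, not Kafka classes; the sketch only reproduces the comparison quoted from `ByteBufferMessageSet.deepIterator` to show how a cleaned message set with a magic=1 wrapper and magic=0 inner messages would be rejected.

```java
// Hypothetical model, not Kafka code: mirrors the wrapper/inner magic check.
final class MagicCheckSketch {

    /** Simplified stand-in for a log message: only the magic byte matters here. */
    static final class Message {
        final byte magic;

        Message(byte magic) {
            this.magic = magic;
        }
    }

    /** Throws if the inner message's format version differs from the wrapper's. */
    static void checkInner(Message wrapper, Message inner) {
        if (inner.magic != wrapper.magic) {
            throw new IllegalStateException(
                "Compressed message has magic value " + wrapper.magic
                    + " but inner message has magic value " + inner.magic);
        }
    }

    public static void main(String[] args) {
        Message wrapper = new Message((byte) 1); // written with the topic's configured format
        Message inner = new Message((byte) 0);   // old message left unconverted by the cleaner
        try {
            checkInner(wrapper, inner);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```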





[jira] [Commented] (KAFKA-4298) LogCleaner writes inconsistent compressed message set if topic message format != message format

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574148#comment-15574148
 ] 

ASF GitHub Bot commented on KAFKA-4298:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2019




[GitHub] kafka pull request #2019: KAFKA-4298: Ensure compressed message sets are not...

2016-10-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2019


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4298) LogCleaner writes inconsistent compressed message set if topic message format != message format

2016-10-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4298:
---
Summary: LogCleaner writes inconsistent compressed message set if topic 
message format != message format  (was: LogCleaner does not convert compressed 
message sets properly)

> LogCleaner writes inconsistent compressed message set if topic message format 
> != message format
> ---
>
> Key: KAFKA-4298
> URL: https://issues.apache.org/jira/browse/KAFKA-4298
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0, 0.10.0.2
>
>
> When cleaning the log, we don't want to convert messages to the format 
> configured for the topic due to KAFKA-3915. However, the cleaner logic for 
> writing compressed messages (in case some messages in the message set were 
> not retained) writes the topic message format version in the magic field of 
> the outer message instead of the actual message format. The choice of the 
> absolute/relative offset for the inner messages will also be based on the 
> topic message format version.
> For example, if there is an old compressed message set with magic=0 in the 
> log and the topic is configured for magic=1, then after cleaning, the new 
> message set will have a wrapper with magic=1, the nested messages will still 
> have magic=0, but the message offsets will be relative. If this happens, 
> there does not seem to be an easy way to recover without manually fixing up 
> the log.
> The offsets still work correctly as both the clients and broker use the outer 
> message format version to decide if the relative offset needs to be converted 
> to an absolute offset. So the main problem turns out to be that 
> `ByteBufferMessageSet.deepIterator` throws an exception if there is a 
> mismatch between outer and inner message format version.
> {code}
> if (newMessage.magic != wrapperMessage.magic)
>   throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
>     s"but inner message has magic value ${newMessage.magic}")
> {code}





[jira] [Comment Edited] (KAFKA-4298) LogCleaner does not convert compressed message sets properly

2016-10-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571432#comment-15571432
 ] 

Ismael Juma edited comment on KAFKA-4298 at 10/14/16 3:58 AM:
--

Great catch. I'm a bit unsure why this hasn't been reported so far. I think the 
correct description is:

"When cleaning the log, we don't want to convert messages to the format 
configured for the topic due to KAFKA-3915. However, the cleaner logic for 
writing compressed messages (in case some messages in the message set were not 
retained) writes the topic message format version in the magic field of the 
outer message instead of the actual message format. The choice of the 
absolute/relative offset for the inner messages will also be based on the topic 
message format version.

For example, if there is an old compressed message set with magic=0 in the log 
and the topic is configured for magic=1, then after cleaning, the new message 
set will have a wrapper with magic=1, the nested messages will still have 
magic=0, but the message offsets will be relative. If this happens, there does 
not seem to be an easy way to recover without manually fixing up the log."

Edit: updated the description.
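
To make "relative" offsets concrete, here is a small sketch of how a reader could turn them back into absolute offsets. It rests on an assumption that is not spelled out in this thread: that with magic=1 the wrapper carries the absolute offset of the last inner message, and the inner messages carry offsets counted from the start of the set. The class and method names are hypothetical.

```java
// Hypothetical model, not Kafka code: converts relative inner offsets to
// absolute offsets using the wrapper's offset as the anchor.
final class RelativeOffsetSketch {

    /**
     * Assumes {@code wrapperOffset} is the absolute offset of the last inner
     * message and {@code innerRelative} holds the relative inner offsets in order.
     */
    static long[] toAbsolute(long wrapperOffset, long[] innerRelative) {
        long[] absolute = new long[innerRelative.length];
        if (innerRelative.length == 0) {
            return absolute;
        }
        long lastInner = innerRelative[innerRelative.length - 1];
        for (int i = 0; i < innerRelative.length; i++) {
            // Shift each relative offset so that the last one lands on wrapperOffset.
            absolute[i] = wrapperOffset - lastInner + innerRelative[i];
        }
        return absolute;
    }

    public static void main(String[] args) {
        long[] absolute = toAbsolute(104L, new long[] {0L, 1L, 2L});
        System.out.println(java.util.Arrays.toString(absolute));
    }
}
```

Whether this conversion is applied depends on the wrapper's magic value, so a wrapper that claims magic=1 has to be paired with inner offsets that really are relative.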


was (Author: ijuma):
Great catch. I'm a bit unsure why this hasn't been reported so far. I think the 
correct description is:

"When cleaning the log, we don't want to convert messages to the format 
configured for the topic due to KAFKA-3915. However, the cleaner logic for 
writing compressed messages (in case some messages in the message set were not 
retained) writes the topic message format version in the magic field of the 
outer message instead of the actual message format. The choice of the 
absolute/relative offset for the inner messages will also be based on the topic 
message format version.

For example, if there is an old compressed message set with magic=0 in the log 
and the topic is configured for magic=1, then after cleaning, the new message 
set will have a wrapper with magic=1, the nested messages will still have 
magic=0, but the message offsets will be relative. If this happens, there does 
not seem to be an easy way to recover without manually fixing up the log."

> LogCleaner does not convert compressed message sets properly
> 
>
> Key: KAFKA-4298
> URL: https://issues.apache.org/jira/browse/KAFKA-4298
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0, 0.10.0.2
>
>
> When cleaning the log, we don't want to convert messages to the format 
> configured for the topic due to KAFKA-3915. However, the cleaner logic for 
> writing compressed messages (in case some messages in the message set were 
> not retained) writes the topic message format version in the magic field of 
> the outer message instead of the actual message format. The choice of the 
> absolute/relative offset for the inner messages will also be based on the 
> topic message format version.
> For example, if there is an old compressed message set with magic=0 in the 
> log and the topic is configured for magic=1, then after cleaning, the new 
> message set will have a wrapper with magic=1, the nested messages will still 
> have magic=0, but the message offsets will be relative. If this happens, 
> there does not seem to be an easy way to recover without manually fixing up 
> the log.
> The offsets still work correctly as both the clients and broker use the outer 
> message format version to decide if the relative offset needs to be converted 
> to an absolute offset. So the main problem turns out to be that 
> `ByteBufferMessageSet.deepIterator` throws an exception if there is a 
> mismatch between outer and inner message format version.
> {code}
> if (newMessage.magic != wrapperMessage.magic)
>   throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
>     s"but inner message has magic value ${newMessage.magic}")
> {code}





[jira] [Updated] (KAFKA-4298) LogCleaner does not convert compressed message sets properly

2016-10-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4298:
---
Description: 
When cleaning the log, we don't want to convert messages to the format 
configured for the topic due to KAFKA-3915. However, the cleaner logic for 
writing compressed messages (in case some messages in the message set were not 
retained) writes the topic message format version in the magic field of the 
outer message instead of the actual message format. The choice of the 
absolute/relative offset for the inner messages will also be based on the topic 
message format version.

For example, if there is an old compressed message set with magic=0 in the log 
and the topic is configured for magic=1, then after cleaning, the new message 
set will have a wrapper with magic=1, the nested messages will still have 
magic=0, but the message offsets will be relative. If this happens, there does 
not seem to be an easy way to recover without manually fixing up the log.

The offsets still work correctly as both the clients and broker use the outer 
message format version to decide if the relative offset needs to be converted 
to an absolute offset. So the main problem turns out to be that 
`ByteBufferMessageSet.deepIterator` throws an exception if there is a mismatch 
between outer and inner message format version.

{code}
if (newMessage.magic != wrapperMessage.magic)
  throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
    s"but inner message has magic value ${newMessage.magic}")
{code}

  was:When cleaning the log, we attempt to write the cleaned messages using the 
message format configured for the topic, but as far as I can tell, we do not 
convert the wrapped messages in compressed message sets. For example, if there 
is an old compressed message set with magic=0 in the log and the topic is 
configured for magic=1, then after cleaning, the new message set will have a 
wrapper with magic=1, but the nested messages will still have magic=0. If this 
happens, there does not seem to be an easy way to recover without manually 
fixing up the log.




[jira] [Resolved] (KAFKA-4254) Questionable handling of unknown partitions in KafkaProducer

2016-10-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-4254.

   Resolution: Fixed
Fix Version/s: (was: 0.10.1.1)
   0.10.1.0

Issue resolved by pull request 1995
[https://github.com/apache/kafka/pull/1995]

> Questionable handling of unknown partitions in KafkaProducer
> 
>
> Key: KAFKA-4254
> URL: https://issues.apache.org/jira/browse/KAFKA-4254
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Konstantine Karantasis
> Fix For: 0.10.1.0
>
>
> Currently the producer will raise an {{IllegalArgumentException}} if the user 
> attempts to write to a partition which has just been created. This is caused 
> by the fact that the producer does not attempt to refetch topic metadata in 
> this case, which means that its check for partition validity is based on 
> stale metadata.
> If the topic for the partition did not already exist, it works fine because 
> the producer will block until it has metadata for the topic, so this case is 
> primarily hit when the number of partitions is dynamically increased. 
> A couple options to fix this that come to mind:
> 1. We could treat unknown partitions just as we do unknown topics. If the 
> partition doesn't exist, we refetch metadata and try again (timing out when 
> max.block.ms is reached).
> 2. We can at least throw a more specific exception so that users can handle 
> the error. Raising {{IllegalArgumentException}} is not helpful in practice 
> because it can also be caused by other errors.
> My inclination is to do the first one, since it seems incorrect for the 
> producer to tell the user that the partition is invalid.
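
Option 1 above can be sketched roughly as follows. This is an illustration under stated assumptions, not the change made in pull request 1995: `awaitPartition`, `MetadataTimeoutException` and the `refetchPartitionCount` callback are hypothetical names, and the metadata fetch is reduced to a function that returns the current partition count.

```java
import java.util.function.IntSupplier;

// Hypothetical model, not Kafka code: keep refetching metadata for an unknown
// partition until it appears or the max.block.ms budget is used up.
final class PartitionWaitSketch {

    /** Unchecked timeout used only by this sketch. */
    static final class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String message) {
            super(message);
        }
    }

    /** Returns the partition count once {@code partition} is known, or times out. */
    static int awaitPartition(int partition, IntSupplier refetchPartitionCount, long maxBlockMs) {
        if (partition < 0) {
            throw new IllegalArgumentException("Invalid partition " + partition);
        }
        long deadlineNanos = System.nanoTime() + maxBlockMs * 1_000_000L;
        while (true) {
            int partitionCount = refetchPartitionCount.getAsInt();
            if (partition < partitionCount) {
                return partitionCount; // metadata now covers the requested partition
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new MetadataTimeoutException(
                    "Partition " + partition + " not present in metadata after " + maxBlockMs + " ms");
            }
            try {
                Thread.sleep(1); // brief pause before the next refetch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for metadata", e);
            }
        }
    }

    public static void main(String[] args) {
        int[] refreshes = {0};
        // Simulated metadata: the first two fetches are stale (2 partitions), then 10.
        int count = awaitPartition(5, () -> ++refreshes[0] >= 3 ? 10 : 2, 1000);
        System.out.println("Partition 5 is usable; topic has " + count + " partitions");
    }
}
```

Under this approach a caller would only see a failure when the partition still does not exist once the timeout elapses, which also addresses the concern behind option 2, because the error is specific.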





[jira] [Commented] (KAFKA-4254) Questionable handling of unknown partitions in KafkaProducer

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574020#comment-15574020
 ] 

ASF GitHub Bot commented on KAFKA-4254:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1995


> Questionable handling of unknown partitions in KafkaProducer
> 
>
> Key: KAFKA-4254
> URL: https://issues.apache.org/jira/browse/KAFKA-4254
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Konstantine Karantasis
> Fix For: 0.10.1.0
>
>
> Currently the producer will raise an {{IllegalArgumentException}} if the user 
> attempts to write to a partition which has just been created. This is caused 
> by the fact that the producer does not attempt to refetch topic metadata in 
> this case, which means that its check for partition validity is based on 
> stale metadata.
> If the topic for the partition did not already exist, it works fine because 
> the producer will block until it has metadata for the topic, so this case is 
> primarily hit when the number of partitions is dynamically increased. 
> A couple options to fix this that come to mind:
> 1. We could treat unknown partitions just as we do unknown topics. If the 
> partition doesn't exist, we refetch metadata and try again (timing out when 
> max.block.ms is reached).
> 2. We can at least throw a more specific exception so that users can handle 
> the error. Raising {{IllegalArgumentException}} is not helpful in practice 
> because it can also be caused by other errors.
> My inclination is to do the first one since it seems incorrect for the 
> producer to tell the user that the partition is invalid.
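
Option 1 from the description above could be sketched roughly as follows. This is not the actual KafkaProducer code or the change made in the pull request; the class PartitionWait, the method waitForPartition, the refreshAndCount supplier and the retry backoff are hypothetical names introduced only for illustration. The only setting taken from the description is max.block.ms, modelled here as the maxBlockMs argument.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

// Hypothetical sketch of option 1: treat an unknown partition like an unknown
// topic by refreshing metadata and retrying until max.block.ms has elapsed.
// None of these names are taken from the real KafkaProducer.
class PartitionWait {

    /**
     * @param partition       partition the caller wants to write to
     * @param refreshAndCount stand-in for "refetch metadata and return the
     *                        current partition count" (assumed helper)
     * @param maxBlockMs      upper bound on the total wait, like max.block.ms
     * @param retryBackoffMs  pause between refresh attempts (assumed value)
     */
    static void waitForPartition(int partition, IntSupplier refreshAndCount,
                                 long maxBlockMs, long retryBackoffMs)
            throws TimeoutException, InterruptedException {
        if (partition < 0) {
            // A negative partition can never become valid, so fail fast.
            throw new IllegalArgumentException("Invalid partition " + partition);
        }
        long deadline = System.currentTimeMillis() + maxBlockMs;
        while (true) {
            if (partition < refreshAndCount.getAsInt()) {
                return; // the partition is now present in the metadata
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException("Partition " + partition
                        + " not present in metadata after " + maxBlockMs + " ms");
            }
            Thread.sleep(retryBackoffMs);
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated metadata: the topic grows from 2 to 10 partitions on the
        // third refresh.
        int[] calls = {0};
        IntSupplier metadata = () -> ++calls[0] < 3 ? 2 : 10;
        waitForPartition(5, metadata, 1000, 10);
        System.out.println("partition 5 available after " + calls[0] + " refreshes");
    }
}
```

In this sketch a partition that never appears ends in a TimeoutException rather than an IllegalArgumentException, which would also give callers the more specific error asked for in option 2.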





[GitHub] kafka pull request #1995: KAFKA-4254: Update producers metadata before faili...

2016-10-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1995


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1696) Kafka should be able to generate Hadoop delegation tokens

2016-10-13 Thread Terence Yim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573699#comment-15573699
 ] 

Terence Yim commented on KAFKA-1696:


May I ask for an update on this feature? Any idea which Kafka version 
will include it?

> Kafka should be able to generate Hadoop delegation tokens
> -
>
> Key: KAFKA-1696
> URL: https://issues.apache.org/jira/browse/KAFKA-1696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
>
> For access from MapReduce/etc jobs run on behalf of a user.





[jira] [Commented] (KAFKA-4280) Add REST resource for showing available connector plugin configs

2016-10-13 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573476#comment-15573476
 ] 

Gwen Shapira commented on KAFKA-4280:
-

Looking forward to the documentation PR :)

> Add REST resource for showing available connector plugin configs
> 
>
> Key: KAFKA-4280
> URL: https://issues.apache.org/jira/browse/KAFKA-4280
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Ewen Cheslack-Postava
>
> Connector-plugins allows listing the plugins and validating configs, but we 
> have nothing (I think?) for listing available configuration properties.
> If this doesn't exist, it would be good for usability to add it. If it does 
> exist, perhaps document it?





[jira] [Commented] (KAFKA-4244) Update our website look & feel

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573473#comment-15573473
 ] 

ASF GitHub Bot commented on KAFKA-4244:
---

Github user gwenshap closed the pull request at:

https://github.com/apache/kafka/pull/1967


> Update our website look & feel
> --
>
> Key: KAFKA-4244
> URL: https://issues.apache.org/jira/browse/KAFKA-4244
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.10.1.0
>
>
> Our website deserves a facelift.
> This will be a multi-part change:
> 1. Changes to the web pages in our normal GitHub to add new headers, fix some 
> missing tags, etc.
> 2. Changes to the auto-gen code to get protocol.html correct too
> 3. Deploy changes to website + update the header/footer/CSS in the website to 
> actually cause the facelift.
> Please do not deploy changes to the website from our GitHub after #1 is done 
> but before #3 is complete. Hopefully, I'll be all done by Monday.





[GitHub] kafka pull request #1967: cherry-picking KAFKA-4244 to 0.10.1.0 branch

2016-10-13 Thread gwenshap
Github user gwenshap closed the pull request at:

https://github.com/apache/kafka/pull/1967




Jenkins build is back to normal : kafka-0.10.1-jdk7 #70

2016-10-13 Thread Apache Jenkins Server
See 



Build failed in Jenkins: kafka-trunk-jdk8 #977

2016-10-13 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4300: NamedCache throws an NPE when evict is called and the 
cache

--
[...truncated 14110 lines...]
org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldCantHaveNullPredicate PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullActionOnForEach STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullActionOnForEach PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullValueMapperOnTableJoin STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullValueMapperOnTableJoin PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullPredicateOnFilterNot STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullPredicateOnFilterNot PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldHaveAtLeastOnPredicateWhenBranching STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldHaveAtLeastOnPredicateWhenBranching PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullFilePathOnWriteAsText STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullFilePathOnWriteAsText PASSED

org.apache.kafka.streams.kstream.internals.KStreamTransformValuesTest > 
testTransform STARTED

org.apache.kafka.streams.kstream.internals.KStreamTransformValuesTest > 
testTransform PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullReducerOnReduce STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullReducerOnReduce PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameOnReduce STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameOnReduce PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullAdderOnWindowedAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullAdderOnWindowedAggregate PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullInitializerOnWindowedAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullInitializerOnWindowedAggregate PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullReducerWithWindowedReduce STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullReducerWithWindowedReduce PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameOnAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameOnAggregate PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullAdderOnAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullAdderOnAggregate PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullWindowsWithWindowedReduce STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullWindowsWithWindowedReduce PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullWindowsOnWindowedAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullWindowsOnWindowedAggregate PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameOnWindowedAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameOnWindowedAggregate PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameWithWindowedReduce STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullStoreNameWithWindowedReduce PASSED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullInitializerOnAggregate STARTED

org.apache.kafka.streams.kstream.internals.KGroupedStreamImplTest > 
shouldNotHaveNullInitializerOnAggregate PASSED

org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest > 
testOuterJoin STARTED

org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest > 
testOuterJoin PASSED

org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest > testJoin 
STARTED

org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest > testJoin 
PASSED

org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest > 
testWindowing STARTED

org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest > 
testWindowing 

Re: [VOTE] 0.10.1.0 RC2

2016-10-13 Thread Jason Gustafson
Thanks Vahid, I'll see if I can reproduce the problem you're seeing on Step
6 of the quickstart.

-Jason

On Thu, Oct 13, 2016 at 1:04 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Jason,
>
> I tested the quickstart again with the new RC on Ubuntu Linux, Windows,
> and Mac.
>
> There are two more issues I noticed:
> 1. I have not been able to run the Streams example on my Windows environments
> (when I build from the source, and also when I use the built version) due
> to some errors that I'm getting, but I suspect there are issues with my
> environments.
> 2. When I go through Step 6 (multi-broker example), if the leader is
> broker 0 and I shut it down, then when I run the new consumer no records are
> returned (I make sure I use the port of a live broker for the bootstrap-server
> argument). As soon as I restart the stopped broker (0), records are
> returned. If the leader is broker 1 or 2 I don't run into this issue. If I
> use the old consumer I don't run into the issue either. I have been able
> to reproduce this consistently on all three OSes above.
>
> --Vahid
>
>
>
> From:   Jason Gustafson 
> To: dev@kafka.apache.org, Kafka Users ,
> kafka-clients 
> Date:   10/12/2016 10:41 AM
> Subject:[VOTE] 0.10.1.0 RC2
>
>
>
> Hello Kafka users, developers and client-developers,
>
> One more RC for 0.10.1.0. I think we're getting close!
>
> Release plan: https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>
> Release notes for the 0.10.1.0 release:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Oct 15, 11am PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
>
> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
>
> * Documentation:
> http://kafka.apache.org/0101/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0101/protocol.html
>
> * Tests:
> Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
> System tests: http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
>
> Thanks,
>
> Jason
>
>
>
>
>


[GitHub] kafka pull request #2026: MINOR: Improve on Streams log4j

2016-10-13 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2026

MINOR: Improve on Streams log4j



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KMinor-improve-logging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2026


commit 0d5fc33cb95f3f514644d187e32d7d4c100d8223
Author: Guozhang Wang 
Date:   2016-10-13T21:53:16Z

improved logging






[jira] [Commented] (KAFKA-4299) Consumer offsets reset for all topics after increasing partitions for one topic

2016-10-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573249#comment-15573249
 ] 

Vahid Hashemian commented on KAFKA-4299:


I was not able to reproduce this using the console consumer. After increasing 
the number of partitions of one topic, the offsets of the other topic in the 
group remained unchanged.

> Consumer offsets reset for all topics after increasing partitions for one 
> topic
> ---
>
> Key: KAFKA-4299
> URL: https://issues.apache.org/jira/browse/KAFKA-4299
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Juho Autio
>
> I increased partitions for one existing topic (2->10), but was surprised to 
> see that it entirely reset the committed offsets of my consumer group.
> All topics & partitions were reset to the earliest offset available, and the 
> consumer read everything again.
> Documentation doesn't mention anything like this. Is this how it's supposed 
> to work, or a bug?
> I would've expected the consumer offsets to not decrease at all, especially 
> for the topics that I didn't even touch.
> For the altered topic I would've expected that consuming the previously 
> existing partitions 0 and 1 would've continued from the position where they 
> were, and naturally starting to read the new added partitions from 0.
> I added partitions according to the "Modifying topics" section of Kafka 
> 0.10.0 Documentation:
> {quote}
> To add partitions you can do
> {code}
>  > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic 
> altered_topic --partitions 10
> {code}
> {quote}
> Previously this topic had 2 partitions.
> For the consumer I'm using 
> {{kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()}}.
> And version is:
> {code}
> org.apache.kafka
> kafka_2.11
> 0.10.0.1
> {code}
> Kafka cluster itself is {{kafka_2.11-0.10.0.1}}.
> This is quite problematic because we can't afford waiting for consumers to 
> read the full buffer from the beginning (for all topics!) when increasing 
> partitions for a topic.





Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-13 Thread Dong Lin
Hi David,

As explained in the motivation section of the KIP, the problem is that if
log retention is too small, we may lose data; and if log retention is too
large, then we waste disk space. Therefore, we need to solve one of the two
problems -- allow data to be persisted longer for consumption if log
retention is set too small, or allow data to be expired earlier if log
retention is too large. I think the KIP probably needs to make this clear
and explain which one is rejected and why. Note that the choice between the
two affects the solution -- if we want to address the first problem then
log.retention.ms should be used as a lower bound on the actual retention
time, and if we want to address the second problem then log.retention.ms
should be used as an upper bound on the actual retention time.

In both cases, we probably need to figure out a way to determine "active
consumer group". Maybe we can compare the time-since-last-commit against a
threshold to determine this. In addition, the threshold can be overridden
either per-topic or per-groupId. If we go along this route, the rejected
solution (per-topic vs. per-groupId) should probably be explained in the
KIP.
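
The time-since-last-commit idea above might look roughly like the sketch below. It is only an illustration of the check being discussed: the class ActiveGroupCheck, the method names, and the per-groupId override map are assumptions, not part of KIP-68 or of any Kafka API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: a consumer group counts as "active" if its last offset
// commit is recent enough. The threshold and the per-group override map are
// assumptions made for illustration only.
class ActiveGroupCheck {

    /** True if the last commit happened within thresholdMs of nowMs. */
    static boolean isActive(long lastCommitMs, long nowMs, long thresholdMs) {
        return nowMs - lastCommitMs <= thresholdMs;
    }

    /** Uses a per-groupId override when one exists, else the default threshold. */
    static boolean isActive(String groupId, long lastCommitMs, long nowMs,
                            long defaultThresholdMs, Map<String, Long> overridesMs) {
        long threshold = overridesMs.getOrDefault(groupId, defaultThresholdMs);
        return isActive(lastCommitMs, nowMs, threshold);
    }

    public static void main(String[] args) {
        long now = 10_000L;
        Map<String, Long> overrides = Collections.singletonMap("slow-group", 8_000L);
        // Last commit 5 s ago, default threshold 1 s: inactive by default,
        // but active for the group that has a longer override.
        System.out.println(isActive("fast-group", 5_000L, now, 1_000L, overrides));
        System.out.println(isActive("slow-group", 5_000L, now, 1_000L, overrides));
    }
}
```

A per-topic override would have the same shape with the map keyed by topic instead of groupId; which of the two to support is the open question raised above.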


Thanks,
Dong



On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin  wrote:

> Hi David,
>
> Thanks for your explanation. There still seems to be an issue with this
> solution. Please see my comments inline.
>
>
> On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote:
>
>> Hi Dong,
>> Sorry for the delay, here are the comments:
>> 1. I think we should distinguish these two cases:
>> (1) group has no members, but has a committed offset: in this case we should
>> consider its committed offset
>> (2) group has no members and no committed offset: skip this group
>> Is that ok?
>>
>>
>> The ListGroup API can list the groups, but it only shows online
>> groups, so we should enhance the ListGroup API to also list the groups in
>> case (1).
>>
> Say some user starts a consumer to consume topic A with
> enable.auto.commit = true. Later they change the group name in the config.
> Then the proposed solution will never execute consumed log retention for
> the topic A, right? I think group name change is pretty common and we
> should take care of this issue. One possible solution is to add a config to
> specify the maximum time since last offset commit before we consider a
> group inactive.
>
>
>>
>> 2. Because consumer groups may appear at different times, say, group
>> 1 starts to consume on day 1 and group 2 starts to consume on day 2, if we
>> delete the log segment right away,
>> group 2 cannot consume these messages. So we hope the messages can be held
>> for a specified time. I think many use cases will need these configs if
>> there are many consumer groups.
>>
>>
> If we want to take care of group 2, can we simply disable consumed log
> retention for the topic and set log retention to 1 day? Can you explain the
> benefit of enabling consumed log retention and setting consumed log retention
> to 1 day?
>
> Currently the flow graph in the KIP suggests that we delete data iff
> (consumed log retention is triggered OR forced log retention is triggered).
> An alternative solution is to delete data iff ( (consumed log retention is
> disabled OR consumed log retention is triggered) AND forced log retention
> is triggered). I would argue that the 2nd scheme is better. Say the
> consumed log retention is enabled. The 1st scheme basically interprets
> forced log retention as the upper bound of the time the data can stay in
> Kafka. The 2nd scheme interprets forced log retention as the lower bound of
> the time the data can stay in Kafka, which is more consistent with the
> purpose of having this forced log retention (to save disk space). And if we
> adopt the 2nd solution, the use-case you suggested can be easily addressed
> by setting forced log retention to 1 day and enabling consumed log retention.
> What do you think?
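
The two deletion rules compared above can be written as boolean predicates, which makes the upper-bound versus lower-bound reading easy to check. This is a sketch only; the class RetentionSchemes and its method names are hypothetical and do not come from the KIP or from the broker code.

```java
// Hypothetical sketch of the two deletion rules from the discussion.
// "consumed" = consumed log retention, "forced" = forced (time-based) retention.
class RetentionSchemes {

    // Scheme 1 (flow graph in the KIP as described in this thread):
    // delete iff consumed retention is triggered OR forced retention is triggered.
    static boolean deleteUnderScheme1(boolean consumedTriggered, boolean forcedTriggered) {
        return consumedTriggered || forcedTriggered;
    }

    // Scheme 2 (the alternative): delete iff
    // (consumed retention is disabled OR triggered) AND forced retention is triggered.
    static boolean deleteUnderScheme2(boolean consumedEnabled, boolean consumedTriggered,
                                      boolean forcedTriggered) {
        return (!consumedEnabled || consumedTriggered) && forcedTriggered;
    }

    public static void main(String[] args) {
        // Consumed retention enabled; print both decisions for every combination.
        boolean[] values = {false, true};
        for (boolean consumedTriggered : values) {
            for (boolean forcedTriggered : values) {
                System.out.println("consumedTriggered=" + consumedTriggered
                        + " forcedTriggered=" + forcedTriggered
                        + " scheme1=" + deleteUnderScheme1(consumedTriggered, forcedTriggered)
                        + " scheme2=" + deleteUnderScheme2(true, consumedTriggered, forcedTriggered));
            }
        }
    }
}
```

With consumed log retention enabled, scheme 1 deletes as soon as either condition fires, so data never outlives the forced retention time; scheme 2 never deletes before the forced retention time has passed, and only then if all consumption is done.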
>
>
>
>>
>> Thanks,
>> David
>>
>>
>>
>>
>> -- Original Message --
>> From: "Dong Lin";;
>> Sent: Monday, October 10, 2016, 4:05 PM
>> To: "dev";
>>
>> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>>
>>
>>
>> Hey David,
>>
>> Thanks for reply. Please see comment inline.
>>
>> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) 
>> wrote:
>>
>> > Hi Dong,
>> > Thanks for the questions:
>> >
>> > 1. Currently we don't distinguish between inactive and active groups,
>> > because in some cases an inactive group may become active again and
>> > use its previous committed offset.
>> >
>> > So we will not delete the log segment under consumed log retention if
>> > some groups consume but do not commit, but the log segment can still be
>> > deleted by the forced retention.
>> >
>>
>> So in the example I provided, the consumed log retention will be
>> effectively disabled, right? This seems to be a real problem in operation
>> -- we don't want log 

[jira] [Updated] (KAFKA-4300) NamedCache throws an NPE when evict is called and the cache is empty

2016-10-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4300:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> NamedCache throws an NPE when evict is called and the cache is empty
> 
>
> Key: KAFKA-4300
> URL: https://issues.apache.org/jira/browse/KAFKA-4300
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> evict can be called on a NamedCache even though it is empty. This is due to 
> the shared nature of the outer ThreadCache. Currently if evict is called on 
> an empty NamedCache it will throw a NullPointerException.
> From the original email:
> I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> 0.10.1 release candidate.
> It runs ok for a few thousand messages, and then it dies with the
> following exception:
> Exception in thread "StreamThread-1" java.lang.NullPointerException
> at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
> at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
> at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at
> 
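
The failure mode described in this issue, evict being called on a cache that holds no entries, can be illustrated with a small stand-alone class. This is a sketch under stated assumptions: SimpleNamedCache is a hypothetical toy LRU, not the real NamedCache, and the thread does not show what the actual fix looks like.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy LRU cache used only to illustrate guarding evict() against
// an empty cache. It is not the Kafka Streams NamedCache implementation.
class SimpleNamedCache {
    // Most recently used key at the head, least recently used at the tail.
    private final Deque<String> lru = new ArrayDeque<>();

    void put(String key) {
        lru.remove(key);   // drop any older position of this key
        lru.addFirst(key); // mark it as most recently used
    }

    /**
     * Evicts the least recently used key. Returns null when the cache is
     * empty instead of touching a tail entry that does not exist.
     */
    String evict() {
        if (lru.isEmpty()) {
            return null; // nothing to evict; a shared outer cache may still call us
        }
        return lru.removeLast();
    }

    int size() {
        return lru.size();
    }

    public static void main(String[] args) {
        SimpleNamedCache cache = new SimpleNamedCache();
        System.out.println("evict on empty cache: " + cache.evict());
        cache.put("a");
        cache.put("b");
        System.out.println("evicted: " + cache.evict());
    }
}
```

The guard matters because, as the description says, the outer ThreadCache is shared, so the decision to evict is made without knowing whether a particular named cache currently holds anything.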

[jira] [Commented] (KAFKA-4300) NamedCache throws an NPE when evict is called and the cache is empty

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573043#comment-15573043
 ] 

ASF GitHub Bot commented on KAFKA-4300:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2024


> NamedCache throws an NPE when evict is called and the cache is empty
> 
>
> Key: KAFKA-4300
> URL: https://issues.apache.org/jira/browse/KAFKA-4300
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>

[GitHub] kafka pull request #2024: KAFKA-4300: NamedCache throws an NPE when evict is...

2016-10-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2024




Re: [VOTE] 0.10.1.0 RC2

2016-10-13 Thread Vahid S Hashemian
Hi Jason,

I tested the quickstart again with the new RC on Ubuntu Linux, Windows, 
and Mac.

There are two more issues I noticed:
1. I have not been able to run the Streams example on my Windows environments 
(when I build from the source, and also when I use the built version) due 
to some errors that I'm getting, but I suspect there are issues with my 
environments.
2. When I go through Step 6 (multi-broker example), if the leader is 
broker 0 and I shut it down, then when I run the new consumer no records are 
returned (I make sure I use the port of a live broker for the bootstrap-server 
argument). As soon as I restart the stopped broker (0), records are 
returned. If the leader is broker 1 or 2 I don't run into this issue. If I 
use the old consumer I don't run into the issue either. I have been able 
to reproduce this consistently on all three OSes above.

--Vahid



From:   Jason Gustafson 
To: dev@kafka.apache.org, Kafka Users , 
kafka-clients 
Date:   10/12/2016 10:41 AM
Subject:[VOTE] 0.10.1.0 RC2



Hello Kafka users, developers and client-developers,

One more RC for 0.10.1.0. I think we're getting close!

Release plan: https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.

Release notes for the 0.10.1.0 release:
http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Oct 15, 11am PT

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/

* Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720

* Documentation:
http://kafka.apache.org/0101/documentation.html

* Protocol:
http://kafka.apache.org/0101/protocol.html

* Tests:
Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
System tests:
http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/

Thanks,

Jason






[jira] [Commented] (KAFKA-4293) ByteBufferMessageSet.deepIterator burns CPU catching EOFExceptions

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572983#comment-15572983
 ] 

ASF GitHub Bot commented on KAFKA-4293:
---

GitHub user radai-rosenblatt opened a pull request:

https://github.com/apache/kafka/pull/2025

KAFKA-4293 - improve ByteBufferMessageSet.deepIterator() performance by 
relying on underlying stream's available() implementation

also:
provided better available() for ByteBufferInputStream
provided better available() for KafkaLZ4BlockInputStream
added KafkaGZIPInputStream with a better available()
fixed KafkaLZ4BlockOutputStream.close() to properly flush

Signed-off-by: radai-rosenblatt 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radai-rosenblatt/kafka suchwow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2025






> ByteBufferMessageSet.deepIterator burns CPU catching EOFExceptions
> --
>
> Key: KAFKA-4293
> URL: https://issues.apache.org/jira/browse/KAFKA-4293
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>
> around line 110:
> {noformat}
> try {
> while (true)
> innerMessageAndOffsets.add(readMessageFromStream(compressed))
> } catch {
> case eofe: EOFException =>
> // we don't do anything at all here, because the finally
> // will close the compressed input stream, and we simply
> // want to return the innerMessageAndOffsets
> {noformat}
> the only indication the code has that the end of the iteration was reached is 
> by catching EOFException (which will be thrown inside 
> readMessageFromStream()).
> profiling runs performed at LinkedIn show 10% of the total broker CPU time 
> taken up by Throwable.fillInStackTrace() because of this behaviour.
> unfortunately InputStream.available() cannot be relied upon (concrete example 
> - GZIPInputStream will not correctly return 0) so the fix would probably be a 
> wire format change to also encode the number of messages.
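
To make the two loop styles in this issue concrete, here is a minimal Java sketch (the code quoted above is Scala). It is not the Kafka code and not the change in the pull request: the class and method names are made up for illustration, and it reads plain ints from a ByteArrayInputStream, a stream whose available() is exact. As the issue notes, that assumption does not hold for every InputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; none of these names come from Kafka.
class EndOfStreamDemo {

    // Encodes the given ints back to back, 4 bytes each.
    static byte[] encode(int... values) {
        ByteBuffer buffer = ByteBuffer.allocate(4 * values.length);
        for (int v : values) {
            buffer.putInt(v);
        }
        return buffer.array();
    }

    // Style described in the issue: loop forever and let EOFException
    // signal the end of the data. Each run pays for building one exception.
    static List<Integer> readUntilEof(byte[] data) {
        List<Integer> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (true) {
                result.add(in.readInt());
            }
        } catch (EOFException e) {
            // Expected: this is the only end-of-data signal in this style.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Alternative: ask the stream how many bytes remain. This is only
    // correct when available() is trustworthy for the underlying stream.
    static List<Integer> readWhileAvailable(byte[] data) {
        List<Integer> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                result.add(in.readInt());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] data = encode(1, 2, 3);
        System.out.println(readUntilEof(data));
        System.out.println(readWhileAvailable(data));
    }
}
```

Both methods return the same list for the same input; the difference is only in how the end of the data is detected.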



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2025: KAFKA-4293 - improve ByteBufferMessageSet.deepIter...

2016-10-13 Thread radai-rosenblatt
GitHub user radai-rosenblatt opened a pull request:

https://github.com/apache/kafka/pull/2025

KAFKA-4293 - improve ByteBufferMessageSet.deepIterator() performance by 
relying on underlying stream's available() implementation

also:
provided better available() for ByteBufferInputStream
provided better available() for KafkaLZ4BlockInputStream
added KafkaGZIPInputStream with a better available()
fixed KafkaLZ4BlockOutputStream.close() to properly flush

Signed-off-by: radai-rosenblatt 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radai-rosenblatt/kafka suchwow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2025








Re: [kafka-clients] [VOTE] 0.10.1.0 RC2

2016-10-13 Thread Jason Gustafson
No worries, it wouldn't be interesting unless there were a couple
last-minute blockers! We've also got KAFKA-4298 to get in.

-Jason

On Thu, Oct 13, 2016 at 10:04 AM, Damian Guy  wrote:

> Hi Jason,
>
> Really sorry, but we are going to need to cut another RC.  There was a
> report on the user list w.r.t the NamedCache (in KafkaStreams) throwing a
> NullPointerException. I've looked into it and it is definitely a bug that
> needs to be fixed. jira is https://issues.apache.org/jira/browse/KAFKA-4300
> and PR is https://github.com/apache/kafka/pull/2024
>
> Thanks,
> Damian
>
> On Wed, 12 Oct 2016 at 20:05 Dana Powers  wrote:
>
>> +1; all kafka-python integration tests pass.
>>
>> -Dana
>>
>>
>> On Wed, Oct 12, 2016 at 10:41 AM, Jason Gustafson 
>> wrote:
>> > Hello Kafka users, developers and client-developers,
>> >
>> > One more RC for 0.10.1.0. I think we're getting close!
>> >
>> > Release plan:
>> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>> >
>> > Release notes for the 0.10.1.0 release:
>> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
>> >
>> > *** Please download, test and vote by Saturday, Oct 15, 11am PT
>> >
>> > Kafka's KEYS file containing PGP keys we use to sign the release:
>> > http://kafka.apache.org/KEYS
>> >
>> > * Release artifacts to be voted upon (source and binary):
>> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
>> >
>> > * Maven artifacts to be voted upon:
>> > https://repository.apache.org/content/groups/staging/
>> >
>> > * Javadoc:
>> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
>> >
>> > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
>> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
>> >
>> > * Documentation:
>> > http://kafka.apache.org/0101/documentation.html
>> >
>> > * Protocol:
>> > http://kafka.apache.org/0101/protocol.html
>> >
>> > * Tests:
>> > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
>> > System tests:
>> > http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
>> >
>> > Thanks,
>> >
>> > Jason
>> >
>> > --
>> > You received this message because you are subscribed to the Google Groups
>> > "kafka-clients" group.
>> > To unsubscribe from this group and stop receiving emails from it, send an
>> > email to kafka-clients+unsubscr...@googlegroups.com.
>> > To post to this group, send email to kafka-clie...@googlegroups.com.
>> > Visit this group at https://groups.google.com/group/kafka-clients.
>> > To view this discussion on the web visit
>> > https://groups.google.com/d/msgid/kafka-clients/CAJDuW%3DDk7Mi6ZsiniHcdbCCBdBhasjSeb7_N3EW%3D97OrfvFyew%40mail.gmail.com.
>> > For more options, visit https://groups.google.com/d/optout.
>>
>


Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-13 Thread Dong Lin
Hi David,

Thanks for your explanation. There still seems to be an issue with this
solution. Please see my comments inline.


On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Dong,
> Sorry for the delay, here are the comments:
> 1. I think we should distinguish these two cases:
> (1) group has no members, but has a committed offset: in this case we should
> consider its committed offset
> (2) group has no members and no committed offset: skip this group
> Is that ok?
>
>
> The ListGroup API can list the groups, but this API only shows the online
> groups, so we should enhance the ListGroup API to also list the groups in
> case (1)
>
Say some user starts a consumer to consume topic A with enable.auto.commit
= true. Later they change the group name in the config. Then the proposed
solution will never execute consumed log retention for topic A, right?
I think group name changes are pretty common and we should take care of this
issue. One possible solution is to add a config that specifies the maximum time
since the last offset commit before we consider a group inactive.
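
A rough Java sketch of that last idea, purely for discussion: a group counts as active only if its most recent offset commit is newer than a configurable idle limit. The class, the method, and the maxIdleMs parameter are hypothetical names, not part of the KIP or of any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; the names are illustrative only.
class ActiveGroupFilter {

    // Returns the groups whose last offset commit is at most maxIdleMs old.
    // lastCommitMs maps a group name to the timestamp of its last commit.
    static Set<String> activeGroups(Map<String, Long> lastCommitMs, long nowMs, long maxIdleMs) {
        Set<String> active = new TreeSet<>();
        for (Map.Entry<String, Long> entry : lastCommitMs.entrySet()) {
            if (nowMs - entry.getValue() <= maxIdleMs) {
                active.add(entry.getKey());
            }
        }
        return active;
    }

    public static void main(String[] args) {
        Map<String, Long> lastCommitMs = new HashMap<>();
        lastCommitMs.put("renamed-away-group", 0L);   // stopped committing long ago
        lastCommitMs.put("current-group", 9_000L);    // committed recently
        System.out.println(activeGroups(lastCommitMs, 10_000L, 5_000L));
    }
}
```

With such a filter, only the groups it returns would be consulted when deciding whether a segment has been consumed, so an abandoned group name would stop blocking consumed log retention once it exceeds the limit.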


>
> 2. Because consumer groups may appear at different times, say, group 1
> starts to consume on day 1 and group 2 starts to consume on day 2. If we delete
> the log segment right away,
> group 2 cannot consume these messages. So we hope the messages can be held
> for a specified time. I think many use-cases will need these configs, if
> there are many consumer groups.
>
>
If we want to take care of group 2, can we simply disable consumed log
retention for the topic and set log retention to 1 day? Can you explain the
benefit of enabling consumed log retention and setting consumed log retention
to 1 day?

Currently the flow graph in the KIP suggests that we delete data iff
(consumed log retention is triggered OR forced log retention is triggered).
An alternative solution is to delete data iff ( (consumed log retention is
disabled OR consumed log retention is triggered) AND forced log retention
is triggered). I would argue that the 2nd scheme is better. Say the
consumed log retention is enabled. The 1st scheme basically interprets
forced log retention as the upper bound of the time the data can stay in
Kafka. The 2nd scheme interprets forced log retention as the lower bound of
the time the data can stay in Kafka, which is more consistent with the
purpose of having this forced log retention (to save disk space). And if we
adopt the 2nd solution, the use-case you suggested can be easily addressed
by setting forced log retention to 1 day and enabling consumed log retention.
What do you think?
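
To make the comparison easier to follow, the two deletion conditions above can be written as boolean predicates. This is only a restatement of the two formulas in Java; the class and parameter names are invented for the sketch and do not come from the KIP.

```java
// Hypothetical class; it only encodes the two conditions discussed above.
class RetentionDecision {

    // Scheme 1 (current flow graph): delete when either retention fires.
    static boolean scheme1(boolean consumedTriggered, boolean forcedTriggered) {
        return consumedTriggered || forcedTriggered;
    }

    // Scheme 2 (alternative): forced retention must fire, and consumed
    // retention must either be disabled or have fired as well.
    static boolean scheme2(boolean consumedEnabled, boolean consumedTriggered, boolean forcedTriggered) {
        return (!consumedEnabled || consumedTriggered) && forcedTriggered;
    }

    public static void main(String[] args) {
        // Consumed retention enabled; print the decision for each combination.
        boolean[] values = {false, true};
        for (boolean consumed : values) {
            for (boolean forced : values) {
                System.out.println("consumedTriggered=" + consumed
                        + " forcedTriggered=" + forced
                        + " scheme1=" + scheme1(consumed, forced)
                        + " scheme2=" + scheme2(true, consumed, forced));
            }
        }
    }
}
```

With consumed retention enabled, the schemes differ whenever exactly one of the two retentions has fired: scheme 1 deletes, scheme 2 keeps the data.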



>
> Thanks,
> David
>
>
>
>
> -- Original Message --
> From: "Dong Lin";;
> Sent: Monday, October 10, 2016, 4:05 PM
> To: "dev";
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> Hey David,
>
> Thanks for reply. Please see comment inline.
>
> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) 
> wrote:
>
> > Hi Dong
> >Thanks for the questions:
> >
> > 1.  Now we don't distinguish inactive or active groups, because in some
> > cases an inactive group may become active again and use its previous
> > committed offset.
> >
> > So we will not delete the log segment in the consumed log retention if
> > there are some groups that consume but do not commit, but the log segment
> > can still be deleted by the forced retention.
> >
>
> So in the example I provided, the consumed log retention will be
> effectively disabled, right? This seems to be a real problem in operation
> -- we don't want log retention to be unintentionally disabled simply
> because someone starts a tool to consume from that topic. Either this KIP
> should provide a way to handle this, or there should be a way for the operator
> to be aware of such a case and be able to re-enable consumed log retention
> for the topic. What do you think?
>
>
>
> > 2.  These configs are used to determine the expiry time of the
> > consumed retention, like the parameters of the forced retention
> > (log.retention.hours, log.retention.minutes, log.retention.ms). For
> > example, users want to save the log for 3 days; after 3 days, Kafka will
> > delete the log segments which have been consumed by all the consumer
> > groups.  The log retention thread needs these parameters.
> >
> It makes sense to have configs such as log.retention.ms -- it is used to
> make data available for up to a configured amount of time before it is
> deleted. My question is what the use-case is for keeping the log available for
> another e.g. 3 days after it has been consumed by all consumer groups. The
> purpose of this KIP is to allow the log to be deleted as soon as all
> interested consumer groups have consumed it. Can you provide a use-case for
> keeping the log available for a longer time after it has been consumed by all
> groups?
>
>
> >
> > Thanks,
> > David
> >
> >
> > > Hey David,
> > >
> > > Thanks for the KIP. Can you help with the 

Kafka-Client-0.9.0.1: Throwing an exception while Deserialization a kafka value message

2016-10-13 Thread Amine Chouicha
All:
I have a semantic question about how the Kafka client library behaves 
when:

1. You provide your own serialization/deserialization class by setting the 
corresponding property (value.deserializer on the consumer)

2. While deserializing, that class throws a serialization exception



Based on my tests, not only does the Kafka client swallow that exception (it 
just logs it), but it also tries to reprocess the same record on the next 
poll, so the record basically behaves like a "poison pill". Is this a bug or 
is it really intended? It does not make sense to me. Below are the details:



1. The custom deserializer class throws an exception.

2. This exception is caught by 
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord (line 625) and 
rethrown.

3. The rethrown exception is then caught and logged by 
org.apache.kafka.clients.NetworkClient.poll (line 275).

4. On the next poll, we get the same issue all over again.










Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
  at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:257)
  at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:246)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
  at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
  at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:274)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:176)
  at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
  at com.morningstar.dp.messaging.common.serialization.avro.AvroNoSchemaGenericSerde.deserialize(AvroNoSchemaGenericSerde.java:72)
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:622)
  at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:566)
  at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
  at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
  at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
  at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
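
One workaround that might apply to the situation described in this message is to keep the exception from leaving the deserializer at all. The Java sketch below shows the idea with a plain java.util.function.Function standing in for the real deserializer interface; it deliberately does not use the Kafka client API, and every name in it is hypothetical. Whether returning null for an undecodable record is acceptable depends on the application.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch; Function<byte[], T> stands in for a deserializer.
class TolerantDeserializer {

    // Wraps a decoder so that a decoding failure yields null instead of
    // an exception, letting the caller skip the bad record.
    static <T> Function<byte[], T> tolerant(Function<byte[], T> inner) {
        return bytes -> {
            try {
                return inner.apply(bytes);
            } catch (RuntimeException e) {
                System.err.println("Skipping undecodable record: " + e);
                return null;
            }
        };
    }

    // Toy decoder used for the demo: parses UTF-8 text as an integer.
    static Integer parseInt(byte[] bytes) {
        return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Function<byte[], Integer> decoder = tolerant(TolerantDeserializer::parseInt);
        System.out.println(decoder.apply("42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(decoder.apply("oops".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The consuming code then has to treat a null value as "record could not be decoded" and decide whether to drop it, count it, or route it elsewhere.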




[jira] [Updated] (KAFKA-4300) NamedCache throws an NPE when evict is called and the cache is empty

2016-10-13 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4300:
--
Status: Patch Available  (was: In Progress)

> NamedCache throws an NPE when evict is called and the cache is empty
> 
>
> Key: KAFKA-4300
> URL: https://issues.apache.org/jira/browse/KAFKA-4300
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> evict can be called on a NamedCache even though it is empty. This is due to 
> the shared nature of the outer ThreadCache. Currently if evict is called on 
> an empty NamedCache it will throw a NullPointerException.
> From the original email:
> I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> 0.10.1 release candidate.
> It runs ok for a few thousand of messages, and then it dies with the
> following exception:
> Exception in thread "StreamThread-1" java.lang.NullPointerException
> at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
> at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
> at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at
> 

[jira] [Updated] (KAFKA-4300) NamedCache throws an NPE when evict is called and the cache is empty

2016-10-13 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4300:
--
Description: 
evict can be called on a NamedCache even though it is empty. This is due to the 
shared nature of the outer ThreadCache. Currently if evict is called on an 
empty NamedCache it will throw a NullPointerException.

From the original email:

I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
0.10.1 release candidate.

It runs ok for a few thousand of messages, and then it dies with the
following exception:

Exception in thread "StreamThread-1" java.lang.NullPointerException
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at

Re: [kafka-clients] [VOTE] 0.10.1.0 RC2

2016-10-13 Thread Damian Guy
Hi Jason,

Really sorry, but we are going to need to cut another RC.  There was a
report on the user list w.r.t the NamedCache (in KafkaStreams) throwing a
NullPointerException. I've looked into it and it is definitely a bug that
needs to be fixed. jira is https://issues.apache.org/jira/browse/KAFKA-4300 and
PR is https://github.com/apache/kafka/pull/2024

Thanks,
Damian

On Wed, 12 Oct 2016 at 20:05 Dana Powers  wrote:

> +1; all kafka-python integration tests pass.
>
> -Dana
>
>
> On Wed, Oct 12, 2016 at 10:41 AM, Jason Gustafson 
> wrote:
> > Hello Kafka users, developers and client-developers,
> >
> > One more RC for 0.10.1.0. I think we're getting close!
> >
> > Release plan:
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
> >
> > Release notes for the 0.10.1.0 release:
> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Saturday, Oct 15, 11am PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
> >
> > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
> >
> > * Documentation:
> > http://kafka.apache.org/0101/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/0101/protocol.html
> >
> > * Tests:
> > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
> > System tests:
> > http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
> >
> > Thanks,
> >
> > Jason
> >
> > --
> > You received this message because you are subscribed to the Google Groups
> > "kafka-clients" group.
> > To unsubscribe from this group and stop receiving emails from it, send an
> > email to kafka-clients+unsubscr...@googlegroups.com.
> > To post to this group, send email to kafka-clie...@googlegroups.com.
> > Visit this group at https://groups.google.com/group/kafka-clients.
> > To view this discussion on the web visit
> > https://groups.google.com/d/msgid/kafka-clients/CAJDuW%3DDk7Mi6ZsiniHcdbCCBdBhasjSeb7_N3EW%3D97OrfvFyew%40mail.gmail.com.
> > For more options, visit https://groups.google.com/d/optout.
>


[jira] [Created] (KAFKA-4301) Include some SSL/TLS logging to avoid the need for javax debug util every time an issue arises

2016-10-13 Thread Ryan P (JIRA)
Ryan P created KAFKA-4301:
-

 Summary: Include some SSL/TLS logging to avoid the need for javax 
debug util every time an issue arises 
 Key: KAFKA-4301
 URL: https://issues.apache.org/jira/browse/KAFKA-4301
 Project: Kafka
  Issue Type: Improvement
Reporter: Ryan P


It would be handy to include certain transport layer session attributes in at 
least the debug level logging within Kafka, specifically with regard to TLS/SSL 
communications. 

Some of the things it would be helpful to see without having to enable the 
javax network debug utility include: 

1. Negotiated cipher suite 
2. Authenticated client principal 

Technically item 2 is covered by the authorizer logging, but it would be nice 
to have this information available even in the absence of an authorizer 
implementation. 
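
As an illustration of the two attributes listed above, the standard javax.net.ssl.SSLSession interface already exposes both. The Java sketch below only shows how they could be formatted into one log line; it is not a proposed Kafka patch, and the class and helper names are hypothetical. The demo uses a session taken from a fresh SSLEngine that has not completed a handshake, so the values it prints are placeholders rather than negotiated ones.

```java
import java.security.NoSuchAlgorithmException;
import java.security.Principal;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

// Hypothetical helper; not part of Kafka.
class TlsSessionInfo {

    // Builds a single log-friendly line from a TLS session.
    static String describe(SSLSession session) {
        String principal;
        try {
            Principal peer = session.getPeerPrincipal();
            principal = (peer == null) ? "<unauthenticated>" : peer.getName();
        } catch (SSLPeerUnverifiedException e) {
            // Thrown when the peer did not present a verified identity.
            principal = "<unauthenticated>";
        }
        return "protocol=" + session.getProtocol()
                + " cipherSuite=" + session.getCipherSuite()
                + " peerPrincipal=" + principal;
    }

    // Returns the session of a new engine on which no handshake has run.
    static SSLSession freshSession() {
        try {
            return SSLContext.getDefault().createSSLEngine().getSession();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(freshSession()));
    }
}
```

In a broker or client, a line like this would be emitted at debug level once the handshake for a connection has finished.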





[jira] [Work started] (KAFKA-4300) NamedCache throws an NPE when evict is called and the cache is empty

2016-10-13 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4300 started by Damian Guy.
-
> NamedCache throws an NPE when evict is called and the cache is empty
> 
>
> Key: KAFKA-4300
> URL: https://issues.apache.org/jira/browse/KAFKA-4300
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> evict can be called on a NamedCache even though it is empty. This is due to 
> the shared nature of the outer ThreadCache. Currently if evict is called on 
> an empty NamedCache it will throw a NullPointerException





[jira] [Created] (KAFKA-4300) NamedCache throws an NPE when evict is called and the cache is empty

2016-10-13 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4300:
-

 Summary: NamedCache throws an NPE when evict is called and the 
cache is empty
 Key: KAFKA-4300
 URL: https://issues.apache.org/jira/browse/KAFKA-4300
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.10.1.0


evict can be called on a NamedCache even though it is empty. This is due to the 
shared nature of the outer ThreadCache. Currently if evict is called on an 
empty NamedCache it will throw a NullPointerException
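
The failure mode described above can be illustrated with a minimal LRU cache whose evict() is safe to call when nothing is cached. This is a sketch under stated assumptions: LruSketch and its methods are hypothetical, it is not Kafka's NamedCache, and it is not the change made in the actual fix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified LRU cache; NOT Kafka's NamedCache.
class LruSketch {
    private static final class Node {
        final String key;
        byte[] value;
        Node prev;
        Node next;

        Node(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Node> index = new HashMap<>();
    private Node head; // most recently used
    private Node tail; // least recently used

    void put(String key, byte[] value) {
        Node existing = index.get(key);
        if (existing != null) {
            existing.value = value;
            unlink(existing);
            linkFirst(existing);
            return;
        }
        Node node = new Node(key, value);
        index.put(key, node);
        linkFirst(node);
    }

    /** Evicts the least recently used entry; returns its key, or null if the cache is empty. */
    String evict() {
        if (tail == null) {
            return null; // empty cache: nothing to evict, so tail is never dereferenced
        }
        Node eldest = tail;
        unlink(eldest);
        index.remove(eldest.key);
        return eldest.key;
    }

    int size() {
        return index.size();
    }

    private void linkFirst(Node node) {
        node.prev = null;
        node.next = head;
        if (head != null) {
            head.prev = node;
        }
        head = node;
        if (tail == null) {
            tail = node;
        }
    }

    private void unlink(Node node) {
        if (node.prev != null) {
            node.prev.next = node.next;
        } else {
            head = node.next;
        }
        if (node.next != null) {
            node.next.prev = node.prev;
        } else {
            tail = node.prev;
        }
        node.prev = null;
        node.next = null;
    }
}
```

Without the null check at the top of evict(), a caller that shares one eviction loop across several caches (as the description says the outer ThreadCache does) would hit a NullPointerException as soon as it picked an empty one.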





[GitHub] kafka pull request #2024: HOTFIX: fix npe in NamedCache if evict is called w...

2016-10-13 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2024

HOTFIX: fix npe in NamedCache if evict is called when the cache is empty

If evict is called on a NamedCache and the cache is empty an NPE is thrown. 
This was reported on the user list from a developer running 0.10.1.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka cache-bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2024.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2024


commit 250cfccb715b5067fd71e4d33a6878bc4333d1e0
Author: Damian Guy 
Date:   2016-10-13T16:55:05Z

fix npe in NamedCache if evict is called when the cache is empty




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-10-13 Thread radai
Here's yet another use case of an organization using Kafka in need of
headers: https://issues.apache.org/jira/browse/AVRO-1704
They appear to be a 100% Avro shop, so they can make do with changes within
Avro (who, in turn, didn't mind defining an Avro wire format with header
support).

On Wed, Oct 12, 2016 at 1:22 AM, Michael Pearce 
wrote:

> @Jay and Dana
>
> We have had a few internal discussions about how we might address this if we
> had a common Apache Kafka message wrapper for headers that could be used
> client side only, and also address the compaction issue.
> I have detailed this solution separately and linked from the main KIP-82
> wiki.
>
> Here’s a direct link –
> https://cwiki.apache.org/confluence/display/KAFKA/Headers+Value+Message+Wrapper
>
> We feel this solution still doesn’t address all the use cases being
> mentioned, and it also has some compatibility drawbacks, e.g.
> backwards/forwards compatibility, especially across different language clients.
> Also, with this solution we would still need to address the compaction
> issue (tombstones), so we would still need to make server-side changes and
> just as many message/record version changes.
>
> We believe the proposed solution in KIP-82 does address all these needs,
> is cleaner, and has more benefits.
> Please have a read, and comment. Also if you have any improvements on the
> proposed KIP-82 or an alternative solution/option your input is appreciated.
>
> @All
> As Joel has mentioned, to get this moving along and be able to discuss more
> fluidly, it would be great if we could organize a virtual meeting online,
> e.g. WebEx or something similar.
> I am aware that the majority are based in America; I am in the UK.
> @Kostya I assume you’re in Eastern Europe or Russia based on your email
> address (please correct this assumption). I hope the time difference isn’t
> so large that the time below wouldn’t suit you, if you wish to join.
>
> Can I propose that we try to meet up online next Wednesday, 19th October,
> at 18:30 BST / 10:30 PST / 20:30 MSK?
>
> Would this date/time suit the majority?
> Also, what is the preferred method? I can host via an Adobe Connect style
> webex (which my company uses), but it isn’t the best IMHO, so I am more than
> happy to have someone suggest a better alternative.
>
> Best,
> Mike
>
>
>
>
> On 10/8/16, 7:26 AM, "Michael Pearce"  wrote:
>
> >> I agree with the critique of compaction not having a value. I think
> >> we should consider fixing that directly.
>
> > Agree that the compaction issue is troubling: compacted "null" deletes
> > are incompatible w/ headers that must be packed into the message
> > value. Are there any alternatives on compaction delete semantics that
> > could address this? The KIP wiki discussion I think mostly assumes
> > that compaction-delete is what it is and can't be changed/fixed.
>
> This KIP is about dealing with quite a few use cases and issues;
> please see both the KIP use cases detailed by myself and the
> additional use cases wiki added by LinkedIn, linked from the main KIP.
>
> Compaction is something that happily is addressed with headers,
> but it most definitely isn't the sole reason or use case for them; headers
> solve many issues and use cases. Hence their elegance and simplicity, and
> why they're so common and so successful in transport mechanisms such as
> HTTP, TCP and JMS.
>
> 
> From: Dana Powers 
> Sent: Friday, October 7, 2016 11:09 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>
> > I agree with the critique of compaction not having a value. I think
> > we should consider fixing that directly.
>
> Agree that the compaction issue is troubling: compacted "null" deletes
> are incompatible w/ headers that must be packed into the message
> value. Are there any alternatives on compaction delete semantics that
> could address this? The KIP wiki discussion I think mostly assumes
> that compaction-delete is what it is and can't be changed/fixed.
>
> -Dana
>
> On Fri, Oct 7, 2016 at 1:38 PM, Michael Pearce 
> wrote:
> >
> > Hi Jay,
> >
> > Thanks for the comments and feedback.
> >
> > I think it's quite clear that if a problem keeps arising, then it needs
> > resolving and addressing properly.
> >
> > Fair enough, at LinkedIn, and historically for the very first use cases,
> > addressing this may not have been a big priority. But as Kafka is now
> > Apache open source and being picked up by many, including my company, it
> > is clear and evident that this is a requirement and an issue that now
> > needs to be addressed to meet these needs.
> >
> > The fact that in almost every transport mechanism, including networking
> > layers, in the enterprises I've worked in there have always been headers,
> > I think clearly shows their 

Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-10-13 Thread Mickael Maison
I've now updated the KIP.

New link as I've updated the title:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Any further feedback welcome !

On Tue, Oct 11, 2016 at 6:00 PM, Mickael Maison
 wrote:
> Thanks for the feedback.
>
> Regarding the config name, I agree it's probably best to reuse the
> same name as the producer (buffer.memory) whichever implementation we
> decide to use.
>
> At first, I opted for limiting the max number of concurrent fetches as
> it felt more natural in the Fetcher code. Whereas in the producer we
> keep track of the size of the buffer with RecordAccumulator, the
> consumer simply stores the completed fetches in a list so we don't
> have the used memory size immediately. Also the number of inflight
> fetches was already tracked by Fetcher.
> That said, it shouldn't be too hard to keep track of the used memory
> by the completed fetches collection if we decide to, either way should
> work.
>
> On Mon, Oct 10, 2016 at 3:40 PM, Ismael Juma  wrote:
>> Hi Mickael,
>>
>> Thanks for the KIP. A quick comment on the rejected alternative of using a
>> bounded memory pool:
>>
>> "While this might be the best way to handle that on the server side it's
>> unclear if this would suit the client well. Usually the client has a rough
>> idea about how many partitions it will be subscribed to so it's easier to
>> size the maximum number of concurrent fetches."
>>
>> I think this should be discussed in more detail. The producer (a client)
>> uses a `BufferPool`, so we should also explain why the consumer should
>> follow a different approach. Also, with KIP-74, the number of partitions is
>> less relevant than the number of brokers with partitions that a consumer is
>> subscribed to (which can change as the Kafka cluster size changes).
>>
>> I think it's also worth separating implementation from the config options.
>> For example, one could configure a memory limit with an implementation that
>> limits the number of concurrent fetches or uses a bounded memory pool. Are
>> there other good reasons to have an explicit concurrent fetches limit per
>> consumer config? If so, it would be good to mention them in the KIP.
>>
>> Ismael
>>
>> On Mon, Oct 10, 2016 at 2:41 PM, Mickael Maison 
>> wrote:
>>
>>> Hi all,
>>>
>>> I would like to discuss the following KIP proposal:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Max+in-flight+fetches
>>>
>>>
>>> Feedback and comments are welcome.
>>> Thanks !
>>>
>>> Mickael
>>>
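
As a rough illustration of the alternative discussed in this thread (bounding the memory held by completed fetches instead of capping the number of in-flight fetches), the sketch below tracks buffered bytes and gates new fetches on a limit. FetchMemoryBudget and its methods are hypothetical names, not part of Kafka's Fetcher, and the limit is only assumed to be configured in the spirit of the producer's buffer.memory.

```java
// Hypothetical helper; names are illustrative and not part of Kafka's Fetcher.
class FetchMemoryBudget {
    private final long maxBufferedBytes; // assumed limit, in the spirit of buffer.memory
    private long bufferedBytes;          // bytes held by completed, not yet consumed fetches

    FetchMemoryBudget(long maxBufferedBytes) {
        if (maxBufferedBytes <= 0) {
            throw new IllegalArgumentException("maxBufferedBytes must be positive");
        }
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /** A new fetch may be sent only while the buffered bytes are below the limit. */
    synchronized boolean maySendFetch() {
        return bufferedBytes < maxBufferedBytes;
    }

    /** Call when a fetch response is added to the completed-fetches collection. */
    synchronized void onFetchCompleted(long responseBytes) {
        bufferedBytes += responseBytes;
    }

    /** Call when the records of a completed fetch have been handed to the application. */
    synchronized void onFetchDrained(long responseBytes) {
        bufferedBytes = Math.max(0L, bufferedBytes - responseBytes);
    }

    synchronized long bufferedBytes() {
        return bufferedBytes;
    }
}
```

Note that this is a soft bound: the size of a response is only known once it arrives, so usage can exceed the limit by up to one response before further fetches are held back.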


RE: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-13 Thread ????????
Hi Dong,
Sorry for the delay, here are the comments:
1. I think we should distinguish these two cases:
(1) The group has no members but has a committed offset: in this case we should 
consider its committed offset.
(2) The group has no members and no committed offset: skip this group.
Is that OK?


The ListGroup API can list groups, but it only shows online groups, so 
we should enhance the ListGroup API to also list the groups in case (1).


2. Different consumer groups may appear at different times; say, group 1 
starts to consume on day 1 and group 2 starts to consume on day 2. If we delete the 
log segment right away,
group 2 cannot consume these messages. So we hope the messages can be held for a 
specified time. I think many use cases will need these configs if there are 
many consumer groups.
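
The two points above can be sketched roughly as follows: groups without a committed offset are skipped, the minimum committed offset across the remaining groups decides whether a segment has been fully consumed, and a consumed-retention time keeps the segment around for late-arriving groups. All names here are hypothetical and this is not the KIP-68 implementation; the sketch also assumes a committed offset denotes the next offset the group will read.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of the consumed-retention decision; not the KIP-68 implementation.
class ConsumedRetentionSketch {

    /** Smallest committed offset across groups; groups with no commit (case 2) are skipped. */
    static OptionalLong minCommittedOffset(Map<String, OptionalLong> committedByGroup) {
        return committedByGroup.values().stream()
                .filter(OptionalLong::isPresent)
                .mapToLong(OptionalLong::getAsLong)
                .min();
    }

    /**
     * A segment is deletable once every committing group has read past its last offset
     * AND it is at least consumedRetentionMs old, so groups that start later still see it.
     */
    static boolean deletable(long segmentLastOffset, long segmentAgeMs,
                             long consumedRetentionMs, OptionalLong minCommitted) {
        return minCommitted.isPresent()
                && segmentLastOffset < minCommitted.getAsLong()
                && segmentAgeMs >= consumedRetentionMs;
    }
}
```

If no group has ever committed, the sketch never deletes on the consumed-retention path, leaving such segments to the ordinary time or size based retention.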


Thanks,
David




------------------ Original Message ------------------
From: "Dong Lin"
Sent: 2016-10-10 4:05
To: "dev"

Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention



Hey David,

Thanks for reply. Please see comment inline.

On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L)  wrote:

> Hi Dong
>Thanks for the questions:
>
> 1.  Currently we don't distinguish between inactive and active groups, because
> in some cases an inactive group may become active again and resume from its
> previous committed offset.
>
> So we will not delete the log segment under consumed retention if there
> are groups that consume but do not commit; the log segment can still be
> deleted by the forced retention.
>

So in the example I provided, the consumed log retention will be
effectively disabled, right? This seems to be a real problem in operation
-- we don't want log retention to be unintentionally disabled simply
because someone starts a tool to consume from that topic. Either this KIP
should provide a way to handle this, or there should be a way for the operator
to be aware of such a case and be able to re-enable consumed log retention
for the topic. What do you think?



> 2.  These configs are used to determine the expiry time for
> consumed retention, like the parameters of the forced retention
> (log.retention.hours, log.retention.minutes, log.retention.ms). For
> example, if users want to keep the log for 3 days, then after 3 days Kafka
> will delete the log segments that have been
> consumed by all the consumer groups. The log retention thread needs these
> parameters.
>
It makes sense to have configs such as log.retention.ms -- it is used to
make data available for up to a configured amount of time before it is
deleted. My question is what the use-case is for keeping the log available for
another e.g. 3 days after it has been consumed by all consumer groups. The
purpose of this KIP is to allow the log to be deleted right away, as long as
all interested consumer groups have consumed it. Can you provide a use-case for
keeping the log available for a longer time after it has been consumed by all
groups?


>
> Thanks,
> David
>
>
> > Hey David,
> >
> > Thanks for the KIP. Can you help with the following two questions:
> >
> > 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
> > topic for debug/validation purposes, a random consumer group may be created
> > and an offset may be committed for this consumer group. If no offset commit
> > is made for this consumer group in the future, will this effectively
> > disable consumed log retention for this topic? In other words, how does this
> > KIP distinguish active consumer groups from inactive ones?
> >
> > 2) Why do we need new configs such as log.retention.commitoffset.hours? Can
> > we simply delete log segments if consumed log retention is enabled for this
> > topic and all consumer groups have consumed messages in the log segment?
> >
> > Thanks,
> > Dong
> >
> >
> >
> >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) 
> wrote:
> >
> > > Hi Becket,
> > >
> > >   Thanks for the feedback:
> > > 1.  We use the simple consumer API to query the committed offset, so we
> > > don't need to specify the consumer group.
> > > 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> > > the committed offset in the log retention process. The client may or may
> > > not commit offsets.
> > > 3.  There is no need to distinguish follower brokers from leader
> > > brokers; every broker can query.
> > > 4.  We don't need to change the protocols; we mainly change the log
> > > retention process in the log manager.
> > >
> > >   One open question is that querying the min offset needs
> > > O(partitions * groups) time complexity; an alternative is to build an
> > > internal topic that saves every partition's min offset, which can reduce
> > > this to O(1).
> > > I will update the wiki for more details.
> > >
> > > Thanks,
> > > David
> > >
> > >
> > > > Hi Pengwei,
> > > >
> > > > Thanks for the KIP proposal. It is a very useful KIP. At a high
> level,
> > > the
> > > > proposed behavior looks 

[jira] [Comment Edited] (KAFKA-4128) Kafka broker losses messages when zookeeper session times out

2016-10-13 Thread Mazhar Shaikh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568706#comment-15568706
 ] 

Mazhar Shaikh edited comment on KAFKA-4128 at 10/13/16 2:18 PM:


Hi Gwen Shapira,

My concerns about this bug are as follows:

1. Whenever a follower connects to a leader and the follower has more messages 
(a higher offset) than the leader, the follower truncates/drops these messages 
back to the last high watermark.

   => Is there any configuration that avoids dropping these messages and 
instead replicates them to the leader?
 
2. What could be the reason for the ZooKeeper session timeout, considering 
there are no issues with garbage collection?


Broker = 6
replica = 2
Total Partitions : 96, 
Partition per broker : 16 (Leader) + 16 (Follower)





I see this specific error is not handled in kafka code.

  /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps
     * appending messages. The follower comes back up and before it has completely caught
     * up with the leader's logs, all replicas in the ISR go down. The follower is now
     * uncleanly elected as the new leader, and it starts appending messages from the
     * client. The old leader comes back up, becomes a follower and it may discover that
     * the current leader's end offset is behind its own end offset.
     *
     * In such a case, truncate the current follower's log to the current leader's end
     * offset and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two replicas here.
     * We don't fix this mismatch as of now.
     */


There is a potential for a mismatch between the logs of the two replicas here. 
We don't fix this mismatch as of now.
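
To make the scenario in the quoted source comment concrete, here is a simplified model of the follower's decision: if the leader's end offset is behind the follower's, the follower truncates to the leader's end offset and the messages beyond it are discarded. The class and method names are hypothetical, the sketch only models offsets, and the real handleOffsetOutOfRange handles more than this model does.

```java
// Hypothetical, offsets-only model of the truncation described in the quoted comment.
class TruncationSketch {

    /** Offset the follower fetches from next after an out-of-range error (simplified). */
    static long nextFetchOffset(long followerEndOffset, long leaderStartOffset, long leaderEndOffset) {
        if (leaderEndOffset < followerEndOffset) {
            // Follower is ahead of the (uncleanly elected) leader: truncate to the leader's end.
            return leaderEndOffset;
        }
        // Otherwise the follower fell behind the leader's retained range: resume from its start.
        return Math.max(followerEndOffset, leaderStartOffset);
    }

    /** Number of follower messages discarded by the truncation; 0 if none. */
    static long discardedMessages(long followerEndOffset, long leaderEndOffset) {
        return Math.max(0L, followerEndOffset - leaderEndOffset);
    }
}
```

For example, a follower at end offset 120 that rejoins a leader at end offset 100 resumes fetching from 100, and the 20 messages it held beyond that point are dropped, which is the loss the question in item 1 is about.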



was (Author: mazhar.shaikh.in):
Hi Gwen Shapira,

My concerns about this bug are as follows:

1. Whenever a follower connects to a leader and the follower has more messages 
(a higher offset) than the leader, the follower truncates/drops these messages 
back to the last high watermark.

   => Is there any configuration that avoids dropping these messages and 
instead replicates them to the leader?
 
2. What could be the reason for the ZooKeeper session timeout, considering 
there are no issues with garbage collection?


Broker = 6
replica = 2
Total Partitions : 96, 
Partition per broker : 16 (Leader) + 16 (Follower)



I see this specific error is not handled in kafka code.

  /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps
     * appending messages. The follower comes back up and before it has completely caught
     * up with the leader's logs, all replicas in the ISR go down. The follower is now
     * uncleanly elected as the new leader, and it starts appending messages from the
     * client. The old leader comes back up, becomes a follower and it may discover that
     * the current leader's end offset is behind its own end offset.
     *
     * In such a case, truncate the current follower's log to the current leader's end
     * offset and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two replicas here.
     * We don't fix this mismatch as of now.
     */


There is a potential for a mismatch between the logs of the two replicas here. 
We don't fix this mismatch as of now.


> Kafka broker losses messages when zookeeper session times out
> -
>
> Key: KAFKA-4128
> URL: https://issues.apache.org/jira/browse/KAFKA-4128
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.1
>Reporter: Mazhar Shaikh
>Priority: Critical
>
> Pumping 30k msgs/second after some 6-8 

[jira] [Comment Edited] (KAFKA-4128) Kafka broker losses messages when zookeeper session times out

2016-10-13 Thread Mazhar Shaikh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568706#comment-15568706
 ] 

Mazhar Shaikh edited comment on KAFKA-4128 at 10/13/16 2:18 PM:


Hi Gwen Shapira,

My concerns about this bug are as follows:

1. Whenever a follower connects to a leader and the follower has more messages 
(a higher offset) than the leader, the follower truncates/drops these messages 
back to the last high watermark.

   => Is there any configuration that avoids dropping these messages and 
instead replicates them to the leader?
 
2. What could be the reason for the ZooKeeper session timeout, considering 
there are no issues with garbage collection?


Broker = 6
replica = 2
Total Partitions : 96, 
Partition per broker : 16 (Leader) + 16 (Follower)



I see this specific error is not handled in kafka code.

  /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps
     * appending messages. The follower comes back up and before it has completely caught
     * up with the leader's logs, all replicas in the ISR go down. The follower is now
     * uncleanly elected as the new leader, and it starts appending messages from the
     * client. The old leader comes back up, becomes a follower and it may discover that
     * the current leader's end offset is behind its own end offset.
     *
     * In such a case, truncate the current follower's log to the current leader's end
     * offset and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two replicas here.
     * We don't fix this mismatch as of now.
     */


There is a potential for a mismatch between the logs of the two replicas here. 
We don't fix this mismatch as of now.



was (Author: mazhar.shaikh.in):
Hi Gwen Shapira,

My concerns about this bug are as follows:

1. Whenever a follower connects to a leader and the follower has more messages 
(a higher offset) than the leader, the follower truncates/drops these messages 
back to the last high watermark.

   => Is there any configuration that avoids dropping these messages and 
instead replicates them to the leader?
 
2. What could be the reason for the ZooKeeper session timeout, considering 
there are no issues with garbage collection?


Broker = 6
replica = 2
Total Partitions : 96, 
Partition per broker : 16 (Leader) + 16 (Follower)





> Kafka broker losses messages when zookeeper session times out
> -
>
> Key: KAFKA-4128
> URL: https://issues.apache.org/jira/browse/KAFKA-4128
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.1
>Reporter: Mazhar Shaikh
>Priority: Critical
>
> Pumping 30k msgs/second after some 6-8 hrs of run below logs are printed and 
> the messages are lost.
> [More than 5k messages are lost on every partitions]
> Below are few logs:
> [2016-09-06 05:00:42,595] INFO Client session timed out, have not heard from 
> server in 20903ms for sessionid 0x256fabec47c0003, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:42,696] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:42,753] INFO Partition [topic,62] on broker 4: Shrinking 
> ISR for partition [topic,62] from 4,2 to 4 (kafka.cluster.Partition)
> [2016-09-06 05:00:43,585] INFO Opening socket connection to server 
> b0/169.254.2.1:2182. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:43,586] INFO Socket connection established to 
> b0/169.254.2.1:2182, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:43,587] INFO Unable to read additional data from server 
> sessionid 0x256fabec47c0003, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,644] INFO Opening socket connection to server 
> b1/169.254.2.116:2181. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,651] INFO Socket connection established to 
> b1/169.254.2.116:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,658] 

[GitHub] kafka pull request #2023: TRIVIAL: AbstractFetcherManager: shutdown speeedup

2016-10-13 Thread resetius
GitHub user resetius opened a pull request:

https://github.com/apache/kafka/pull/2023

TRIVIAL: AbstractFetcherManager: shutdown speeedup



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/resetius/kafka AbstractFetcherManager-shutdown-speedup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2023.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2023


commit cdc8742654f382742002b0622068ceb000bf62e4
Author: Alexey Ozeritsky 
Date:   2016-10-11T11:21:16Z

TRIVIAL: AbstractFetcherManager: shutdown speeedup






[jira] [Commented] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-10-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571793#comment-15571793
 ] 

Ismael Juma commented on KAFKA-2066:


Thanks David. Assigned to Jason as he said he's available to pick it up. Jason, 
please unassign if I misunderstood.

> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Jason Gustafson
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution on how to stream from memory-mapped files 
> (similar to what existing code does with FileMessageSet).





[jira] [Updated] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-10-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2066:
---
Assignee: Jason Gustafson

> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Jason Gustafson
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution on how to stream from memory-mapped files 
> (similar to what existing code does with FileMessageSet).





[jira] [Commented] (KAFKA-4292) Configurable SASL callback handlers

2016-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571761#comment-15571761
 ] 

ASF GitHub Bot commented on KAFKA-4292:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/2022

KAFKA-4292: Configurable SASL callback handlers

Implementation of KIP-86: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-4292

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2022


commit f7189cf914a337157db015d9d1687abbcae6377c
Author: Rajini Sivaram 
Date:   2016-09-20T09:11:27Z

KAFKA-4292: Configurable SASL callback handlers




> Configurable SASL callback handlers
> ---
>
> Key: KAFKA-4292
> URL: https://issues.apache.org/jira/browse/KAFKA-4292
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.10.2.0
>
>
> Implementation of KIP-86: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers





[GitHub] kafka pull request #2022: KAFKA-4292: Configurable SASL callback handlers

2016-10-13 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/2022

KAFKA-4292: Configurable SASL callback handlers

Implementation of KIP-86: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-4292

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2022


commit f7189cf914a337157db015d9d1687abbcae6377c
Author: Rajini Sivaram 
Date:   2016-09-20T09:11:27Z

KAFKA-4292: Configurable SASL callback handlers






[jira] [Work stopped] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-10-13 Thread David Jacot (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-2066 stopped by David Jacot.
--
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: David Jacot
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution on how to stream from memory-mapped files 
> (similar to what existing code does with FileMessageSet).





[jira] [Updated] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-10-13 Thread David Jacot (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-2066:
---
Assignee: (was: David Jacot)

> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution on how to stream from memory-mapped files 
> (similar to what existing code does with FileMessageSet).





[jira] [Commented] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-10-13 Thread David Jacot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571702#comment-15571702
 ] 

David Jacot commented on KAFKA-2066:


Unfortunately, I won't have time in the next weeks. [~hachikuji], feel free to 
pick it up.

> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: David Jacot
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution for streaming from memory-mapped files 
> (similar to what existing code does with FileMessageSet).





[jira] [Commented] (KAFKA-4299) Consumer offsets reset for all topics after increasing partitions for one topic

2016-10-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571675#comment-15571675
 ] 

Ismael Juma commented on KAFKA-4299:


cc [~hachikuji]

> Consumer offsets reset for all topics after increasing partitions for one 
> topic
> ---
>
> Key: KAFKA-4299
> URL: https://issues.apache.org/jira/browse/KAFKA-4299
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Juho Autio
>
> I increased partitions for one existing topic (2->10), but was surprised to 
> see that it entirely reset the committed offsets of my consumer group.
> All topics & partitions were reset to the earliest offset available, and the 
> consumer read everything again.
> Documentation doesn't mention anything like this. Is this how it's supposed 
> to work, or a bug?
> I would've expected the consumer offsets to not decrease at all, especially 
> for the topics that I didn't even touch.
> For the altered topic I would've expected that consuming the previously 
> existing partitions 0 and 1 would've continued from the position where they 
> were, and that the newly added partitions would naturally be read from 0.
> I added partitions according to the "Modifying topics" section of Kafka 
> 0.10.0 Documentation:
> {quote}
> To add partitions you can do
> {code}
>  > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic 
> altered_topic --partitions 10
> {code}
> {quote}
> Previously this topic had 2 partitions.
> For the consumer I'm using 
> {{kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()}}.
> And version is:
> {code}
> org.apache.kafka
> kafka_2.11
> 0.10.0.1
> {code}
> Kafka cluster itself is {{kafka_2.11-0.10.0.1}}.
> This is quite problematic because we can't afford to wait for consumers to 
> read the full buffer from the beginning (for all topics!) when increasing 
> partitions for a topic.





[GitHub] kafka pull request #2021: MINOR: remove duplicate doc headers

2016-10-13 Thread omkreddy
GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/2021

MINOR: remove duplicate doc headers



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka LATEST-DOC-CHANGE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2021


commit f61b6a9477e2eb2f88f28995493a7252c2597bbe
Author: Manikumar Reddy O 
Date:   2016-10-13T09:48:05Z

MINOR: remove duplicate doc headers




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4298) LogCleaner does not convert compressed message sets properly

2016-10-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571432#comment-15571432
 ] 

Ismael Juma commented on KAFKA-4298:


Great catch. I'm a bit unsure why this hasn't been reported so far. I think the 
correct description is:

"When cleaning the log, we don't want to convert messages to the format 
configured for the topic due to KAFKA-3915. However, the cleaner logic for 
writing compressed messages (in case some messages in the message set were not 
retained) writes the topic message format version in the magic field of the 
outer message instead of the actual message format. The choice of the 
absolute/relative offset for the inner messages will also be based on the topic 
message format version.

For example, if there is an old compressed message set with magic=0 in the log 
and the topic is configured for magic=1, then after cleaning, the new message 
set will have a wrapper with magic=1, the nested messages will still have 
magic=0, but the message offsets will be relative. If this happens, there does 
not seem to be an easy way to recover without manually fixing up the log."
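
The mismatch described above can be illustrated with a small Python sketch. This is a toy model, not Kafka's LogCleaner code: the `Message` and `WrapperMessage` classes and the `clean_buggy` function are hypothetical names, and the relative-offset rule (offset minus the first retained offset) is a simplifying assumption chosen only to show the inconsistency.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class Message:
    magic: int   # message format version of this message
    offset: int  # offset as stored in the log


@dataclass
class WrapperMessage:
    magic: int            # format version written in the outer message
    offset: int           # offset of the wrapper (last inner message)
    inner: List[Message]  # messages nested in the compressed set


def clean_buggy(wrapper: WrapperMessage, retained: Set[int], topic_magic: int) -> WrapperMessage:
    """Toy model of the reported behaviour (hypothetical, not Kafka code)."""
    kept = [m for m in wrapper.inner if m.offset in retained]
    base = kept[0].offset
    if topic_magic >= 1:
        # Offsets are chosen from the topic format: relative (simplified rule).
        inner = [Message(m.magic, m.offset - base) for m in kept]
    else:
        # Older format: inner offsets stay absolute.
        inner = [Message(m.magic, m.offset) for m in kept]
    # The wrapper gets the topic's magic, but inner messages keep their own.
    return WrapperMessage(topic_magic, kept[-1].offset, inner)


def is_consistent(wrapper: WrapperMessage) -> bool:
    """True when the wrapper and all nested messages share one format."""
    return all(m.magic == wrapper.magic for m in wrapper.inner)


# An old magic=0 compressed set; the message at offset 11 is cleaned away.
old = WrapperMessage(0, 12, [Message(0, 10), Message(0, 11), Message(0, 12)])
cleaned = clean_buggy(old, {10, 12}, topic_magic=1)
print(cleaned, is_consistent(cleaned))
```

In this model the cleaned set ends up with a magic=1 wrapper around magic=0 messages that carry relative offsets, which is the combination the comment says is hard to recover from.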

> LogCleaner does not convert compressed message sets properly
> 
>
> Key: KAFKA-4298
> URL: https://issues.apache.org/jira/browse/KAFKA-4298
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0, 0.10.0.2
>
>
> When cleaning the log, we attempt to write the cleaned messages using the 
> message format configured for the topic, but as far as I can tell, we do not 
> convert the wrapped messages in compressed message sets. For example, if 
> there is an old compressed message set with magic=0 in the log and the topic 
> is configured for magic=1, then after cleaning, the new message set will have 
> a wrapper with magic=1, but the nested messages will still have magic=0. If 
> this happens, there does not seem to be an easy way to recover without 
> manually fixing up the log.





[jira] [Commented] (KAFKA-4185) Abstract out password verifier in SaslServer as an injectable dependency

2016-10-13 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571363#comment-15571363
 ] 

Edoardo Comar commented on KAFKA-4185:
--

Hi [~piyushvijay], I see that a KIP has been proposed as a wider solution (it works 
for other mechanisms too):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers

> Abstract out password verifier in SaslServer as an injectable dependency
> 
>
> Key: KAFKA-4185
> URL: https://issues.apache.org/jira/browse/KAFKA-4185
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Piyush Vijay
> Fix For: 0.10.0.2
>
>
> Kafka comes with a default SASL/PLAIN implementation which assumes that 
> username and password are present in a JAAS
> config file. People often want to use some other way to provide username and 
> password to SaslServer. Their best bet,
> currently, is to have their own implementation of SaslServer (which would be, 
> in most cases, a copied version of PlainSaslServer
> minus the logic where password verification happens). This is not ideal.
> We believe that there exists a better way to structure the current 
> PlainSaslServer implementation which makes it very
> easy for people to plug in their custom password verifier without having to 
> rewrite SaslServer or copy any code.
> The idea is to have an injectable dependency interface PasswordVerifier which 
> can be re-implemented based on the
> requirements. There would be no need to re-implement or extend 
> PlainSaslServer class.
> Note that this is a commonly requested feature and there have been some attempts in 
> the past to solve this problem:
> https://github.com/apache/kafka/pull/1350
> https://github.com/apache/kafka/pull/1770
> https://issues.apache.org/jira/browse/KAFKA-2629
> https://issues.apache.org/jira/browse/KAFKA-3679
> We believe that this proposed solution does not have the drawbacks for which 
> the previous proposals were rejected.
> I would be happy to discuss more.
> Please find the link to the PR in the comments.
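
The injectable-verifier idea in the description can be sketched as follows. This is a minimal Python illustration of the design, not the code in the PR or in Kafka: `PasswordVerifier`, `StaticVerifier` and `PlainServer` are hypothetical names, and only the SASL/PLAIN message layout (authzid NUL authcid NUL passwd, per RFC 4616) is taken as given.

```python
import hmac
from typing import Dict, Protocol


class PasswordVerifier(Protocol):
    """Hypothetical injectable dependency: decides whether credentials are valid."""

    def verify(self, username: str, password: str) -> bool: ...


class StaticVerifier:
    """Models the default behaviour: credentials come from a fixed config."""

    def __init__(self, users: Dict[str, str]) -> None:
        self._users = users

    def verify(self, username: str, password: str) -> bool:
        expected = self._users.get(username)
        if expected is None:
            return False
        # Constant-time comparison of the stored and supplied passwords.
        return hmac.compare_digest(expected.encode(), password.encode())


class PlainServer:
    """Toy SASL/PLAIN server that delegates the password check."""

    def __init__(self, verifier: PasswordVerifier) -> None:
        self._verifier = verifier

    def authenticate(self, response: bytes) -> str:
        # SASL/PLAIN message layout: authzid NUL authcid NUL passwd
        _authzid, username, password = response.decode("utf-8").split("\0")
        if not self._verifier.verify(username, password):
            raise PermissionError("authentication failed")
        return username


server = PlainServer(StaticVerifier({"alice": "secret"}))
print(server.authenticate(b"\0alice\0secret"))
```

Swapping in a verifier backed by a database or an external service would only require another class with the same `verify` method; the server class itself stays untouched, which is the point of the proposal.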





[jira] [Updated] (KAFKA-4298) LogCleaner does not convert compressed message sets properly

2016-10-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4298:
---
Affects Version/s: 0.10.0.1

> LogCleaner does not convert compressed message sets properly
> 
>
> Key: KAFKA-4298
> URL: https://issues.apache.org/jira/browse/KAFKA-4298
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0, 0.10.0.2
>
>
> When cleaning the log, we attempt to write the cleaned messages using the 
> message format configured for the topic, but as far as I can tell, we do not 
> convert the wrapped messages in compressed message sets. For example, if 
> there is an old compressed message set with magic=0 in the log and the topic 
> is configured for magic=1, then after cleaning, the new message set will have 
> a wrapper with magic=1, but the nested messages will still have magic=0. If 
> this happens, there does not seem to be an easy way to recover without 
> manually fixing up the log.





[jira] [Updated] (KAFKA-4298) LogCleaner does not convert compressed message sets properly

2016-10-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4298:
---
Fix Version/s: (was: 0.10.1.1)
   0.10.0.2
   0.10.1.0

> LogCleaner does not convert compressed message sets properly
> 
>
> Key: KAFKA-4298
> URL: https://issues.apache.org/jira/browse/KAFKA-4298
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0, 0.10.0.2
>
>
> When cleaning the log, we attempt to write the cleaned messages using the 
> message format configured for the topic, but as far as I can tell, we do not 
> convert the wrapped messages in compressed message sets. For example, if 
> there is an old compressed message set with magic=0 in the log and the topic 
> is configured for magic=1, then after cleaning, the new message set will have 
> a wrapper with magic=1, but the nested messages will still have magic=0. If 
> this happens, there does not seem to be an easy way to recover without 
> manually fixing up the log.





[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event

2016-10-13 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571265#comment-15571265
 ] 

Pengwei commented on KAFKA-4229:


We tested it on 0.9.0.0, but I found that the controller code is nearly the same 
across these versions.
In 0.9.0.0, the zk version is 3.4.6.

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node data delete event first and 
> then the zk session expiration event, after which the controller is gone.
> I can reproduce it in my development env:
> 1. Set up a one-broker, one-zk env and specify a very large zk timeout (20s).
> 2. Stop the broker and remove the zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: it 
> resigns as controller and runs an election, but during the election it finds 
> that the /controller node exists and does not become the leader. Because the 
> /controller node was created by the current session s2, it is not removed. 
> So the controller is gone.
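
The event ordering in steps 5 to 7 can be replayed with a small in-memory model. This Python sketch is only an illustration of the sequence described above, not Kafka or ZkClient code: `FakeZk` and `Controller` are hypothetical stand-ins, and the handler names merely mirror the listeners mentioned in the report.

```python
class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper ephemeral nodes."""

    def __init__(self):
        self.nodes = {}  # path -> id of the session that created the node

    def create_ephemeral(self, path, session):
        if path in self.nodes:
            return False  # node already exists, creation fails
        self.nodes[path] = session
        return True

    def expire(self, session):
        # Ephemeral nodes disappear together with their owning session.
        self.nodes = {p: s for p, s in self.nodes.items() if s != session}


class Controller:
    def __init__(self, zk):
        self.zk = zk
        self.session = "s1"
        self.is_leader = False

    def elect(self):
        # Leadership is won only by creating the /controller node.
        self.is_leader = self.zk.create_ephemeral("/controller", self.session)

    def handle_data_deleted(self):
        # Step 6: the queued delete event is processed late, under session s2.
        self.elect()

    def handle_new_session(self):
        # Step 7: resign in memory, then try to get elected again.
        self.is_leader = False
        self.elect()


zk = FakeZk()
controller = Controller(zk)
controller.elect()                # normal start: leader under session s1
zk.expire("s1")                   # step 5: session expires, /controller vanishes
controller.session = "s2"         # a new session s2 is created
controller.handle_data_deleted()  # step 6: becomes leader, node owned by s2
controller.handle_new_session()   # step 7: election fails, node already exists
print(controller.is_leader, zk.nodes)
```

In this model the run ends with the `/controller` node still owned by the live session s2 while the controller no longer considers itself leader, so nothing ever removes the node and no new election can succeed.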





[jira] [Created] (KAFKA-4299) Consumer offsets reset for all topics after increasing partitions for one topic

2016-10-13 Thread Juho Autio (JIRA)
Juho Autio created KAFKA-4299:
-

 Summary: Consumer offsets reset for all topics after increasing 
partitions for one topic
 Key: KAFKA-4299
 URL: https://issues.apache.org/jira/browse/KAFKA-4299
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.1
Reporter: Juho Autio


I increased partitions for one existing topic (2->10), but was surprised to see 
that it entirely reset the committed offsets of my consumer group.

All topics & partitions were reset to the earliest offset available, and the 
consumer read everything again.

Documentation doesn't mention anything like this. Is this how it's supposed to 
work, or a bug?

I would've expected the consumer offsets to not decrease at all, especially for 
the topics that I didn't even touch.

For the altered topic I would've expected that consuming the previously 
existing partitions 0 and 1 would've continued from the position where they 
were, and that the newly added partitions would naturally be read from 0.

I added partitions according to the "Modifying topics" section of Kafka 0.10.0 
Documentation:

{quote}
To add partitions you can do
{code}
 > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic altered_topic --partitions 10
{code}
{quote}

Previously this topic had 2 partitions.

For the consumer I'm using 
{{kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()}}.

And version is:

{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
</dependency>
{code}

Kafka cluster itself is {{kafka_2.11-0.10.0.1}}.

This is quite problematic because we can't afford to wait for consumers to read 
the full buffer from the beginning (for all topics!) when increasing partitions 
for a topic.
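
The behaviour expected in the report can be written down as a small Python sketch. It models only the expectation stated above (existing offsets preserved, new partitions read from 0), not what the consumer actually does; the function name, topic names and offset values are hypothetical.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def expected_offsets_after_expansion(
    committed: Dict[TopicPartition, int], topic: str, new_partition_count: int
) -> Dict[TopicPartition, int]:
    """Offsets the reporter expected after adding partitions to `topic`."""
    # Untouched topics and pre-existing partitions keep their committed offsets.
    result = dict(committed)
    # Only partitions that did not exist before start from offset 0.
    for partition in range(new_partition_count):
        result.setdefault((topic, partition), 0)
    return result


# Hypothetical committed offsets before the change (altered_topic had 2 partitions).
committed = {
    ("altered_topic", 0): 120,
    ("altered_topic", 1): 95,
    ("other_topic", 0): 40,
}
after = expected_offsets_after_expansion(committed, "altered_topic", 10)
print(after)
```

The reported behaviour differs from this model in that every entry, including the one for the untouched topic, went back to the earliest available offset.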





[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback

2016-10-13 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571234#comment-15571234
 ] 

Eno Thereska commented on KAFKA-3559:
-

[~guozhang] actually let's keep this JIRA, no need to create another one after 
all.

> Task creation time taking too long in rebalance callback
> 
>
> Key: KAFKA-3559
> URL: https://issues.apache.org/jira/browse/KAFKA-3559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly 
> assigned partitions in the rebalance callback function {code} onPartitionAssigned 
> {code}, which involves initialization of the processor state stores as well 
> (including opening the RocksDB instance, restoring the store from the 
> changelog, etc., which takes time).
> With a large number of state stores, the initialization time itself could 
> take tens of seconds, which is usually larger than the consumer session 
> timeout. As a result, when the callback completes, the consumer is already 
> treated as failed by the coordinator and the group rebalances again.
> We need to consider whether we can optimize the initialization process, or 
> move it out of the callback function and, while initializing the stores 
> one-by-one, use poll calls to send heartbeats to avoid being kicked out by 
> the coordinator.
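
The timing argument in the description can be checked with a few lines of arithmetic. This Python sketch is a back-of-the-envelope model, not Kafka Streams code: the session timeout and per-store initialization times are hypothetical numbers, and it assumes a heartbeat can be sent between two store initializations but not during one.

```python
from typing import List

SESSION_TIMEOUT_S = 30.0                # hypothetical consumer session timeout
STORE_INIT_S = [8.0, 12.0, 15.0, 9.0]   # hypothetical per-store init times


def max_heartbeat_gap(init_seconds: List[float], heartbeat_between_stores: bool) -> float:
    """Longest stretch without a heartbeat while the stores are initialized."""
    if heartbeat_between_stores:
        # A heartbeat goes out after each store, so the gap is one store at most.
        return max(init_seconds)
    # Everything runs inside the callback: no heartbeat until all stores are done.
    return sum(init_seconds)


blocking_gap = max_heartbeat_gap(STORE_INIT_S, heartbeat_between_stores=False)
interleaved_gap = max_heartbeat_gap(STORE_INIT_S, heartbeat_between_stores=True)

print("blocking:", blocking_gap, "exceeds timeout:", blocking_gap > SESSION_TIMEOUT_S)
print("interleaved:", interleaved_gap, "exceeds timeout:", interleaved_gap > SESSION_TIMEOUT_S)
```

With these numbers the all-in-one callback leaves a 44-second gap, longer than the assumed timeout, while interleaving heartbeats keeps the gap at 15 seconds. A single very slow store would still exceed the timeout, which is why the description also mentions optimizing initialization itself.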





[GitHub] kafka pull request #2020: Removed unnecessary if/else clause.

2016-10-13 Thread himani1
GitHub user himani1 opened a pull request:

https://github.com/apache/kafka/pull/2020

Removed unnecessary if/else clause.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/himani1/kafka code_refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2020


commit 9ff4c4ebf83aff85152f667e00c0e8d5b79b0f2e
Author: himani1 <1himani.ar...@gmail.com>
Date:   2016-10-13T07:24:26Z

Removed unnecessary if/else clause.






[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event

2016-10-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15570973#comment-15570973
 ] 

Guozhang Wang commented on KAFKA-4229:
--

I have assigned this JIRA to you [~pengwei]. BTW, which version of Kafka were 
you testing? I saw that the affected versions range from 0.9.0.0 to 0.10.0.1, 
so I'm not sure which version was tested.

Also, there are some known issues with the older versions of ZkClient such that 
events are not processed in exactly the firing order, which may cause 
various issues and cause some events to be lost. ZkClient has been upgraded from 
an older version to 0.10.0.x; I'm wondering whether that has solved the problem.

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node data delete event first and 
> then the zk session expiration event, after which the controller is gone.
> I can reproduce it in my development env:
> 1. Set up a one-broker, one-zk env and specify a very large zk timeout (20s).
> 2. Stop the broker and remove the zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: it 
> resigns as controller and runs an election, but during the election it finds 
> that the /controller node exists and does not become the leader. Because the 
> /controller node was created by the current session s2, it is not removed. 
> So the controller is gone.





[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event

2016-10-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4229:
-
Assignee: Pengwei

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node data delete event first and 
> then the zk session expiration event, after which the controller is gone.
> I can reproduce it in my development env:
> 1. Set up a one-broker, one-zk env and specify a very large zk timeout (20s).
> 2. Stop the broker and remove the zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: it 
> resigns as controller and runs an election, but during the election it finds 
> that the /controller node exists and does not become the leader. Because the 
> /controller node was created by the current session s2, it is not removed. 
> So the controller is gone.


