[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15347169#comment-15347169
 ] 

Edoardo Comar commented on KAFKA-3727:
--

Thanks [~hachikuji], sure, I could start a small KIP - an exception thrown on a 
non-existent topic is a viable approach.

Interestingly, besides the consumer, the producer's behavior is even less 
consistent:
an IllegalArgumentException is thrown out of send() if the partition does not 
exist, while a TimeoutException is passed to the completion callback if the 
topic does not exist.

Moreover, there are other cases where the client gets stuck - e.g. when wrong 
credentials are supplied. 
I think the KIP could cover all these cases, which can then be fixed 
incrementally.
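
For illustration, a minimal sketch of the two producer paths just described 
(the broker address, topic names, and serializer configs are made-up 
assumptions; the exceptions are the ones observed above):
{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerInconsistencyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Partition 99 does not exist for this topic:
            // send() itself throws, synchronously.
            try {
                producer.send(new ProducerRecord<>("existing-topic", 99, "k", "v"));
            } catch (IllegalArgumentException e) {
                System.err.println("send() threw: " + e);
            }

            // Topic does not exist: send() returns normally and the error
            // (a TimeoutException) only reaches the completion callback.
            producer.send(new ProducerRecord<>("topic-not-exists", "k", "v"),
                    (metadata, exception) -> {
                        if (exception != null)
                            System.err.println("callback got: " + exception);
                    });
        }
    }
}
{code}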

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-20 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340699#comment-15340699
 ] 

Edoardo Comar commented on KAFKA-3727:
--

[~ijuma] and [~hachikuji] referring to your comments in 
https://github.com/apache/kafka/pull/1428 : 
{quote}
the user ultimately has to monitor the progress of the application and take 
steps when it is failing or falling behind.
{quote}

I think that a general approach to handling calls that hang due to loops that 
retry forever (for whatever reason: partition/topic not existing, connection 
refused, wrong credentials, ...)
would be to add a simple listener to the KafkaConsumer, e.g.
{code}
void onException(Exception e);
{code}

a listener that gets notified of the internal 'failures' that are treated as 
retriable.

This would enable the client code to {{wakeup()}} the consumer and have it exit 
the never-ending loop.

What do you think? Does this need a KIP?
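
In the meantime, a sketch of a workaround using only the existing public API 
(the helper name and deadline are illustrative): a watchdog thread bounds a 
potentially stuck poll() via the thread-safe {{wakeup()}}:
{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedPoll {
    // Calls poll() but forces it to return within deadlineMs, even if it is
    // stuck in an internal retry loop, by scheduling a wakeup() call.
    static <K, V> ConsumerRecords<K, V> pollWithDeadline(
            KafkaConsumer<K, V> consumer, long pollMs, long deadlineMs) {
        ScheduledExecutorService watchdog =
                Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> abort =
                watchdog.schedule(consumer::wakeup, deadlineMs, TimeUnit.MILLISECONDS);
        try {
            return consumer.poll(pollMs); // may block past pollMs (this issue)
        } catch (WakeupException e) {
            return ConsumerRecords.empty(); // was stuck in a retry loop; give up
        } finally {
            abort.cancel(false);
            watchdog.shutdown();
        }
    }
}
{code}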

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-14 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330850#comment-15330850
 ] 

Edoardo Comar commented on KAFKA-3727:
--

Another case where poll() hangs forever is when the user supplies wrong 
credentials - at least this is the case with SASL PLAIN.
See 
https://issues.apache.org/jira/browse/KAFKA-1894?focusedCommentId=15330848&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15330848

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-06-14 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330848#comment-15330848
 ] 

Edoardo Comar edited comment on KAFKA-1894 at 6/14/16 11:17 PM:


Another case that leads to the following loop being stuck forever is when 
authentication is turned on - with SASL PLAIN - and the user supplies wrong 
credentials:
{code}
public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();
    do {
        poll(Long.MAX_VALUE);
    } while (this.metadata.version() == version);
}
{code}




was (Author: ecomar):
Another case that leads to the loop 
```
public void awaitMetadataUpdate() {
int version = this.metadata.requestUpdate();
do {
poll(Long.MAX_VALUE);
} while (this.metadata.version() == version);
}
```
to be stuck forever is the case where Authentication is turned on - with SASL 
PLAIN - and the user specifies wrong credentials.



> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
> Fix For: 0.10.1.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
>   while(!isThingComplete())
> client.poll();
> {code}
> This occurs both in KafkaConsumer and in NetworkClient.completeAll. 
> These retry loops are actually mostly the behavior we want but there are 
> several cases where they may cause problems:
>  - In the case of a hard failure we may hang for a long time or indefinitely 
> before realizing the connection is lost.
>  - In the case where the cluster is malfunctioning or down we may retry 
> forever.
> It would probably be better to give a timeout to these. The proposed approach 
> would be to add something like retry.time.ms=6 and only continue retrying 
> for that period of time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-06-14 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330848#comment-15330848
 ] 

Edoardo Comar commented on KAFKA-1894:
--

Another case that leads to the following loop being stuck forever is when 
authentication is turned on - with SASL PLAIN - and the user supplies wrong 
credentials:
```
public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();
    do {
        poll(Long.MAX_VALUE);
    } while (this.metadata.version() == version);
}
```



> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
> Fix For: 0.10.1.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
>   while(!isThingComplete())
> client.poll();
> {code}
> This occurs both in KafkaConsumer and in NetworkClient.completeAll. 
> These retry loops are actually mostly the behavior we want but there are 
> several cases where they may cause problems:
>  - In the case of a hard failure we may hang for a long time or indefinitely 
> before realizing the connection is lost.
>  - In the case where the cluster is malfunctioning or down we may retry 
> forever.
> It would probably be better to give a timeout to these. The proposed approach 
> would be to add something like retry.time.ms=6 and only continue retrying 
> for that period of time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324624#comment-15324624
 ] 

Edoardo Comar edited comment on KAFKA-3727 at 6/10/16 3:26 PM:
---

Would it make sense for the missing topic/partition exception to be retriable 
or not depending on the broker's topic auto-create setting?
The broker could return a NotRetriableMissingTPException.

A new error code is an API change, so it would be a target for the 0.11 
release, right?
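
To sketch what that could mean client-side (the class name comes from the 
comment above and is hypothetical; this assumes the clients' convention that 
an error is retriable iff its exception extends RetriableException):
{code}
import org.apache.kafka.common.errors.ApiException;

// Hypothetical: extends ApiException but NOT RetriableException, so a
// RequestFuture carrying it would report isRetriable() == false and the
// Fetcher loop would throw instead of waiting for a metadata refresh.
public class NotRetriableMissingTPException extends ApiException {
    private static final long serialVersionUID = 1L;

    public NotRetriableMissingTPException(String message) {
        super(message);
    }
}
{code}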


was (Author: ecomar):
would it make sense that the missing topic/partition exception is retriable or 
not depending on the broker's topic auto-create setting? 
the broker could return a NotRetriableMissingTPException 

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324624#comment-15324624
 ] 

Edoardo Comar commented on KAFKA-3727:
--

Would it make sense for the missing topic/partition exception to be retriable 
or not depending on the broker's topic auto-create setting?
The broker could return a NotRetriableMissingTPException.

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324450#comment-15324450
 ] 

Edoardo Comar commented on KAFKA-3727:
--

I think it would be reasonable to have the poll timeout propagated to the 
Fetcher.listOffset loop.

That would solve this problem, though it would not fix 
https://issues.apache.org/jira/browse/KAFKA-3177 straight away.

I'll work on a PR in the next few days (very busy until Wednesday).
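
As a sketch (based on the listOffset code quoted elsewhere in this issue; the 
extra timeout parameter and the thrown TimeoutException are assumptions, not 
the actual patch), the loop would become:
{code}
// Hypothetical variant of Fetcher.listOffset that gives up once the
// caller-supplied timeout has elapsed instead of retrying forever.
private long listOffset(TopicPartition partition, long timestamp, long timeoutMs) {
    long deadline = time.milliseconds() + timeoutMs;
    while (time.milliseconds() < deadline) {
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        if (future.exception() instanceof InvalidMetadataException)
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
    throw new TimeoutException("Could not fetch offset for " + partition
            + " within " + timeoutMs + " ms");
}
{code}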

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-3727:


Assignee: Edoardo Comar

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324424#comment-15324424
 ] 

Edoardo Comar commented on KAFKA-3727:
--

[~ijuma] [~hachikuji] ... reposting here as more on-topic than in 
https://github.com/apache/kafka/pull/1428 

Why should a missing topic or partition be a retriable exception?
I am thinking at least of the case where auto-create is turned off.

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3177) Kafka consumer can hang when position() is called on a non-existing partition.

2016-05-26 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301940#comment-15301940
 ] 

Edoardo Comar commented on KAFKA-3177:
--

see also Consumer.poll() stuck in loop on non-existent topic manually assigned 
: https://issues.apache.org/jira/browse/KAFKA-3727

> Kafka consumer can hang when position() is called on a non-existing partition.
> --
>
> Key: KAFKA-3177
> URL: https://issues.apache.org/jira/browse/KAFKA-3177
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> This can be easily reproduced as follows:
> {code}
> ...
> consumer.assign(someNonExistingTopicPartition);
> consumer.position(someNonExistingTopicPartition);
> ...
> {code}
> It seems that when position() is called we try to do the following:
> 1. Fetch committed offsets.
> 2. If there are no committed offsets, try to reset the offset using the 
> reset strategy. In sendListOffsetRequest(), if the consumer does not know 
> the TopicPartition, it will refresh its metadata and retry. In this case, 
> because the partition does not exist, we fall into the infinite loop of 
> refreshing topic metadata.
> Another orthogonal issue is that if the topic in the above code piece does 
> not exist, the position() call will actually create the topic, due to the 
> fact that currently a topic metadata request can automatically create the 
> topic. This is a known separate issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-25 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301027#comment-15301027
 ] 

Edoardo Comar commented on KAFKA-3727:
--

Thanks [~ijuma], those issues are related. 

However [KAFKA-3177|https://issues.apache.org/jira/browse/KAFKA-3177] does not 
mention poll(), which is the most basic API call for a consumer.

And [KAFKA-2391|https://issues.apache.org/jira/browse/KAFKA-2391] mentions 
adding timeouts as arguments in the API, 
and such an argument is already present in KafkaConsumer.poll(timeout).

This issue is that poll(timeout) will block FOREVER when a consumer is 
assigned a non-existent TopicPartition.

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3682) ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full

2016-05-25 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301010#comment-15301010
 ] 

Edoardo Comar commented on KAFKA-3682:
--

[~vahid] you can just run my unit test without the fix in the code. 
Test-driven development!

> ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full
> 
>
> Key: KAFKA-3682
> URL: https://issues.apache.org/jira/browse/KAFKA-3682
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>
> When the SkimpyOffsetMap is full, a request for a key that is not in the map 
> will throw an ArrayIndexOutOfBoundsException, as the number of internal 
> attempts overflows after Integer.MAX_VALUE.
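
For illustration, the wraparound in miniature (a standalone sketch, not the 
actual SkimpyOffsetMap code):
{code}
public class OverflowDemo {
    public static void main(String[] args) {
        int attempts = Integer.MAX_VALUE;
        attempts++;                      // silently wraps to Integer.MIN_VALUE
        int slot = attempts % 1000;      // Java's % keeps the sign: -648
        byte[] probe = new byte[1000];
        System.out.println(probe[slot]); // ArrayIndexOutOfBoundsException
    }
}
{code}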



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3682) ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full

2016-05-25 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15300756#comment-15300756
 ] 

Edoardo Comar commented on KAFKA-3682:
--

Hi, this is a pretty simple fix, backed by a unit test. Could someone please 
take a look? Thanks.

> ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full
> 
>
> Key: KAFKA-3682
> URL: https://issues.apache.org/jira/browse/KAFKA-3682
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>
> When the SkimpyOffsetMap is full, a request for a key that is not in the map 
> will throw an ArrayIndexOutOfBoundsException, as the number of internal 
> attempts overflows after Integer.MAX_VALUE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-25 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Priority: Critical  (was: Major)

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-25 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15300482#comment-15300482
 ] 

Edoardo Comar commented on KAFKA-3727:
--

Hi, can anyone please comment on this (IMHO) buggy behavior?
It also happens with other consumer API calls, e.g.
{code}
consumer.position(assignedTpThatDoesNotExist); // blocks here forever
{code}
i.e. with any method that calls
{{org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Set)}},
as it ends up in listOffset(..), where the UNKNOWN_TOPIC_OR_PARTITION error in 
the future is considered retriable and so the while loop never ends.
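
A self-contained check of why that error counts as retriable (the class names 
are the real 0.10.x client classes; the demo itself is just illustrative):
{code}
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class RetriableCheck {
    public static void main(String[] args) {
        // UnknownTopicOrPartitionException extends InvalidMetadataException,
        // which extends RetriableException, so the Fetcher keeps waiting for
        // a metadata refresh that never resolves the missing partition.
        Exception e = new UnknownTopicOrPartitionException();
        System.out.println(e instanceof RetriableException);       // true
        System.out.println(e instanceof InvalidMetadataException); // true
    }
}
{code}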

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15296538#comment-15296538
 ] 

Edoardo Comar commented on KAFKA-3727:
--

My expectation is that with either subscribe() or assign(), if the topic does 
not exist, the behaviour on poll() should be the same;
e.g. return an empty record set (as subscribe currently does).

This defect may be similar to https://issues.apache.org/jira/browse/KAFKA-3503


> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15296398#comment-15296398
 ] 

Edoardo Comar commented on KAFKA-3727:
--

The consumer remains stuck in a loop, as this stack trace shows:
{quote}
Fetcher.listOffset(TopicPartition, long) line: 320
Fetcher.resetOffset(TopicPartition) line: 294
Fetcher.updateFetchPositions(Set) line: 170
KafkaConsumer.updateFetchPositions(Set) line: 1408
KafkaConsumer.pollOnce(long) line: 982
KafkaConsumer.poll(long) line: 937
{quote}

that is, the consumer is stuck in awaitMetadataUpdate, called from this loop in 
Fetcher.java:
{code}
private long listOffset(TopicPartition partition, long timestamp) {
    while (true) {
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        if (future.exception() instanceof InvalidMetadataException)
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
}
{code}

Don't you think that the consumer should give up with an exception?

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Comment: was deleted

(was: To explain better, this is a stack trace when the consumer is stuck in 
the loop :
{quote}
Fetcher.listOffset(TopicPartition, long) line: 320 
Fetcher.resetOffset(TopicPartition) line: 294  
Fetcher.updateFetchPositions(Set) line: 170
KafkaConsumer.updateFetchPositions(Set) line: 1408 
KafkaConsumer.pollOnce(long) line: 982 
KafkaConsumer.poll(long) line: 937 
{quote}

that is, the ConsumerNetworkClient will keep invoking {{ awaitMetadataUpdate(); 
}}

{quote}
private long listOffset(TopicPartition partition, long timestamp) {
while (true) {
RequestFuture future = sendListOffsetRequest(partition, 
timestamp);
client.poll(future);

if (future.succeeded())
return future.value();

if (!future.isRetriable())
throw future.exception();

if (future.exception() instanceof InvalidMetadataException)
client.awaitMetadataUpdate();
else
time.sleep(retryBackoffMs);
}
}
{quote}

Don't you think the consumer should give up at some point ?)

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15296394#comment-15296394
 ] 

Edoardo Comar commented on KAFKA-3727:
--

To explain better, this is a stack trace when the consumer is stuck in the 
loop:
{quote}
Fetcher.listOffset(TopicPartition, long) line: 320
Fetcher.resetOffset(TopicPartition) line: 294
Fetcher.updateFetchPositions(Set) line: 170
KafkaConsumer.updateFetchPositions(Set) line: 1408
KafkaConsumer.pollOnce(long) line: 982
KafkaConsumer.poll(long) line: 937
{quote}

that is, the ConsumerNetworkClient will keep invoking {{awaitMetadataUpdate()}}:

{code}
private long listOffset(TopicPartition partition, long timestamp) {
    while (true) {
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        if (future.exception() instanceof InvalidMetadataException)
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
}
{code}

Don't you think the consumer should give up at some point?

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent between a consumer that subscribed to the topic and one 
> that had the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> Sample snippet to reproduce:
> {code}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L);
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); // will loop forever!
> print("assignKc", crs1);
> {code}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3567) Add --security-protocol option to console consumer and producer

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15296163#comment-15296163
 ] 

Edoardo Comar commented on KAFKA-3567:
--

Hi, the console producer has these options:

--producer-property   A mechanism to pass user-defined properties in the form
                      key=value to the producer.
--producer.config     Producer config properties file. Note that
                      [producer-property] takes precedence over this config.

while the console consumer has only:

--consumer.config     Consumer config properties file.

IMHO, it would make sense not to add a specific --security-protocol option to 
them, but just to add a --consumer-property option to the console consumer.

> Add --security-protocol option to console consumer and producer
> ---
>
> Key: KAFKA-3567
> URL: https://issues.apache.org/jira/browse/KAFKA-3567
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Sriharsha Chintalapani
>Assignee: Bharat Viswanadham
> Fix For: 0.9.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3728) EndToEndAuthorizationTest offsets_topic misconfigured

2016-05-20 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15293272#comment-15293272
 ] 

Edoardo Comar commented on KAFKA-3728:
--

Thanks [~rsivaram] - it wasn't obvious that acks=-1 requires a number of 
replicas equal to min.insync.replicas, even when the topic has fewer replicas 
than the ISR.

Possibly due to timeouts, even setting OffsetsTopicReplicationFactorProp to 
min.insync.replicas (=3) doesn't make the test pass reliably, so I also set 
OffsetCommitRequiredAcksProp to 1.

PR coming.
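
Concretely, the settings just described would look something like this (a 
sketch; the property strings are the broker-config names behind 
KafkaConfig.OffsetsTopicReplicationFactorProp and 
KafkaConfig.OffsetCommitRequiredAcksProp):
{code}
import java.util.Properties;

public class OffsetsTopicTestConfig {
    // Broker-side overrides for the test, as described above.
    static Properties serverOverrides() {
        Properties serverConfig = new Properties();
        serverConfig.setProperty("offsets.topic.replication.factor", "3");
        serverConfig.setProperty("offsets.commit.required.acks", "1");
        return serverConfig;
    }
}
{code}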

> EndToEndAuthorizationTest offsets_topic misconfigured
> -
>
> Key: KAFKA-3728
> URL: https://issues.apache.org/jira/browse/KAFKA-3728
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>
> A consumer that is manually assigned a topic-partition is able to consume 
> messages that a consumer that subscribes to the topic cannot.
> To reproduce: take the test 
> EndToEndAuthorizationTest.testProduceConsume 
> (e.g. the SaslSslEndToEndAuthorizationTest implementation).
> It passes (= messages are consumed) 
> if the consumer is assigned the single topic-partition:
>   consumers.head.assign(List(tp).asJava)
> but fails 
> if the consumer subscribes to the topic - changing the line to:
>   consumers.head.subscribe(List(topic).asJava)
> The failure when subscribed shows this error from SyncGroup:
>  org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: 
> Messages are rejected since there are fewer in-sync replicas than required.
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:455)
> The test passes in both cases (subscribe and assign) with the setting
>   this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "1")



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3728) EndToEndAuthorizationTest offsets_topic misconfigured

2016-05-20 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3728:
-
Summary: EndToEndAuthorizationTest offsets_topic misconfigured  (was: 
inconsistent behavior of Consumer.poll() when assigned vs subscribed)

> EndToEndAuthorizationTest offsets_topic misconfigured
> -
>
> Key: KAFKA-3728
> URL: https://issues.apache.org/jira/browse/KAFKA-3728
> Project: Kafka
>  Issue Type: Bug
>Reporter: Edoardo Comar
>
> A consumer that is manually assigned a topic-partition is able to consume 
> messages that a consumer that subscribes to the topic cannot.
> To reproduce: take the test 
> EndToEndAuthorizationTest.testProduceConsume 
> (e.g. the SaslSslEndToEndAuthorizationTest implementation).
> It passes (= messages are consumed) 
> if the consumer is assigned the single topic-partition:
>   consumers.head.assign(List(tp).asJava)
> but fails 
> if the consumer subscribes to the topic - changing the line to:
>   consumers.head.subscribe(List(topic).asJava)
> The failure when subscribed shows this error from SyncGroup:
>  org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: 
> Messages are rejected since there are fewer in-sync replicas than required.
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:455)
> The test passes in both cases (subscribe and assign) with the setting
>   this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "1")



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-18 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15289551#comment-15289551
 ] 

Edoardo Comar commented on KAFKA-3396:
--

I'm held back by 
https://issues.apache.org/jira/browse/KAFKA-3727
and
https://issues.apache.org/jira/browse/KAFKA-3728


> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Grant Henke
>Assignee: Edoardo Comar
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Those that do not consider the name 
> sensitive, still consider it more information that allows a user to try and 
> circumvent security.  Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>  - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3728) inconsistent behavior of Consumer.poll() when assigned vs subscribed

2016-05-18 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-3728:


 Summary: inconsistent behavior of Consumer.poll() when assigned vs 
subscribed
 Key: KAFKA-3728
 URL: https://issues.apache.org/jira/browse/KAFKA-3728
 Project: Kafka
  Issue Type: Bug
Reporter: Edoardo Comar


A consumer that is manually assigned a topic-partition is able to consume 
messages that a consumer that subscribes to the topic cannot.

To reproduce: take the test 
EndToEndAuthorizationTest.testProduceConsume 
(e.g. the SaslSslEndToEndAuthorizationTest implementation).
 
It passes (= messages are consumed) 
if the consumer is assigned the single topic-partition:
  consumers.head.assign(List(tp).asJava)
but fails 
if the consumer subscribes to the topic - changing the line to:
  consumers.head.subscribe(List(topic).asJava)

The failure in the subscribed case shows this SyncGroup error:

 org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: 
Messages are rejected since there are fewer in-sync replicas than required.
  at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:455)


The test passes in both cases (subscribe and assign) with the setting
  this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "1")






[jira] [Updated] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-18 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Description: 
The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different/inconsistent 
between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever* - this feels like a bug to me.

sample snippet to reproduce:
{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever ! 
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)
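
Until this is fixed, a workaround sketch (not part of this issue): bound the 
assigned consumer's poll() with KafkaConsumer.wakeup() from a second thread - 
wakeup() is the consumer's one thread-safe method and makes a blocked poll() 
throw WakeupException. The broker address and deserializers below are 
placeholders:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedAssignedPoll {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.assign(Collections.singletonList(new TopicPartition("topic-not-exists", 0)));

        // wakeup() aborts a poll() that is stuck waiting for metadata of the
        // non-existent topic by making it throw WakeupException.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(consumer::wakeup, 5, TimeUnit.SECONDS);
        try {
            ConsumerRecords<String, String> records = consumer.poll(1000L);
            System.out.println("polled " + records.count() + " records");
        } catch (WakeupException e) {
            System.out.println("poll() did not return within the deadline");
        } finally {
            timer.shutdownNow();
            consumer.close();
        }
    }
}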


  was:
Inconsistent behavior of Consumer.poll() on non-existent topic when assigned vs 
subscribed

The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

sample snippet to reproduce:
{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever ! 
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)



> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out

[jira] [Updated] (KAFKA-3727) inconsistent behavior of Consumer.poll() on non-existent topic when assigned vs subscribed

2016-05-18 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Description: 
Inconsistent behavior of Consumer.poll() on non-existent topic when assigned vs 
subscribed

The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

sample snippet to reproduce:
{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever ! 
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)


  was:
The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

sample snippet to reproduce:
{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever ! 
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)



> inconsistent behavior of Consumer.poll() on non-existent topic when assigned 
> vs subscribed
> --
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> Inconsistent behavior of Consumer.poll() on non-existent topic when assigned 
> vs subscribed
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different between a consumer that subscribed to the topic and one that had 
> the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever*.
> The latter behavior feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-

[jira] [Updated] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-18 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Summary: Consumer.poll() stuck in loop on non-existent topic manually 
assigned  (was: inconsistent behavior of Consumer.poll() on non-existent topic 
when assigned vs subscribed)

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> Inconsistent behavior of Consumer.poll() on non-existent topic when assigned 
> vs subscribed
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different between a consumer that subscribed to the topic and one that had 
> the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever*.
> The latter behavior feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
>    // will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)





[jira] [Updated] (KAFKA-3727) inconsistent behavior of Consumer.poll() on non-existent topic when assigned vs subscribed

2016-05-18 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Description: 
The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

sample snippet to reproduce:
{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever ! 
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)


  was:
The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever !
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)



> inconsistent behavior of Consumer.poll() on non-existent topic when assigned 
> vs subscribed
> --
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different between a consumer that subscribed to the topic and one that had 
> the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever*.
> The latter behavior feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs

[jira] [Updated] (KAFKA-3727) inconsistent behavior of Consumer.poll() on non-existent topic when assigned vs subscribed

2016-05-18 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Description: 
The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever !
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)


  was:
The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever !
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)



> inconsistent behavior of Consumer.poll() on non-existent topic when assigned 
> vs subscribed
> --
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existent topic is surprisingly 
> different between a consumer that subscribed to the topic and one that had 
> the topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever*.
> The latter behavior feels like a bug to me.
> {quote}
> KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
> List<TopicPartition> tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns e

[jira] [Created] (KAFKA-3727) inconsistent behavior of Consumer.poll() on non-existent topic when assigned vs subscribed

2016-05-18 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-3727:


 Summary: inconsistent behavior of Consumer.poll() on non-existent 
topic when assigned vs subscribed
 Key: KAFKA-3727
 URL: https://issues.apache.org/jira/browse/KAFKA-3727
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Edoardo Comar


The behavior of a consumer on poll() for a non-existent topic is surprisingly 
different between a consumer that subscribed to the topic and one that had the 
topic-partition manually assigned.

The "subscribed" consumer will return an empty collection.
The "assigned" consumer will *loop forever*.
The latter behavior feels like a bug to me.

{quote}
KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1);
KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2);
List<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic-not-exists", 0));
assignKc.assign(tps);

subsKc.subscribe(Arrays.asList("topic-not-exists"));

System.out.println("* subscribe k consumer ");
ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); 
print("subscribeKc", crs2); // returns empty

System.out.println("* assign k consumer ");
ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); 
   // will loop forever !
print("assignKc", crs1);
{quote}

the logs for the "assigned" consumer show:
[2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
(org.apache.kafka.clients.Metadata)
[2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
fetching offset, wait for metadata refresh 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2016-05-18 17:33:10,010] DEBUG Sending metadata request 
{topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)






[jira] [Assigned] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-17 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-3396:


Assignee: Edoardo Comar

> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Grant Henke
>Assignee: Edoardo Comar
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Even those who do not consider the name 
> sensitive still consider it information that helps a user try to 
> circumvent security. Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this, some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc.) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>   - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 





[jira] [Commented] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-17 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15287873#comment-15287873
 ] 

Edoardo Comar commented on KAFKA-3396:
--

Hi [~granthenke] we're still working on it; 
we're working to get the unit tests to pass (and have added a few more).

We had some surprises running `SaslSslEndToEndAuthorizationTest`:
if we change the consumers from 
```consumers.head.assign(List(tp).asJava)```
to
```consumers.head.subscribe(List(topic).asJava)```
the code paths are different, and even the original tests may not pass unless we 
set
```  this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "1")```
i.e. MinInSyncReplicasProp must be `"1"`, which is also the case for the 
original code. Not sure if we're missing something.


> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Grant Henke
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Even those who do not consider the name 
> sensitive still consider it information that helps a user try to 
> circumvent security. Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this, some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc.) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>   - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 





[jira] [Commented] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-13 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15282659#comment-15282659
 ] 

Edoardo Comar commented on KAFKA-3396:
--

Hi [~granthenke] I thought that if a user has DESCRIBE permission on a topic yet 
to be created, 
autocreate is on, but the user has no CREATE permission on the cluster, 
then there is no reason to hide the name of the topic or the reason for the 
failure.

The fact that, in SimpleAclAuthorizer, the DESCRIBE permission only accepts the 
name of the topic or a wildcard for all topics 
makes this scenario not very likely, IMHO, but still a possibility.
It might be more common with an Authorizer implementation that allows wildcards 
in topic names 
(e.g. I could have DESCRIBE on all topics named like "edo-*"), as illustrated 
by the sketch below.

I have to work on unit tests before I can make a PR.
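
To illustrate the wildcard scenario, a hypothetical matcher (this is not 
SimpleAclAuthorizer, which only accepts an exact topic name or "*"):

class WildcardAclSketch {
    // Hypothetical prefix-wildcard matcher: DESCRIBE on "edo-*" would match
    // any topic whose name starts with "edo-".
    static boolean aclTopicMatches(String aclPattern, String topic) {
        if (aclPattern.equals("*"))
            return true;
        if (aclPattern.endsWith("*"))
            return topic.startsWith(aclPattern.substring(0, aclPattern.length() - 1));
        return aclPattern.equals(topic);
    }

    public static void main(String[] args) {
        System.out.println(aclTopicMatches("edo-*", "edo-payments")); // true
        System.out.println(aclTopicMatches("edo-*", "other-topic")); // false
    }
}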

> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Grant Henke
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Even those who do not consider the name 
> sensitive still consider it information that helps a user try to 
> circumvent security. Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this, some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc.) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>   - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 





[jira] [Commented] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-11 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280197#comment-15280197
 ] 

Edoardo Comar commented on KAFKA-3396:
--

Hi, 
we have worked on a simple patch that would return 
Errors.UNKNOWN_TOPIC_OR_PARTITION on a TopicMetadataRequest if the session's 
user has no DESCRIBE access to the topic.

We would only return a TOPIC_AUTHORIZATION error if
- the user has DESCRIBE 
AND
- topic autocreation is on, but the user has no CREATE permission on the Cluster.

This should make an attacker without permissions :-) unable to distinguish 
between a topic that exists and one that doesn't.

Note that with an authorizer and ACLs, setting autocreate=true on the 
broker would require autocreating the ACLs on the new topic, 
otherwise a user is still unable to create and use a new topic.  
 
Edo and [~mimaison]
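
A sketch of the decision rule described above (the helper name and signature 
are illustrative, not the actual KafkaApis code):

import org.apache.kafka.common.protocol.Errors;

final class MetadataErrorSketch {
    // Maps the authorization checks described in this comment to the error
    // returned for a topic in a metadata response.
    static Errors errorFor(boolean hasDescribe, boolean topicExists,
                           boolean autoCreateEnabled, boolean hasClusterCreate) {
        if (!hasDescribe)
            return Errors.UNKNOWN_TOPIC_OR_PARTITION; // hide the topic's very existence
        if (!topicExists && autoCreateEnabled && !hasClusterCreate)
            return Errors.TOPIC_AUTHORIZATION_FAILED; // safe to disclose: the user may DESCRIBE
        if (!topicExists)
            return Errors.UNKNOWN_TOPIC_OR_PARTITION;
        return Errors.NONE;
    }
}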

> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Grant Henke
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Even those who do not consider the name 
> sensitive still consider it information that helps a user try to 
> circumvent security. Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this, some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc.) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>   - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 





[jira] [Commented] (KAFKA-3688) Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN

2016-05-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278669#comment-15278669
 ] 

Edoardo Comar commented on KAFKA-3688:
--

Thanks [~rsivaram], it wasn't obvious how to express the JAAS config.
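
For reference, the non-obvious part is that with 
sasl.mechanism.inter.broker.protocol=PLAIN the broker's own client credentials 
go inside the KafkaServer section as username/password entries, rather than in 
a separate KafkaClient section - a sketch reusing the placeholder credentials 
from this issue:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="superkuser"
  password="wotever"
  user_edo1="edo1pwd"
  user_superkuser="wotever";
};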

> Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN
> --
>
> Key: KAFKA-3688
> URL: https://issues.apache.org/jira/browse/KAFKA-3688
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Edoardo Comar
>
> Starting a single broker with the following configuration :
>  
> server.properties:
> listeners=SASL_PLAINTEXT://:9093
> sasl.enabled.mechanisms=PLAIN
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> jaas.conf:
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
>   user_edo1="edo1pwd"
>   user_edo2="edo2pwd"
>   user_superkuser="wotever";
> };
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
> username="superkuser"
> password="wotever";
> };
> results in a broker startup failure “Failed to create SaslClient with 
> mechanism PLAIN” (see stack trace below).
> Note that this configuration was an attempt to work around the issue
> https://issues.apache.org/jira/browse/KAFKA-3687 
> (unable to use ACLs with security.inter.broker.protocol=PLAIN).
> [2016-05-10 16:54:10,730] INFO Failed to create channel due to  
> (org.apache.kafka.common.network.SaslChannelBuilder)
> org.apache.kafka.common.KafkaException: Failed to configure 
> SaslClientAuthenticator
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:124)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:102)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:177)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
>   at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159)
>   at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>   at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: org.apache.kafka.common.KafkaException: Failed to create 
> SaslClient with mechanism PLAIN
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:139)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:122)
>   ... 9 more
> Caused by: javax.security.sasl.SaslException: Cannot get userid/password 
> [Caused by javax.security.auth.callback.UnsupportedCallbackException: Could 
> not login: the client is being asked for a password, but the Kafka client 
> code does not currently support obtaining a password from the user.]
>   at 
> com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:157)
>   at 
> com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:94)
>   at javax.security.sasl.Sasl.createSaslClient(Sasl.java:372)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:135)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:1)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:130)
>   ... 10 more
> Caused by: javax.security.auth.callback.UnsupportedCallbackException: Could 
> not login: the client is being asked for a password, but the Kafka client 
> code does not currently support obtaining a password from the user.
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler.handle(SaslClientCallbackHandler.java:73)
>   at 
> com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:136)
>   ... 17 more
> discovered in collaboration with [~mimaison]





[jira] [Resolved] (KAFKA-3687) Internal Authorization Failure at startup with ACLs and security.inter.broker.protocol=PLAINTEXT

2016-05-10 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar resolved KAFKA-3687.
--
Resolution: Not A Bug

> Internal Authorization Failure at startup with ACLs and 
> security.inter.broker.protocol=PLAINTEXT
> 
>
> Key: KAFKA-3687
> URL: https://issues.apache.org/jira/browse/KAFKA-3687
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Edoardo Comar
>
> The root cause is this failure in the authorizer.log at server startup:
> [] DEBUG Principal = User:ANONYMOUS is Denied Operation = ClusterAction from 
> host = 192.168.10.22 on resource = Cluster:kafka-cluster 
> (kafka.authorizer.logger)
> which has the *consequence that it's impossible to authorize a producer*.
> Steps to reproduce :
> [~mimaison] and I  launched a single broker and a console producer,
> using the SASL_PLAIN authentication between producer and broker.
> Created a topic "testtopic".
> The broker is configured with the SimpleAclAuthorizer
> and the ACL created for the producer is  :
> ./kafkacls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add 
> --allow-principal User:edo1  --producer --topic testtopic
> which results in :
> Current ACLs for resource `Topic:testtopic`: 
>   User:edo1 has Allow permission for operations: Write from hosts: *
>   User:edo1 has Allow permission for operations: Describe from hosts: * 
> Current ACLs for resource `Cluster:kafka-cluster`: 
>   User:edo1 has Allow permission for operations: Create from hosts: * 
> However running the producer, we got:
> [] WARN Error while fetching metadata with correlation id 0 : 
> {testtopic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)
> Looking at the code, the UpdateMetadataRequest that the single broker sends to 
> itself on startup 
> comes through as a request with User:ANONYMOUS and gets denied by the 
> Authorizer 
> (in KafkaApis.handleUpdateMetadataRequest), 
> so the MetadataCache is never updated.
> When the first producer requests come through, in 
> KafkaApis.handleTopicMetadataRequest
> the MetadataCache is empty and so we get UNKNOWN_TOPIC_OR_PARTITION.
> 
> configuration used:
> server.properties:
> listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> producer.properties
> bootstrap.servers=localhost:9093
> security.protocol=SASL_PLAINTEXT
> sasl.mechanism=PLAIN
> producer jaas.conf
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
> username="edo1"
> password="edo1pwd";
> };
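
The issue was resolved as Not A Bug: with an authorizer enabled, the broker's 
own inter-broker principal (User:ANONYMOUS on a PLAINTEXT listener) must itself 
be authorized. A sketch of the usual remedies, using the standard tooling 
rather than the exact commands above:

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:ANONYMOUS --operation ClusterAction --cluster

or, in server.properties, mark the broker principal as a super user:

super.users=User:ANONYMOUS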





[jira] [Resolved] (KAFKA-3688) Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN

2016-05-10 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar resolved KAFKA-3688.
--
Resolution: Invalid

> Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN
> --
>
> Key: KAFKA-3688
> URL: https://issues.apache.org/jira/browse/KAFKA-3688
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Edoardo Comar
>
> Starting a single broker with the following configuration :
>  
> server.properties:
> listeners=SASL_PLAINTEXT://:9093
> sasl.enabled.mechanisms=PLAIN
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> jaas.conf:
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
>   user_edo1="edo1pwd"
>   user_edo2="edo2pwd"
>   user_superkuser="wotever";
> };
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
> username="superkuser"
> password="wotever";
> };
> results in a broker startup failure “Failed to create SaslClient with 
> mechanism PLAIN” (see stack trace below).
> Note that this configuration was an attempt to work around the issue
> https://issues.apache.org/jira/browse/KAFKA-3687 
> (unable to use ACLs with security.inter.broker.protocol=PLAIN).
> [2016-05-10 16:54:10,730] INFO Failed to create channel due to  
> (org.apache.kafka.common.network.SaslChannelBuilder)
> org.apache.kafka.common.KafkaException: Failed to configure 
> SaslClientAuthenticator
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:124)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:102)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:177)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
>   at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159)
>   at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>   at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: org.apache.kafka.common.KafkaException: Failed to create 
> SaslClient with mechanism PLAIN
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:139)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:122)
>   ... 9 more
> Caused by: javax.security.sasl.SaslException: Cannot get userid/password 
> [Caused by javax.security.auth.callback.UnsupportedCallbackException: Could 
> not login: the client is being asked for a password, but the Kafka client 
> code does not currently support obtaining a password from the user.]
>   at 
> com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:157)
>   at 
> com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:94)
>   at javax.security.sasl.Sasl.createSaslClient(Sasl.java:372)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:135)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:1)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:130)
>   ... 10 more
> Caused by: javax.security.auth.callback.UnsupportedCallbackException: Could 
> not login: the client is being asked for a password, but the Kafka client 
> code does not currently support obtaining a password from the user.
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler.handle(SaslClientCallbackHandler.java:73)
>   at 
> com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:136)
>   ... 17 more
> discovered in collaboration with [~mimaison]





[jira] [Updated] (KAFKA-3687) Internal Authorization Failure at startup with ACLs and security.inter.broker.protocol=PLAINTEXT

2016-05-10 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3687:
-
Summary: Internal Authorization Failure at startup with ACLs and 
security.inter.broker.protocol=PLAINTEXT  (was: Internal Authorization Failure 
at startup with ACLs and security.inter.broker.protocol=PLAIN)

> Internal Authorization Failure at startup with ACLs and 
> security.inter.broker.protocol=PLAINTEXT
> 
>
> Key: KAFKA-3687
> URL: https://issues.apache.org/jira/browse/KAFKA-3687
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Edoardo Comar
>
> The root cause is this failure in the authorizer.log at server startup:
> [] DEBUG Principal = User:ANONYMOUS is Denied Operation = ClusterAction from 
> host = 192.168.10.22 on resource = Cluster:kafka-cluster 
> (kafka.authorizer.logger)
> which has the *consequence that it's impossible to authorize a producer*.
> Steps to reproduce :
> [~mimaison] and I  launched a single broker and a console producer,
> using the SASL_PLAIN authentication between producer and broker.
> Created a topic "testtopic".
> The broker is configured with the SimpleAclAuthorizer
> and the ACL created for the producer is  :
> ./kafkacls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add 
> --allow-principal User:edo1  --producer --topic testtopic
> which results in :
> Current ACLs for resource `Topic:testtopic`: 
>   User:edo1 has Allow permission for operations: Write from hosts: *
>   User:edo1 has Allow permission for operations: Describe from hosts: * 
> Current ACLs for resource `Cluster:kafka-cluster`: 
>   User:edo1 has Allow permission for operations: Create from hosts: * 
> However running the producer, we got:
> [] WARN Error while fetching metadata with correlation id 0 : 
> {testtopic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)
> Looking at the code, the UpdateMetadataRequest that the single broker sends to 
> itself on startup 
> comes through as a request with User:ANONYMOUS and gets denied by the 
> Authorizer 
> (in KafkaApis.handleUpdateMetadataRequest), 
> so the MetadataCache is never updated.
> When the first producer requests come through, in 
> KafkaApis.handleTopicMetadataRequest
> the MetadataCache is empty and so we get UNKNOWN_TOPIC_OR_PARTITION.
> 
> configuration used:
> server.properties:
> listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> producer.properties
> bootstrap.servers=localhost:9093
> security.protocol=SASL_PLAINTEXT
> sasl.mechanism=PLAIN
> producer jaas.conf
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
> username="edo1"
> password="edo1pwd";
> };





[jira] [Commented] (KAFKA-3688) Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN

2016-05-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278511#comment-15278511
 ] 

Edoardo Comar commented on KAFKA-3688:
--

Hi, that's what I've done by adding the KafkaClient section to the server's 
jaas.conf.

> Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN
> --
>
> Key: KAFKA-3688
> URL: https://issues.apache.org/jira/browse/KAFKA-3688
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Edoardo Comar
>
> Starting a single broker with the following configuration :
>  
> server.properties:
> listeners=SASL_PLAINTEXT://:9093
> sasl.enabled.mechanisms=PLAIN
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> jaas.conf:
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
>   user_edo1="edo1pwd"
>   user_edo2="edo2pwd"
>   user_superkuser="wotever";
> };
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   serviceName="kafka"
> username="superkuser"
> password="wotever";
> };
> results in a broker startup failure “Failed to create SaslClient with 
> mechanism PLAIN” (see stack trace below).
> Note that this configuration was an attempt to work around the issue
> https://issues.apache.org/jira/browse/KAFKA-3687 
> (unable to use ACLs with security.inter.broker.protocol=PLAIN).
> [2016-05-10 16:54:10,730] INFO Failed to create channel due to  
> (org.apache.kafka.common.network.SaslChannelBuilder)
> org.apache.kafka.common.KafkaException: Failed to configure 
> SaslClientAuthenticator
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:124)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:102)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:177)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
>   at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159)
>   at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>   at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: org.apache.kafka.common.KafkaException: Failed to create 
> SaslClient with mechanism PLAIN
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:139)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:122)
>   ... 9 more
> Caused by: javax.security.sasl.SaslException: Cannot get userid/password 
> [Caused by javax.security.auth.callback.UnsupportedCallbackException: Could 
> not login: the client is being asked for a password, but the Kafka client 
> code does not currently support obtaining a password from the user.]
>   at 
> com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:157)
>   at 
> com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:94)
>   at javax.security.sasl.Sasl.createSaslClient(Sasl.java:372)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:135)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:1)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:130)
>   ... 10 more
> Caused by: javax.security.auth.callback.UnsupportedCallbackException: Could 
> not login: the client is being asked for a password, but the Kafka client 
> code does not currently support obtaining a password from the user.
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler.handle(SaslClientCallbackHandler.java:73)
>   at 
> com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:136)
>   ... 17 more
> discovered in collaboration with [~mimaison]





[jira] [Created] (KAFKA-3688) Unable to start broker with sasl.mechanism.inter.broker.protocol=PLAIN

2016-05-10 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-3688:


 Summary: Unable to start broker with 
sasl.mechanism.inter.broker.protocol=PLAIN
 Key: KAFKA-3688
 URL: https://issues.apache.org/jira/browse/KAFKA-3688
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0
Reporter: Edoardo Comar


Starting a single broker with the following configuration :
 
server.properties:
listeners=SASL_PLAINTEXT://:9093
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN

jaas.conf:
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
  user_edo1="edo1pwd"
  user_edo2="edo2pwd"
  user_superkuser="wotever";
};


KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
username="superkuser"
password="wotever";
};


results in a broker startup failure “Failed to create SaslClient with mechanism 
PLAIN” (see stack trace below).

Note that this configuration was an attempt to work around the issue
https://issues.apache.org/jira/browse/KAFKA-3687 
(unable to use ACLs with security.inter.broker.protocol=PLAIN).


[2016-05-10 16:54:10,730] INFO Failed to create channel due to  
(org.apache.kafka.common.network.SaslChannelBuilder)
org.apache.kafka.common.KafkaException: Failed to configure 
SaslClientAuthenticator
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:124)
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:102)
at org.apache.kafka.common.network.Selector.connect(Selector.java:177)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: org.apache.kafka.common.KafkaException: Failed to create SaslClient 
with mechanism PLAIN
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:139)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:122)
... 9 more
Caused by: javax.security.sasl.SaslException: Cannot get userid/password 
[Caused by javax.security.auth.callback.UnsupportedCallbackException: Could not 
login: the client is being asked for a password, but the Kafka client code does 
not currently support obtaining a password from the user.]
at 
com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:157)
at 
com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:94)
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:372)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:135)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:1)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:130)
... 10 more
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Could not 
login: the client is being asked for a password, but the Kafka client code does 
not currently support obtaining a password from the user.
at 
org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler.handle(SaslClientCallbackHandler.java:73)
at 
com.sun.security.sasl.ClientFactoryImpl.getUserInfo(ClientFactoryImpl.java:136)
... 17 more
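
For reference, a hedged sketch of a KafkaServer login section that should avoid 
this failure (an assumption based on the 0.10 SASL/PLAIN documentation: the 
broker obtains its own inter-broker client credentials from username/password 
in the KafkaServer section, and does not read the KafkaClient section):

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
  username="superkuser"
  password="wotever"
  user_edo1="edo1pwd"
  user_edo2="edo2pwd"
  user_superkuser="wotever";
};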


discovered in collaboration with [~mimaison]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3687) Internal Authorization Failure at startup with ACLs and security.inter.broker.protocol=PLAIN

2016-05-10 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-3687:


 Summary: Internal Authorization Failure at startup with ACLs and 
security.inter.broker.protocol=PLAIN
 Key: KAFKA-3687
 URL: https://issues.apache.org/jira/browse/KAFKA-3687
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0
Reporter: Edoardo Comar


The root cause is this failure in the authorizer.log at server startup:

[] DEBUG Principal = User:ANONYMOUS is Denied Operation = ClusterAction from 
host = 192.168.10.22 on resource = Cluster:kafka-cluster 
(kafka.authorizer.logger)

which has the *consequence that it's impossible to authorize a producer*.

Steps to reproduce :

[~mimaison] and I launched a single broker and a console producer,
using SASL/PLAIN authentication between producer and broker.

Created a topic "testtopic".

The broker is configured with the SimpleAclAuthorizer
and the ACL created for the producer is:

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:edo1 --producer --topic testtopic

which results in:
Current ACLs for resource `Topic:testtopic`: 
User:edo1 has Allow permission for operations: Write from hosts: *
User:edo1 has Allow permission for operations: Describe from hosts: * 

Current ACLs for resource `Cluster:kafka-cluster`: 
User:edo1 has Allow permission for operations: Create from hosts: * 


However, running the producer, we got:
[] WARN Error while fetching metadata with correlation id 0 : 
{testtopic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

Looking at the code, the UpdateMetadataRequest that the single broker sends to 
itself on startup comes through as a request with User:ANONYMOUS and gets 
denied by the Authorizer 
(in KafkaApis.handleUpdateMetadataRequest),
so the MetadataCache is never updated.

When the first producer request comes through, in 
KafkaApis.handleTopicMetadataRequest
the MetadataCache is empty, so we get UNKNOWN_TOPIC_OR_PARTITION.
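
A hedged workaround sketch (an assumption on our part, not a verified fix): 
since the broker's own requests arrive as User:ANONYMOUS over the PLAINTEXT 
inter-broker listener, granting that principal ClusterAction on the cluster 
resource should let the UpdateMetadataRequest through, at the cost of allowing 
any unauthenticated client the same operation:

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:ANONYMOUS --operation ClusterAction --cluster

Alternatively, listing the broker principals under super.users in 
server.properties should bypass the authorizer for inter-broker requests.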



configuration used:

server.properties:
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

producer.properties
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

producer jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
username="edo1"
password="edo1pwd";
};




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3682) ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full

2016-05-09 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-3682:


 Summary: ArrayIndexOutOfBoundsException thrown by 
SkimpyOffsetMap.get() when full
 Key: KAFKA-3682
 URL: https://issues.apache.org/jira/browse/KAFKA-3682
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Edoardo Comar
Assignee: Edoardo Comar


When the SkimpyOffsetMap is full, a request for a key that is not in the map 
will throw an ArrayIndexOutOfBoundsException, as the number of internal probe 
attempts overflows past Integer.MAX_VALUE
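
A minimal Java sketch of the failure mode (illustrative only, not the actual 
SkimpyOffsetMap code): an unbounded probe counter silently wraps past 
Integer.MAX_VALUE, and Java's remainder of a negative int is non-positive, so 
the computed slot can become a negative array index:

{code}
public class ProbeOverflowDemo {
    public static void main(String[] args) {
        byte[] slots = new byte[7];            // stand-in for the map's byte buffer
        int attempt = Integer.MAX_VALUE;
        attempt += 1;                          // silently wraps to Integer.MIN_VALUE
        int slot = attempt % slots.length;     // -2: % keeps the sign of the dividend
        System.out.println("attempt=" + attempt + " slot=" + slot);
        byte b = slots[slot];                  // throws ArrayIndexOutOfBoundsException
    }
}
{code}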



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3679) Allow reuse of implementation of RFC 4616 in PlainSaslServer

2016-05-09 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-3679:


 Summary: Allow reuse of implementation of RFC 4616 in 
PlainSaslServer 
 Key: KAFKA-3679
 URL: https://issues.apache.org/jira/browse/KAFKA-3679
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.10.0.0
Reporter: Edoardo Comar
Assignee: Edoardo Comar


Using SASL/PLAIN in production may require different username/password 
checking than what is currently in the codebase, which is based on data 
contained in the server's jaas.conf.

To do so, a deployment needs to extend the PlainSaslServer as described here
http://kafka.apache.org/0100/documentation.html#security_sasl_plain_production

However the evaluateResponse(bytes) method still needs to implement RFC 4616, 
so it is useful to separate the password checking from the reading of the data 
from the wire. 
A simple extract-method refactoring into an overridable method should suffice
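
A hedged Java sketch of the proposed refactoring (names and structure are 
hypothetical, not the actual PlainSaslServer code): the RFC 4616 wire parsing 
stays in evaluateResponse, and the credential check becomes an overridable 
hook:

{code}
import java.nio.charset.StandardCharsets;

public abstract class Rfc4616SaslServerSketch {
    // RFC 4616 message format: [authzid] UTF8NUL authcid UTF8NUL passwd
    public byte[] evaluateResponse(byte[] response) {
        String[] tokens =
            new String(response, StandardCharsets.UTF_8).split("\u0000", -1);
        if (tokens.length != 3)
            throw new IllegalArgumentException("Invalid SASL/PLAIN client response");
        String authcid = tokens[1];
        String passwd = tokens[2];
        if (!authenticate(authcid, passwd))
            throw new IllegalArgumentException("Authentication failed for " + authcid);
        return new byte[0];                    // PLAIN has no server challenge
    }

    // The overridable hook: deployments plug in their own credential store
    // without re-implementing the wire parsing above.
    protected abstract boolean authenticate(String username, String password);
}
{code}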



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-05-06 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274977#comment-15274977
 ] 

Edoardo Comar commented on KAFKA-3587:
--

[~junrao] the difference between option 1 and option 2 is that with the latter 
we would guarantee that deduplication of a segment would always retain a value 
that came from one of the compacted segments, while with option 1 the value 
might come from a 'later' segment. 
However option 1, as you observed, satisfies the log deduplication contract 
(though not individual segment deduplication) and so it may well suffice, as 
it's simpler and more performant.

> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Fix For: 0.10.0.0
>
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> .log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.sca

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-05-06 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274622#comment-15274622
 ] 

Edoardo Comar commented on KAFKA-3587:
--

[~junrao] thanks for your observation 
https://issues.apache.org/jira/browse/KAFKA-3587?focusedCommentId=15274372&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15274372

The idea of option 1 is NOT to clean the segment for which the map was only 
partially loaded,
but to clean the segments up to the last one that was fully loaded into the map.

Even if the clean would potentially use values coming from a subsequent 
segment, these are "newer" than the ones they replace and so would satisfy the 
contract of deduplication.

This is what we implemented in [~mimaison]'s PR

> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> .log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 mess

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-05-06 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274204#comment-15274204
 ] 

Edoardo Comar commented on KAFKA-3587:
--

For clarity, [~mimaison]'s PR https://github.com/apache/kafka/pull/1332 
implements the 1st of the two options I mentioned in my previous comment 
https://issues.apache.org/jira/browse/KAFKA-3587?focusedCommentId=15273821&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15273821

> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> .log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffset() - segment.baseOffset"""
> In Replica's log segment file ( .log), ending offset is 
> 7206178. Beginning offset is 0.  That

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-05-06 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273821#comment-15273821
 ] 

Edoardo Comar commented on KAFKA-3587:
--

[~junrao] I was thinking of two possible optimistic approaches (if you allow me 
to call the current one 'pessimistic' :-).

1 - try scanning all dirty segments without the pessimistic check; use the 
(only) map until full. 
This may allow processing all dirty segments (optimism), or the map may fill up 
in the middle of a dirty segment. 
In either case, do compaction using the map loaded that way.
When compaction takes place, if the map became full in the middle of a segment, 
compaction may replace some values in the compacted segments with values that 
actually come from the last, partially examined segment - but this should not 
affect correctness for consumers of the topic, as the topic actually has such 
values for a given key and they are newer than the values contained in the 
compacted segments. (A sketch of this control flow follows below.)

OR

2 - try scanning all dirty segments without the pessimistic check, but, if the 
map gets full, record the last segment that was fully loaded without filling 
the map, and restart compaction for that log, this time stopping at the 
segment that we now know will fit.
So in the unlucky case that compaction does not have enough memory to process 
all segments, this incurs a performance penalty.
But for a given amount of memory such a case is less likely than now, because 
of the optimistic use of the map.
If a single segment has too many distinct keys for the map, then that segment 
is not compactable - as happens now.
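
A hedged Java sketch of option 1 (hypothetical types and helpers, not the 
actual LogCleaner patch): fill the single dedup map optimistically across the 
dirty segments and remember how far it got, instead of pre-checking the 
pessimistic per-segment bound:

{code}
import java.util.List;
import java.util.Set;

class OptimisticMapLoadSketch {
    interface Segment {
        long nextOffset();
        Iterable<String> keys();   // keys of the records in this segment
    }

    // Returns the offset up to which segments were fully absorbed by the map;
    // compaction then proceeds using whatever the map holds.
    static long loadMap(List<Segment> dirtySegments, Set<String> map, int maxEntries) {
        long fullyLoadedUpTo = -1L;
        for (Segment segment : dirtySegments) {
            for (String key : segment.keys()) {
                if (map.size() >= maxEntries && !map.contains(key))
                    return fullyLoadedUpTo; // map filled mid-segment: stop, compact anyway
                map.add(key);
            }
            fullyLoadedUpTo = segment.nextOffset(); // segment fully absorbed
        }
        return fullyLoadedUpTo;                     // best case: every dirty segment fits
    }
}
{code}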


> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-05-05 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273482#comment-15273482
 ] 

Edoardo Comar commented on KAFKA-3587:
--

[~ishiihara] In your PR the very pessimistic check 
val segmentSize = segment.nextOffset() - segment.baseOffset
is still used. That is the reason for this JIRA.

> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> .log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffset() - segment.baseOffset"""
> In Replica's log segment file ( .log), ending offset is 
> 7206178. Beginning offset is 0.  That makes Log Cleaner think that there are 
> 7206179 messages in that segment although there are only 218418 messages in 
> it.
> IMO,  to address thi

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-05-05 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273455#comment-15273455
 ] 

Edoardo Comar commented on KAFKA-3587:
--

[~alekar] I had a look at your patch. The approach you took is to use 
one temp map per segment and one overall map, which I had considered too.
Do you think that allocating just 1/2 the memory to the overall map (w.r.t. the 
current approach) is a good strategy?

The approach [~mimaison] and I are working on is to try and compact until the 
(only-one-per-thread) map is full, which maximises the use of the memory 
allocated to the map.


> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> .log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffse

[jira] [Commented] (KAFKA-2273) Add rebalance with a minimal number of reassignments to server-defined strategy list

2016-04-28 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263195#comment-15263195
 ] 

Edoardo Comar commented on KAFKA-2273:
--

Working on this with [~vahid] - propagating the overall global assignment 
solution (covering every consumer) to every consumer in the group
by using the userData field in 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment.
This way, even if the leader dies, any other consumer, on becoming the leader, 
has access to the last computed assignment for everyone.
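
A hedged Java sketch of the idea (the wire format here is purely illustrative, 
not the encoding used in the actual PR): each member embeds the last globally 
computed assignment in the userData bytes, so a newly elected leader can 
perform a sticky rebalance even if the previous leader died:

{code}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

class StickyAssignmentUserData {
    // Encodes {topic -> partitions} as: topicCount, then per topic
    // (nameLen, nameBytes, partitionCount, partitions...).
    static ByteBuffer encode(Map<String, List<Integer>> lastAssignment) {
        int size = 4;
        for (Map.Entry<String, List<Integer>> e : lastAssignment.entrySet())
            size += 4 + e.getKey().getBytes(StandardCharsets.UTF_8).length
                  + 4 + 4 * e.getValue().size();
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(lastAssignment.size());
        for (Map.Entry<String, List<Integer>> e : lastAssignment.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(name.length).put(name);
            buf.putInt(e.getValue().size());
            for (int partition : e.getValue())
                buf.putInt(partition);
        }
        buf.flip();
        return buf; // attached as userData on the Assignment / next Subscription
    }
}
{code}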



> Add rebalance with a minimal number of reassignments to server-defined 
> strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: newbie++, newbiee
> Fix For: 0.10.1.0
>
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-04-28 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-3587:


Assignee: Edoardo Comar  (was: Manikumar Reddy)

> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> .log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffset() - segment.baseOffset"""
> In Replica's log segment file ( .log), ending offset is 
> 7206178. Beginning offset is 0.  That makes Log Cleaner think that there are 
> 7206179 messages in that segment although there are only 218418 messages in 
> it.
> IMO,  to address this kind of scenario, LogCleaner.scala should check for the 
> number of messages in the segment, instead of subtracting beginning offset 
> from the ending offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-04-22 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253833#comment-15253833
 ] 

Edoardo Comar commented on KAFKA-3587:
--

Hi, I was having a look at this issue too.

In LogCleaner.scala, Cleaner.buildOffsetMap 
the calculation of segmentSize as described in this defect is actually an upper 
bound for the actual segment size;
moreover, segmentSize itself is an upper bound on the number of entries that a 
segment may contribute to the offset map
(as many entries may share the same key - else why dedup?).

So the result is that the allocated memory buffer may not be used anywhere near 
its load factor, 
i.e. the map's actual size may be much, much smaller than maxDesiredMapSize.

Perhaps a different trade-off may be used. 
The current one seems based on speed vs memory size.

Rather than aborting on a test that may be overly pessimistic,
the alternative trade-off could be that 
the code checks *while building the map* whether the map size actually goes 
over the desired size, 
and only in that case aborts the compaction with an exception. (A sketch of 
this lazy check follows below.)

What do you think?
cheers Edoardo
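
A hedged Java sketch of that lazy check (hypothetical names, not the Kafka 
source): reject a key insertion only when the map is genuinely over its 
desired size, rather than rejecting a whole segment up front from the 
offset-delta estimate:

{code}
import java.util.Set;

class LazyMapSizeCheckSketch {
    // Throws only when the map would actually exceed maxDesiredMapSize,
    // instead of pre-rejecting the segment on a pessimistic estimate.
    static void putChecked(Set<String> map, String key, int maxDesiredMapSize) {
        if (!map.contains(key) && map.size() >= maxDesiredMapSize)
            throw new IllegalStateException(
                    "Offset map full: aborting compaction for this log");
        map.add(key);
    }
}
{code}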

> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Manikumar Reddy
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafk

[jira] [Commented] (KAFKA-3415) AdminOperationException when altering Topic with same number of partitions

2016-03-20 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1510#comment-1510
 ] 

Edoardo Comar commented on KAFKA-3415:
--

As the user opening the JIRA stated, they use the scripts as part of 
automation. 
Having the script behave idempotently is actually simpler than checking the 
number of partitions and bumping up only if needed.

> AdminOperationException when altering Topic with same number of partitions
> --
>
> Key: KAFKA-3415
> URL: https://issues.apache.org/jira/browse/KAFKA-3415
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.9.0.1
>Reporter: Gérald Quintana
>Priority: Minor
>
> To automate topic creation/modification, we sometimes run kafka-topics.sh 
> script with the same topic config. It raises an AdminOperationException, in 
> short it's idempotent
> {code}
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic logfailed
> Topic:logfailed PartitionCount:1ReplicationFactor:1 
> Configs:retention.ms=60480,retention.bytes=209715200
> Topic: logfailedPartition: 0Leader: 1   Replicas: 1   
>   Isr: 1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic logfailed 
> --partitions 1 --config retention.bytes=209715200 --config 
> retention.ms=60480
> WARNING: Altering topic configuration from this script has been deprecated 
> and may be removed in future releases.
>  Going forward, please use kafka-configs.sh for this functionality
> Updated config for topic "logfailed".
> WARNING: If partitions are increased for a topic that has a key, the 
> partition logic or ordering of the messages will be affected
> Error while executing topic command : The number of partitions for a topic 
> can only be increased
> [2016-03-17 12:25:20,458] ERROR kafka.admin.AdminOperationException: The 
> number of partitions for a topic can only be increased
> at kafka.admin.AdminUtils$.addPartitions(AdminUtils.scala:119)
> at 
> kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:139)
> at 
> kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:116)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:116)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3415) AdminOperationException when altering Topic with same number of partitions

2016-03-19 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15199776#comment-15199776
 ] 

Edoardo Comar commented on KAFKA-3415:
--

I agree that the return code to the shell should be 0, not 1.

I had a look at the code, and it seems an improved check in 
AdminUtils.addPartitions is the way to go.
We could change the log to a warning in this case, not an error.

I will look into submitting a PR (a sketch of the intended check follows below).
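
A hedged Java sketch of the intended behavior (hypothetical names, not the 
actual AdminUtils patch): treat "same partition count" as a warned no-op with 
exit code 0, so automation scripts can re-apply the same topic config safely:

{code}
class AlterPartitionsCheckSketch {
    static int checkAlterPartitions(int existing, int requested) {
        if (requested == existing) {
            System.err.println("WARN: topic already has " + existing
                    + " partitions; nothing to do");
            return 0;              // idempotent success instead of an error
        }
        if (requested < existing)
            throw new IllegalArgumentException(
                    "The number of partitions for a topic can only be increased");
        return 0;                  // caller proceeds with the actual increase
    }
}
{code}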

> AdminOperationException when altering Topic with same number of partitions
> --
>
> Key: KAFKA-3415
> URL: https://issues.apache.org/jira/browse/KAFKA-3415
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.9.0.1
>Reporter: Gérald Quintana
>Priority: Minor
>
> To automate topic creation/modification, we sometimes run kafka-topics.sh 
> script with the same topic config. It raises an AdminOperationException, in 
> short it's idempotent
> {code}
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic logfailed
> Topic:logfailed PartitionCount:1ReplicationFactor:1 
> Configs:retention.ms=60480,retention.bytes=209715200
> Topic: logfailedPartition: 0Leader: 1   Replicas: 1   
>   Isr: 1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic logfailed 
> --partitions 1 --config retention.bytes=209715200 --config 
> retention.ms=60480
> WARNING: Altering topic configuration from this script has been deprecated 
> and may be removed in future releases.
>  Going forward, please use kafka-configs.sh for this functionality
> Updated config for topic "logfailed".
> WARNING: If partitions are increased for a topic that has a key, the 
> partition logic or ordering of the messages will be affected
> Error while executing topic command : The number of partitions for a topic 
> can only be increased
> [2016-03-17 12:25:20,458] ERROR kafka.admin.AdminOperationException: The 
> number of partitions for a topic can only be increased
> at kafka.admin.AdminUtils$.addPartitions(AdminUtils.scala:119)
> at 
> kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:139)
> at 
> kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:116)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:116)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2832) support exclude.internal.topics in new consumer

2016-03-18 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197454#comment-15197454
 ] 

Edoardo Comar commented on KAFKA-2832:
--

Hi all, Vahid is on vacation, so I am creating a new pull request to 
accommodate the changes discussed so far - and I have added a unit test for 
the new option.

> support exclude.internal.topics in new consumer
> ---
>
> Key: KAFKA-2832
> URL: https://issues.apache.org/jira/browse/KAFKA-2832
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Jun Rao
>Assignee: Vahid Hashemian
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> The old consumer supports exclude.internal.topics that prevents internal 
> topics from being consumed by default. It would be useful to add that in the 
> new consumer, especially when wildcards are used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

