[jira] [Created] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation

2020-03-17 Thread Vahid Hashemian (Jira)
Vahid Hashemian created KAFKA-9731:
--

 Summary: Increased fetch request rate with leader selector due to 
HW propagation
 Key: KAFKA-9731
 URL: https://issues.apache.org/jira/browse/KAFKA-9731
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.4.1, 2.4.0
Reporter: Vahid Hashemian
 Attachments: image-2020-03-17-10-19-08-987.png

KIP-392 adds high watermark propagation to followers as a means of better syncing 
the followers' HW with the leader's. The issue we have noticed after trying out 2.4.0 
and 2.4.1 is a spike in the fetch request rate in the default selector case 
(leader), which does not really require this high watermark propagation:

!image-2020-03-17-10-19-08-987.png|width=811,height=354!

This spike causes an increase in resource allocation (CPU) on the brokers.

An easy solution would be to disable this propagation (at least) for the 
default leader selector case to improve backward compatibility.
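For reference, a minimal sketch of the KIP-392 knobs involved (values are 
illustrative): follower fetching is only supposed to kick in when a non-default 
selector is configured on the broker and a rack is set on the consumer, which is 
why a spike in the default (leader) case is surprising:

{code}
# broker (server.properties): default is empty, i.e. always fetch from the leader
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
# consumer: identifies the client's rack for follower fetching
client.rack=us-east-1a
{code}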



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9205) Add an option to enforce rack-aware partition reassignment

2019-11-18 Thread Vahid Hashemian (Jira)
Vahid Hashemian created KAFKA-9205:
--

 Summary: Add an option to enforce rack-aware partition reassignment
 Key: KAFKA-9205
 URL: https://issues.apache.org/jira/browse/KAFKA-9205
 Project: Kafka
  Issue Type: Improvement
  Components: admin, tools
Reporter: Vahid Hashemian


One regularly used healing operation on Kafka clusters is replica reassignment 
for topic partitions. For example, when there is a skew in the inbound/outbound 
traffic of a broker, replica reassignment can be used to move some 
leaders/followers off the broker; or, if there is a skew in the disk usage of 
brokers, replica reassignment can move some partitions to other brokers that 
have more disk space available.

In Kafka clusters that span multiple data centers (or availability 
zones), high availability is a priority, in the sense that when a data center 
goes offline the cluster should be able to resume normal operation because 
partition replicas are guaranteed to exist in all data centers.

This guarantee is currently the responsibility of the on-call engineer who 
performs the reassignment, or of the tool that automatically generates the 
reassignment plan for improving cluster health (e.g. by considering the 
rack configuration value of each broker in the cluster). The former is quite 
error-prone, and the latter leads to duplicate code in all such admin 
tools (which are not error-free either).

It would be great for the built-in replica assignment API and tool provided by 
Kafka to support a rack-aware verification option that would simply return an 
error when [some] brokers in any replica set share a common rack. 
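A minimal sketch of such a verification (a hypothetical helper, not an existing 
Kafka API; a real implementation would plug into the reassignment tool):

{code:java}
import java.util.*;
import org.apache.kafka.common.TopicPartition;

public class RackAwareCheck {
    // Hypothetical check: reject a reassignment plan if any replica set
    // places two replicas on brokers that share a rack.
    public static void verify(Map<TopicPartition, List<Integer>> plan,
                              Map<Integer, String> rackByBroker) {
        for (Map.Entry<TopicPartition, List<Integer>> e : plan.entrySet()) {
            Set<String> racksSeen = new HashSet<>();
            for (int brokerId : e.getValue()) {
                String rack = rackByBroker.get(brokerId);
                if (rack != null && !racksSeen.add(rack))
                    throw new IllegalArgumentException(
                        e.getKey() + " has multiple replicas in rack " + rack);
            }
        }
    }
}
{code}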



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8289) KTable&lt;Windowed&lt;String&gt;, Long&gt; can't be suppressed

2019-05-03 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-8289.

Resolution: Fixed

> KTable&lt;Windowed&lt;String&gt;, Long&gt; can't be suppressed
> ---
>
> Key: KAFKA-8289
> URL: https://issues.apache.org/jira/browse/KAFKA-8289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Broker on Linux, stream app on my Win10 laptop. 
> I added one line, log.message.timestamp.type=LogAppendTime, to my broker's 
> server.properties. The stream app uses all default configs.
>Reporter: Xiaolin Jia
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> I wrote a simple stream app following the official developer guide's [Stream 
> DSL|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results] 
> section, but I got more than one [Window Final 
> Result|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31] 
> from a session time window.
> Time ticker A sends (4, A) every 25s; time ticker B sends (4, B) every 25s; both 
> send to the same topic.
> Below is my stream app code:
> {code:java}
> kstreams[0]
> .peek((k, v) -> log.info("--> ping, k={},v={}", k, v))
> .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
> .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20)))
> .count()
> .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), 
> k.key(), v));
> {code}
> Here is my log output:
> {noformat}
> 2019-04-24 20:00:26.142  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:00:47.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556106587744, 
> endMs=1556107129191},k=A,v=20
> 2019-04-24 20:00:51.071  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:01:16.065  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:01:41.066  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:06.069  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:56.208  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:03:21.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:03:46.078  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:04.684  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:04:11.069  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:19.371  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107426409},k=B,v=9
> 2019-04-24 20:04:19.372  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107445012, 
> endMs=1556107445012},k=A,v=1
> 2019-04-24 20:04:29.604  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:04:36.067  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:49.715  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107451397},k=B,v=10
> 2019-04-24 20:04:49.716  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107445012, 
> endMs=1556107469935},k=A,v=2
> 2019-04-24 20:04:54.593  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:05:01.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:05:19.599  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:05:20.045  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107476398},k=B,v=11
> 2019-04-24 20:05:20.047  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107501398},k=B,v=12
> 2019-04-24 20:05:26.075  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> pi

[jira] [Resolved] (KAFKA-7946) Flaky Test DeleteConsumerGroupsTest#testDeleteNonEmptyGroup

2019-05-03 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-7946.

   Resolution: Fixed
Fix Version/s: 2.2.1

> Flaky Test DeleteConsumerGroupsTest#testDeleteNonEmptyGroup
> ---
>
> Key: KAFKA-7946
> URL: https://issues.apache.org/jira/browse/KAFKA-7946
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1, 2.2.2
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/17/]
> {quote}java.lang.NullPointerException at 
> kafka.admin.DeleteConsumerGroupsTest.testDeleteNonEmptyGroup(DeleteConsumerGroupsTest.scala:96){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7962) StickyAssignor: throws NullPointerException during assignments if topic is deleted

2019-02-27 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-7962.

   Resolution: Fixed
 Reviewer: Vahid Hashemian
Fix Version/s: 2.3.0

> StickyAssignor: throws NullPointerException during assignments if topic is 
> deleted
> --
>
> Key: KAFKA-7962
> URL: https://issues.apache.org/jira/browse/KAFKA-7962
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.1.0
> Environment: 1. MacOS, com.salesforce.kafka.test.KafkaTestUtils (kind 
> of embedded kafka integration tests)
> 2. Linux, dockerised kafka and our service
>Reporter: Oleg Smirnov
>Assignee: huxihx
>Priority: Major
> Fix For: 2.3.0
>
> Attachments: NPE-StickyAssignor-issues.apache.log
>
>
> Integration tests with com.salesforce.kafka.test.KafkaTestUtils, local 
> setup, StickyAssignor used; local topics are created/removed; one topic is 
> created at the beginning of the test and then deleted without unsubscribing from it.
> The same happens in a real environment.
>  
>  # have single "topic" with 1 partition
>  # single consumer subscribed to this "topic" (StickyAssignor)
>  # delete "topic"
> =>
>  * rebalance starts, the topic partition(s) are revoked
>  * on assignment, StickyAssignor throws an exception (line 223), because 
> partitionsPerTopic.get("topic") returns null in the for loop (the topic was 
> deleted, so no partitions are present)
>  
> In the provided log part, tearDown() causes the topic deletion while the consumer 
> is still running and trying to poll data from the topic.
> RangeAssignor works fine (revokes the partition, assigns an empty set).
> The problem doesn't have a workaround (like handling it in onPartitionsAssigned and 
> unsubscribing from the removed topic), because everything happens before the listener is called.
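> A guard along these lines would avoid the NPE (an illustrative sketch only; the 
> actual fix may differ):
> {code:java}
> // partitionsPerTopic may no longer contain a subscribed topic if the
> // topic was deleted mid-rebalance; skip it instead of dereferencing null.
> Integer partitions = partitionsPerTopic.get(topic);
> if (partitions == null)
>     continue;
> {code}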
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7993) Revert/Update the fix for KAFKA-7937 when coordinator lookup retries are implemented

2019-02-24 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-7993:
--

 Summary: Revert/Update the fix for KAFKA-7937 when coordinator 
lookup retries are implemented 
 Key: KAFKA-7993
 URL: https://issues.apache.org/jira/browse/KAFKA-7993
 Project: Kafka
  Issue Type: Improvement
Reporter: Vahid Hashemian


Since the new {{AdminClient}} API does not support coordinator lookup retries, 
[KAFKA-7937|https://issues.apache.org/jira/browse/KAFKA-7937] improved some 
unit tests by adding a wait until the coordinator is available. 
[KAFKA-6789|https://issues.apache.org/jira/browse/KAFKA-6789] is an open ticket 
to add the retry to the new API. Once that is implemented, the unit test fix 
should be reverted / updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7604) Flaky Test `ConsumerCoordinatorTest.testRebalanceAfterTopicUnavailableWithPatternSubscribe`

2018-11-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-7604.

Resolution: Fixed

> Flaky Test 
> `ConsumerCoordinatorTest.testRebalanceAfterTopicUnavailableWithPatternSubscribe`
> ---
>
> Key: KAFKA-7604
> URL: https://issues.apache.org/jira/browse/KAFKA-7604
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> {code}
> java.lang.AssertionError: Metadata refresh requested unnecessarily
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.unavailableTopicTest(ConsumerCoordinatorTest.java:1034)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.testRebalanceAfterTopicUnavailableWithPatternSubscribe(ConsumerCoordinatorTest.java:984)
> {code}
> The problem seems to be a race condition in the test case with the heartbeat 
> thread and the foreground thread unsafely attempting to update metadata at 
> the same time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6717) TopicPartition assigned twice to a consumer group for 2 consumer instances

2018-07-17 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-6717.

Resolution: Duplicate

Marking it as duplicate to keep all the discussion in the other JIRA.

> TopicPartition assigned twice to a consumer group for 2 consumer instances 
> --
>
> Key: KAFKA-6717
> URL: https://issues.apache.org/jira/browse/KAFKA-6717
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Yuancheng PENG
>Priority: Major
>
> I'm using {{StickyAssignor}} for consuming more than 100 topics matching a certain 
> pattern.
> There are 10 consumers with the same group id.
> I expected each topic-partition to be assigned to only one consumer instance. 
> However, some topic partitions are assigned twice, to 2 different 
> instances, hence the consumer group processes duplicate messages.
> {code:java}
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> Collections.singletonList(StickyAssignor.class));
> KafkaConsumer&lt;String, String&gt; c = new KafkaConsumer<>(props);
> c.subscribe(Pattern.compile(TOPIC_PATTERN), new 
> NoOpConsumerRebalanceListener());
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7156) Deleting topics with long names can bring all brokers to unrecoverable state

2018-07-13 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-7156.

Resolution: Duplicate

> Deleting topics with long names can bring all brokers to unrecoverable state
> 
>
> Key: KAFKA-7156
> URL: https://issues.apache.org/jira/browse/KAFKA-7156
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Petr Pchelko
>Priority: Major
>
> Kafka's limit for the topic name is 249 symbols, so creating a topic with a 
> name 248 symbols long is possible. However, when deleting the topic, Kafka 
> tries to rename the topic's data directory, appending a hash and 
> `-delete`, so that the resulting file name exceeds the 
> 255-symbol file name limit of most Unix file systems. This provokes a 
> java.nio.file.FileSystemException which in turn immediately shuts down all 
> the brokers. Further attempts to restart the broker fail with the same 
> exception. The only way to resurrect the cluster is to manually delete the 
> affected topic from zookeeper and from the disk on all the broker machines.
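> To make the arithmetic concrete (assuming the 32-character hash seen in the log 
> below): a 248-character topic name yields a 250-character partition directory 
> ("-0" appended), and the rename target adds a dot, the hash, and "-delete": 
> 250 + 1 + 32 + 7 = 290 characters, well over the 255 limit.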
> Steps to reproduce:
> (Note: delete.topic.enable=true must be set in the config)
> {code:java}
> > kafka-topics.sh --zookeeper localhost:2181 --create --topic 
> > 
> >  --partitions 1 --replication-factor 1
> > kafka-topics.sh --zookeeper localhost:2181 --delete --topic 
> > aaa
>  {code}
> After these 2 commands executed all the brokers where this topic is 
> replicated immediately shut down with the following logs:
> {code:java}
> ERROR Error while renaming dir for 
> -0
>  in log dir /tmp/kafka-logs (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> /tmp/kafka-logs/-0
>  -> 
> /tmp/kafka-logs/-0.093fd1e1728f438ea990cbad8a514b9f-delete:
>  File name too long
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:457)
> at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> ...
> Suppressed: java.nio.file.FileSystemException: 
> /tmp/kafka-logs/-0
>  -> 
> /tmp/kafka-logs/-0.093fd1e1728f438ea990cbad8a514b9f-delete:
>  File name too long
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
> at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
> ... 23 more
> [2018-07-12 13:34:45,847] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
> [2018-07-12 13:34:45,848] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetche

[jira] [Resolved] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-7141.

Resolution: Not A Problem

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: akka-stream-kafka consumer with standard config 
> section as described in the 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and  
> kafka-console-consumer.
> akka-stream-kafka consumer configuration looks like this:
> {code}
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> {code}
>  
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
> _Note: This will not show information about old Zookeeper-based consumers._
> _myakkastreamkafka-1_
> _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-08 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-7026:
--

 Summary: Sticky assignor could assign a partition to multiple 
consumers
 Key: KAFKA-7026
 URL: https://issues.apache.org/jira/browse/KAFKA-7026
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian


In the following scenario, the sticky assignor assigns a topic partition to two 
consumers in the group:
 # Create a topic {{test}} with a single partition
 # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group leader 
and gets {{test-0}})
 # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
{{test-0}}, {{c2}} does not get any partition) 
 # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
over {{test-0}}, {{c1}} leaves the group)
 # Resume {{c1}}

At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.

 

The reason is that {{c1}} has kept its previous assignment ({{test-0}}) from 
the last round of assignments it received from the leader (itself), and did not get the 
next round of assignments (when {{c2}} became leader) because it was paused. 
Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
existing assignment. The sticky assignor code does not currently check for this 
duplication.
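
A sketch of the kind of validation that would catch this (illustrative only, not 
the actual patch; {{previousAssignments}} is an assumed name mapping member id to 
the partitions that member claims):

{code:java}
// Hypothetical pre-pass over the previously-owned partitions each member
// reports on rebalance: the first claimant keeps a partition, later
// (stale) claims are discarded.
Map<TopicPartition, String> claimedBy = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> member : previousAssignments.entrySet()) {
    member.getValue().removeIf(tp -> {
        String owner = claimedBy.putIfAbsent(tp, member.getKey());
        return owner != null && !owner.equals(member.getKey());
    });
}
{code}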

 


Note: This issue was originally reported on 
[StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6956) Use Java AdminClient in BrokerApiVersionsCommand

2018-05-27 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-6956.

Resolution: Duplicate

> Use Java AdminClient in BrokerApiVersionsCommand
> 
>
> Key: KAFKA-6956
> URL: https://issues.apache.org/jira/browse/KAFKA-6956
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Major
>
> The Scala AdminClient was introduced as a stopgap until we had an officially 
> supported API. The Java AdminClient is the supported API so we should migrate 
> all usages to it and remove the Scala AdminClient. This JIRA is for using the 
> Java AdminClient in BrokerApiVersionsCommand. We would need to verify that 
> the necessary APIs are available via the Java AdminClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6951) Implement offset expiration semantics for unsubscribed topics

2018-05-25 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-6951:
--

 Summary: Implement offset expiration semantics for unsubscribed 
topics
 Key: KAFKA-6951
 URL: https://issues.apache.org/jira/browse/KAFKA-6951
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
 Fix For: 2.1.0


[This 
portion|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets#KIP-211:ReviseExpirationSemanticsofConsumerGroupOffsets-UnsubscribingfromaTopic]
 of KIP-211 will be implemented separately from the main PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6110) Warning when running the broker on Windows

2017-11-07 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-6110.

   Resolution: Duplicate
Fix Version/s: 1.1.0

> Warning when running the broker on Windows
> --
>
> Key: KAFKA-6110
> URL: https://issues.apache.org/jira/browse/KAFKA-6110
> Project: Kafka
>  Issue Type: Bug
> Environment: Windows 10 VM
>Reporter: Vahid Hashemian
>Priority: Minor
> Fix For: 1.1.0
>
>
> *This issue exists in 1.0.0-RC2.*
> The following warning appears in the broker log at startup:
> {code}
> [2017-10-23 15:29:49,370] WARN Error processing 
> kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=C:\tmp\kafka-logs
>  (com.yammer.metrics.reporting.JmxReporter)
> javax.management.MalformedObjectNameException: Invalid character ':' in value 
> part of property
> at javax.management.ObjectName.construct(ObjectName.java:618)
> at javax.management.ObjectName.&lt;init&gt;(ObjectName.java:1382)
> at 
> com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
> at 
> com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
> at 
> com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
> at 
> com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
> at 
> kafka.metrics.KafkaMetricsGroup$class.newGauge(KafkaMetricsGroup.scala:80)
> at kafka.log.LogManager.newGauge(LogManager.scala:50)
> at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:117)
> at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:116)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.log.LogManager.&lt;init&gt;(LogManager.scala:116)
> at kafka.log.LogManager$.apply(LogManager.scala:799)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
> at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:92)
> at kafka.Kafka.main(Kafka.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6110) Warning when running the broker on Windows

2017-10-23 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-6110:
--

 Summary: Warning when running the broker on Windows
 Key: KAFKA-6110
 URL: https://issues.apache.org/jira/browse/KAFKA-6110
 Project: Kafka
  Issue Type: Bug
Reporter: Vahid Hashemian
Priority: Minor


The following warning appears in the broker log at startup:
{code}
[2017-10-23 15:29:49,370] WARN Error processing 
kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=C:\tmp\kafka-logs
 (com.yammer.metrics.reporting.JmxReporter)
javax.management.MalformedObjectNameException: Invalid character ':' in value 
part of property
at javax.management.ObjectName.construct(ObjectName.java:618)
at javax.management.ObjectName.&lt;init&gt;(ObjectName.java:1382)
at 
com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
at 
com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
at 
com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
at 
com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
at 
kafka.metrics.KafkaMetricsGroup$class.newGauge(KafkaMetricsGroup.scala:80)
at kafka.log.LogManager.newGauge(LogManager.scala:50)
at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:117)
at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:116)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.log.LogManager.&lt;init&gt;(LogManager.scala:116)
at kafka.log.LogManager$.apply(LogManager.scala:799)
at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:92)
at kafka.Kafka.main(Kafka.scala)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6100) Streams quick start crashes Java on Windows

2017-10-20 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-6100:
--

 Summary: Streams quick start crashes Java on Windows 
 Key: KAFKA-6100
 URL: https://issues.apache.org/jira/browse/KAFKA-6100
 Project: Kafka
  Issue Type: Bug
  Components: streams
 Environment: Windows 10 VM
Reporter: Vahid Hashemian
 Attachments: Screen Shot 2017-10-20 at 11.53.14 AM.png

*This issue was detected in 1.0.0 RC2.*

The following step in streams quick start crashes Java on Windows 10:
{{bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountDemo}}

I tracked this down to [this 
change|https://github.com/apache/kafka/commit/196bcfca0c56420793f85514d1602bde564b0651#diff-6512f838e273b79676cac5f72456127fR67], 
and it seems the new version of RocksDB is to blame. I tried the quick start 
with the previous version of RocksDB (5.7.3) and did not run into this issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6075) Kafka cannot recover after an unclean shutdown on Windows

2017-10-17 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-6075:
--

 Summary: Kafka cannot recover after an unclean shutdown on Windows
 Key: KAFKA-6075
 URL: https://issues.apache.org/jira/browse/KAFKA-6075
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Vahid Hashemian


Kafka cannot recover from an unclean shutdown of a broker on Windows. Steps to 
reproduce from a fresh build:
# Start zookeeper
# Start a broker
# Create a topic {{test}}
# Do an unclean shutdown of the broker (find the process id with {{wmic process where 
"caption = 'java.exe' and commandline like '%server.properties%'" get 
processid}}), then kill the process with {{taskkill /pid  /f}}
# Start the broker again

This leads to the following errors:
{code}
[2017-10-17 17:13:24,819] ERROR Error while loading log dir C:\tmp\kafka-logs 
(kafka.log.LogManager)
java.nio.file.FileSystemException: 
C:\tmp\kafka-logs\test-0\.timeindex: The process cannot 
access the file because it is being used by another process.

at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
at 
sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
at 
sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108)
at java.nio.file.Files.deleteIfExists(Files.java:1165)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:333)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:295)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegmentFiles(Log.scala:295)
at kafka.log.Log.loadSegments(Log.scala:404)
at kafka.log.Log.&lt;init&gt;(Log.scala:201)
at kafka.log.Log$.apply(Log.scala:1729)
at 
kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2017-10-17 17:13:24,819] ERROR Error while deleting the clean shutdown file in 
dir C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
C:\tmp\kafka-logs\test-0\.timeindex: The process cannot 
access the file because it is being used by another process.

at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
at 
sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
at 
sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108)
at java.nio.file.Files.deleteIfExists(Files.java:1165)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:333)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:295)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegmentFiles(Log.scala:295)
at kafka.log.Log.loadSegments(Log.scala:404)
at kafka.log.Log.&lt;init&gt;(Log.scala:201)
at kafka.log.Log$.apply(Log.scala:1729)
at 
kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask

[jira] [Created] (KAFKA-6055) Running tools on Windows fail due to misconfigured JVM config

2017-10-11 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-6055:
--

 Summary: Running tools on Windows fail due to misconfigured JVM 
config
 Key: KAFKA-6055
 URL: https://issues.apache.org/jira/browse/KAFKA-6055
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Blocker
 Fix For: 1.0.0


This affects the current trunk and 1.0.0 RC0.

When running any of the Windows commands under {{bin/windows}} the following 
error is returned:

{code}
Missing +/- setting for VM option 'ExplicitGCInvokesConcurrent'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
{code}
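
For context: boolean HotSpot options must carry an explicit +/- prefix, so the 
offending flag in the Windows scripts presumably needs to be written as follows 
(a sketch of the corrected form, not necessarily the exact shipped fix):

{code}
-XX:+ExplicitGCInvokesConcurrent
{code}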



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5639) Enhance DescribeGroups API to return additional group information

2017-07-25 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-5639:
--

 Summary: Enhance DescribeGroups API to return additional group 
information
 Key: KAFKA-5639
 URL: https://issues.apache.org/jira/browse/KAFKA-5639
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor


The 
[{{DescribeGroups}}|https://kafka.apache.org/protocol#The_Messages_DescribeGroups]
 API v1 currently returns this information for each consumer group:
* {{error_code}}
* {{group_id}}
* {{state}}
* {{protocol_type}}
* {{protocol}}
* {{members}}

There is additional info in a {{GroupMetadata}} object on the server side, 
some of which could be useful if exposed via the {{DescribeGroups}} API. Here 
are some examples:
* {{generationId}}
* {{leaderId}}
* {{numOffsets}}
* {{hasOffsets}}

Enhancing the API with this additional info also means the existing tools 
that make use of the API can be improved. For example, using this additional info, the consumer 
group command's {{--describe}} output could provide more information about 
each consumer group to help with its monitoring / troubleshooting / 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5638) Inconsistency in consumer group related ACLs

2017-07-25 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-5638:
--

 Summary: Inconsistency in consumer group related ACLs
 Key: KAFKA-5638
 URL: https://issues.apache.org/jira/browse/KAFKA-5638
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 0.11.0.0
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor


Users can see all groups in the cluster (using consumer group’s {{--list}} 
option) provided that they have {{Describe}} access to the cluster. It would 
make more sense to modify that experience and limit what is listed in the 
output to only those groups they have {{Describe}} access to. The reason is, 
almost everything else is accessible by a user only if the access is 
specifically granted (through ACL {{--add}}); and this scenario should not be 
an exception. The potential change would be updating the minimum required 
permission of {{ListGroup}} from {{Describe (Cluster)}} to {{Describe (Group)}}.

We can also look at this issue from a different angle: A user with {{Read}} 
access to a group can describe the group, but the same user would not see 
anything when listing groups (assuming there is no {{Describe}} access to the 
cluster). It makes more sense for this user to be able to list all groups s/he 
can already describe.
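
As an illustration of the proposed model (principal and group names are made up), 
a group would then only appear in the {{--list}} output for users granted 
something like:

{code}
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:alice --operation Describe --group my-group
{code}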

It would be great to know if any user is relying on the existing behavior 
(listing all consumer groups using a {{Describe (Cluster)}} ACL).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5495) Replace the deprecated 'ConsumerOffsetChecker' in documentation

2017-06-21 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-5495:
--

 Summary: Replace the deprecated 'ConsumerOffsetChecker' in 
documentation
 Key: KAFKA-5495
 URL: https://issues.apache.org/jira/browse/KAFKA-5495
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor
 Fix For: 0.11.1.0


Use {{kafka-consumer-groups.sh}} instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-3969) kafka.admin.ConsumerGroupCommand doesn't show consumer groups

2017-06-20 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-3969.

Resolution: Not A Bug

[~dieter_be] Closing this JIRA as it didn't seem to be a bug. Please re-open if 
you disagree.

> kafka.admin.ConsumerGroupCommand doesn't show consumer groups
> -
>
> Key: KAFKA-3969
> URL: https://issues.apache.org/jira/browse/KAFKA-3969
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Dieter Plaetinck
>
> http://kafka.apache.org/documentation.html , at 
> http://kafka.apache.org/documentation.html#basic_ops_consumer_lag says 
> " Note, however, after 0.9.0, the kafka.tools.ConsumerOffsetChecker tool is 
> deprecated and you should use the kafka.admin.ConsumerGroupCommand (or the 
> bin/kafka-consumer-groups.sh script) to manage consumer groups, including 
> consumers created with the new consumer API."
> I'm sure that I have a consumer running, because I wrote an app that is 
> processing data, and I can see the data as well as the metrics that confirm 
> it's receiving data. I'm using Kafka 0.10.
> Yet when I run the command as instructed, it doesn't list any consumer groups
> $ /opt/kafka_2.11-0.10.0.0/bin/kafka-run-class.sh 
> kafka.admin.ConsumerGroupCommand --zookeeper localhost:2181 --list
> $
> So either something is wrong with the tool, or with the docs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5434) Console consumer hangs if a non-existing partition is specified

2017-06-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050915#comment-16050915
 ] 

Vahid Hashemian commented on KAFKA-5434:


[~ppatierno] I look at this from a different angle. In the scenario you 
described the consumer is just waiting to receive messages in the specified 
partition. The partition may not exist yet, but it's always possible to expand 
the topic and add more partitions. At that point if a producer sends messages 
to that (now existing) partition the consumer would see them. So, in my opinion 
this is not a bug, because the consumer is just waiting (vs. hanging).
I understand this is debatable and perhaps others can weigh in.

> Console consumer hangs if a non-existing partition is specified
> -
>
> Key: KAFKA-5434
> URL: https://issues.apache.org/jira/browse/KAFKA-5434
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>
> Hi,
> if I specify the --partition option for the console consumer with a non-existing 
> partition for a topic, the application hangs indefinitely.
> Debugging the code, I see that it asks for metadata, but when it receives the topic 
> information and doesn't find the requested partition inside such metadata, 
> the code retries again.
> Would it be worth checking whether the partition exists, using the partitionsFor 
> method, before calling assign in the seek of the BaseConsumer, and throwing 
> an exception so an error is printed on the console?
> Thanks,
> Paolo
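> A sketch of the suggested client-side pre-check (illustrative only; {{consumer}}, 
> {{topic}}, and {{partition}} are assumed to be in scope):
> {code:java}
> List<PartitionInfo> infos = consumer.partitionsFor(topic);
> boolean exists = infos != null
>     && infos.stream().anyMatch(p -> p.partition() == partition);
> if (!exists)
>     throw new IllegalArgumentException(
>         "Partition " + partition + " does not exist for topic " + topic);
> {code}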



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4333) Report consumer group coordinator id when '--list' option is used

2017-06-15 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4333:
---
Fix Version/s: 0.11.1.0

> Report consumer group coordinator id when '--list' option is used
> -
>
> Key: KAFKA-4333
> URL: https://issues.apache.org/jira/browse/KAFKA-4333
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> One piece of information missing when extracting information about consumer 
> groups (Java API based) is the coordinator id (broker id of the coordinator). 
> It would be useful to enhance the {{--list}} option of the consumer group 
> command to report the corresponding coordinator id of each consumer group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4416) Add a '--group' option to the console consumer

2017-06-15 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4416:
---
Fix Version/s: 0.11.1.0

> Add a '--group' option to the console consumer
> --
>
> Key: KAFKA-4416
> URL: https://issues.apache.org/jira/browse/KAFKA-4416
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> Add a {{--group}} option to the console consumer to simplify associating 
> consumers to consumer groups. The command line option would overwrite any 
> {{group.id}} property that may be specified in the consumer config.
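> For example (a sketch of the intended usage):
> {code}
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>   --topic test --group my-group
> {code}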



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace

2017-06-15 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-5359:
---
Fix Version/s: 0.11.1.0

> Exceptions from RequestFuture lack parts of the stack trace
> ---
>
> Key: KAFKA-5359
> URL: https://issues.apache.org/jira/browse/KAFKA-5359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Magnus Reftel
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> When an exception occurs within a task that reports its result using a 
> RequestFuture, that exception is stored in a field on the RequestFuture using 
> the {{raise}} method. In many places in the code where such futures are 
> completed, that exception is then thrown directly using {{throw 
> future.exception();}} (see e.g. 
> [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]).
> This means that the exception that ends up in client code only has stack 
> traces related to the original exception, but nothing leading up to the 
> completion of the future. The client therefore gets no indication of what was 
> going on in the client code - only that it somehow ended up in the Kafka 
> libraries, and that a task failed at some point.
> One solution to this is to use the exceptions from the future as causes for 
> chained exceptions, so that the client gets a stack trace that shows what the 
> client was doing, in addition to getting the stack traces for the exception 
> in the task.
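> A sketch of the chaining approach (illustrative; the actual exception types 
> used may differ):
> {code:java}
> RuntimeException cause = future.exception();
> // wrap instead of rethrowing directly, so the caller's stack trace is preserved
> throw new KafkaException("Failed to fetch topic metadata", cause);
> {code}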



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4585) Offset fetch and commit requests use the same permissions

2017-06-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050802#comment-16050802
 ] 

Vahid Hashemian commented on KAFKA-4585:


[~ewencp] I'd appreciate your feedback on the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch]
 opened to address this issue.

> Offset fetch and commit requests use the same permissions
> -
>
> Key: KAFKA-4585
> URL: https://issues.apache.org/jira/browse/KAFKA-4585
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Currently the handling of permissions for consumer groups seems a bit odd 
> because most of the requests use the Read permission on the Group (join, 
> sync, heartbeat, leave, offset commit, and offset fetch). This means you 
> cannot lock down certain functionality for certain users. For this issue I'll 
> highlight a realistic issue since conflating the ability to perform most of 
> these operations may not be a serious issue.
> In particular, if you want tooling for monitoring offsets (i.e. you want to 
> be able to read from all groups) but don't want that tool to be able to write 
> offsets, you currently cannot achieve this. Part of the reason this seems odd 
> to me is that any operation which can mutate state seems like it should be a 
> Write operation (i.e. joining, syncing, leaving, and committing; maybe 
> heartbeat as well). However, [~hachikuji] has mentioned that the use of Read 
> may have been intentional. If that is the case, changing at least offset 
> fetch to be a Describe operation instead would allow isolating the mutating 
> vs non-mutating request types.
> Note that this would require a KIP and would potentially have some 
> compatibility implications. Note however, that if we went with the Describe 
> option, Describe is allowed by default when Read, Write, or Delete are 
> allowed, so this may not have to have any compatibility issues (if the user 
> previously allowed Read, they'd still have all the same capabilities as 
> before).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5430) new consumers getting data for revoked partitions

2017-06-14 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049582#comment-16049582
 ] 

Vahid Hashemian commented on KAFKA-5430:


I could simulate a network failure and get the warning messages below on the 
disconnected consumer after the network recovered:
bq. \[2017-06-14 12:34:39,446\] WARN Auto-commit of offsets 
{test-2=OffsetAndMetadata{offset=6, metadata=''}} failed for group group1: 
Commit cannot be completed since the group has already rebalanced and assigned 
the partitions to another member. This means that the time between subsequent 
calls to poll() was longer than the configured max.poll.interval.ms, which 
typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

However, this goes on for only a fraction of a second and then the group 
rebalances and distributes the partitions among all clients (different from 
what you experienced above). Not sure if there is anything else other than the 
network failure that contributed to the behavior you described.

> new consumers getting data for revoked partitions
> -
>
> Key: KAFKA-5430
> URL: https://issues.apache.org/jira/browse/KAFKA-5430
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.0
>Reporter: Lior Chaga
> Attachments: kafka_trace.log.gz
>
>
> Due to bad configuration applied to network components, we experienced issues 
> with communication between kafka brokers (causing under replication) as well 
> as producers/consumers not being able to work against kafka.
> The symptoms on the consumer were many errors of the following form:
> {code}
> 2017-06-04 04:27:35,200 ERROR [Kafka Topics Cosumer 
> requestlegacy.consumer-11_session_parser_02] TaboolaKafkaConsumer [] - Failed 
> committing to kafka topicPartitions 
> [requestlegacy-2,requestlegacy-0,requestlegacy-1] 
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing offsets.
> Caused by: org.apache.kafka.common.errors.DisconnectException
> {code}
> So far so good. However, upon network recovery, there were several rebalance 
> operations, which eventually resulted in only one consumer (#14) being 
> assigned all topic partitions (in this case we're talking about a 
> consumer group for which all consumers are running in the same process):
> {code}
> 2017-06-04 04:27:02,168 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-14_session_parser_02] ConsumerCoordinator [] - 
> Revoking previously assigned partitions [requestlegacy-8, requestlegacy-9] 
> for group session_parser_02
> 2017-06-04 04:27:04,208 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-15_session_parser_02] ConsumerCoordinator [] - 
> Revoking previously assigned partitions [requestlegacy-10, requestlegacy-11] 
> for group session_parser_02
> 2017-06-04 04:27:18,167 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-12_session_parser_02] ConsumerCoordinator [] - 
> Revoking previously assigned partitions [requestlegacy-3, requestlegacy-4, 
> requestlegacy-5] for group session_parser_02
> 2017-06-04 04:27:20,232 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-11_session_parser_02] ConsumerCoordinator [] - 
> Revoking previously assigned partitions [requestlegacy-2, requestlegacy-0, 
> requestlegacy-1] for group session_parser_02
> 2017-06-04 04:27:20,236 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-15_session_parser_02] ConsumerCoordinator [] - Setting 
> newly assigned partitions [requestlegacy-9, requestlegacy-10, 
> requestlegacy-11] for group session_parser_02
> 2017-06-04 04:27:20,237 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-12_session_parser_02] ConsumerCoordinator [] - Setting 
> newly assigned partitions [requestlegacy-3, requestlegacy-4, requestlegacy-5] 
> for group session_parser_02
> 2017-06-04 04:27:20,237 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-14_session_parser_02] ConsumerCoordinator [] - Setting 
> newly assigned partitions [requestlegacy-6, requestlegacy-7, requestlegacy-8] 
> for group session_parser_02
> 2017-06-04 04:27:20,332 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-11_session_parser_02] ConsumerCoordinator [] - Setting 
> newly assigned partitions [requestlegacy-2, requestlegacy-0, requestlegacy-1] 
> for group session_parser_02
> 2017-06-04 04:28:52,368 INFO  [Kafka Topics Cosumer 
> requestlegacy.consumer-13_session_parser_02] ConsumerCoordinator [] - 
> Revoking previously assigned partitions [requestlega

[jira] [Comment Edited] (KAFKA-5348) kafka-consumer-groups.sh refuses to remove groups without ids

2017-06-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16048486#comment-16048486
 ] 

Vahid Hashemian edited comment on KAFKA-5348 at 6/13/17 10:36 PM:
--

[~bobrik] In the scenario you described I assume some consumer id exists under 
the {{/ids}} path. By design, the consumer group (for old consumers) can be 
deleted only if there is no active consumer in the group. There are active 
consumers in the group iff the path {{/ids}} exists for this group and 
there are consumer ids inside this path. If this is not what you're 
experiencing please advise and perhaps provide steps to reproduce. Thanks.



was (Author: vahid):
[~bobrik] In the scenario you described I assume some consumer id exists under 
the {{/ids}} path. By design, the consumer group (for old consumers) can be 
deleted only if there is no active consumer in the group. There is no active 
consumer group in the group iff the path {{/ids}} exists for this group and 
there are consumer ids inside this path. If this is not what you're 
experiencing please advise and perhaps provide steps to reproduce. Thanks.


> kafka-consumer-groups.sh refuses to remove groups without ids
> -
>
> Key: KAFKA-5348
> URL: https://issues.apache.org/jira/browse/KAFKA-5348
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.0
>Reporter: Ivan Babrou
>Assignee: Vahid Hashemian
>
> In zookeeper I have:
> {noformat}
> [zk: foo(CONNECTED) 37] ls /kafka/logs/consumers/console-consumer-4107
> [offsets]
> {noformat}
> This consumer group also shows up when I list consumer groups:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> foo:2181/kafka/logs --list | fgrep console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> console-consumer-4107
> {noformat}
> But I cannot remove this group:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> 36zk1.in.pdx.cfdata.org:2181/kafka/logs --delete --group console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> Error: Delete for group 'console-consumer-4107' failed because group does not 
> exist.
> {noformat}
> I ran tcpdump and it turns out that /ids path is checked:
> {noformat}
> $.e.P.fP...&..<...//kafka/logs/consumers/console-consumer-4107/ids.
> {noformat}
> I think Kafka should not check for /ids, it should check for / instead here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5348) kafka-consumer-groups.sh refuses to remove groups without ids

2017-06-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16048486#comment-16048486
 ] 

Vahid Hashemian commented on KAFKA-5348:


[~bobrik] In the scenario you described I assume some consumer id exists under 
the {{/ids}} path. By design, the consumer group (for old consumers) can be 
deleted only if there is no active consumer in the group. There is no active 
consumer group in the group iff the path {{/ids}} exists for this group and 
there are consumer ids inside this path. If this is not what you're 
experiencing please advise and perhaps provide steps to reproduce. Thanks.


> kafka-consumer-groups.sh refuses to remove groups without ids
> -
>
> Key: KAFKA-5348
> URL: https://issues.apache.org/jira/browse/KAFKA-5348
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.0
>Reporter: Ivan Babrou
>Assignee: Vahid Hashemian
>
> In zookeeper I have:
> {noformat}
> [zk: foo(CONNECTED) 37] ls /kafka/logs/consumers/console-consumer-4107
> [offsets]
> {noformat}
> This consumer group also shows up when I list consumer groups:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> foo:2181/kafka/logs --list | fgrep console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> console-consumer-4107
> {noformat}
> But I cannot remove this group:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> 36zk1.in.pdx.cfdata.org:2181/kafka/logs --delete --group console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> Error: Delete for group 'console-consumer-4107' failed because group does not 
> exist.
> {noformat}
> I ran tcpdump and it turns out that /ids path is checked:
> {noformat}
> $.e.P.fP...&..<...//kafka/logs/consumers/console-consumer-4107/ids.
> {noformat}
> I think Kafka should not check for /ids, it should check for / instead here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4585) Offset fetch and commit requests use the same permissions

2017-06-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4585:
---
Labels: kip  (was: needs-kip)

> Offset fetch and commit requests use the same permissions
> -
>
> Key: KAFKA-4585
> URL: https://issues.apache.org/jira/browse/KAFKA-4585
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Currently the handling of permissions for consumer groups seems a bit odd 
> because most of the requests use the Read permission on the Group (join, 
> sync, heartbeat, leave, offset commit, and offset fetch). This means you 
> cannot lock down certain functionality for certain users. For this issue I'll 
> highlight a realistic issue since conflating the ability to perform most of 
> these operations may not be a serious issue.
> In particular, if you want tooling for monitoring offsets (i.e. you want to 
> be able to read from all groups) but don't want that tool to be able to write 
> offsets, you currently cannot achieve this. Part of the reason this seems odd 
> to me is that any operation which can mutate state seems like it should be a 
> Write operation (i.e. joining, syncing, leaving, and committing; maybe 
> heartbeat as well). However, [~hachikuji] has mentioned that the use of Read 
> may have been intentional. If that is the case, changing at least offset 
> fetch to be a Describe operation instead would allow isolating the mutating 
> vs non-mutating request types.
> Note that this would require a KIP and would potentially have some 
> compatibility implications. Note, however, that if we went with the Describe 
> option, Describe is allowed by default when Read, Write, or Delete are 
> allowed, so this may not introduce any compatibility issues (if the user 
> previously allowed Read, they'd still have all the same capabilities as 
> before).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Work started] (KAFKA-5370) Replace uses of old consumer with the new consumer

2017-06-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5370 started by Vahid Hashemian.
--
> Replace uses of old consumer with the new consumer 
> ---
>
> Key: KAFKA-5370
> URL: https://issues.apache.org/jira/browse/KAFKA-5370
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
>
> Where possible, use the new consumer in tools and tests instead of the old 
> consumer, and remove the deprecation warning.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5434) Console consumer hangs if not existing partition is specified

2017-06-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16048048#comment-16048048
 ] 

Vahid Hashemian commented on KAFKA-5434:


Yeah, I couldn't assign it to you either. You can take over the JIRA whenever 
you get access.

> Console consumer hangs if not existing partition is specified
> -
>
> Key: KAFKA-5434
> URL: https://issues.apache.org/jira/browse/KAFKA-5434
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Vahid Hashemian
>
> Hi,
> if I specify the --partition option for the console consumer with a 
> non-existent partition of a topic, the application hangs indefinitely.
> Debugging the code, I see that it asks for metadata, but when it receives the 
> topic information and doesn't find the requested partition in that metadata, 
> the code simply retries.
> Would it be worth checking whether the partition exists, using the partitionFor 
> method, before calling assign in the seek of the BaseConsumer, and throwing an 
> exception so that an error is printed on the console?
> Thanks,
> Paolo
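
The check suggested above can be sketched with the public consumer API; {{partitionsFor}} returns the known partitions of a topic, so a caller can validate the requested partition before {{assign}}/{{seek}}. A sketch for illustration (broker address, topic, and partition number are placeholders):

{code}
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PartitionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            String topic = "test";
            int partition = 42; // possibly non-existent
            // Validate against the broker's metadata before assigning.
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            boolean exists = partitions != null
                    && partitions.stream().anyMatch(p -> p.partition() == partition);
            if (!exists)
                throw new IllegalArgumentException(
                        "Partition " + partition + " does not exist for topic " + topic);
            consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
        }
    }
}
{code}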



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5434) Console consumer hangs if not existing partition is specified

2017-06-13 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16048030#comment-16048030
 ] 

Vahid Hashemian commented on KAFKA-5434:


[~ppatierno] Sure. Feel free to assign the JIRA to yourself.

> Console consumer hangs if not existing partition is specified
> -
>
> Key: KAFKA-5434
> URL: https://issues.apache.org/jira/browse/KAFKA-5434
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Vahid Hashemian
>
> Hi,
> if I specify the --partition option for the console consumer with a 
> non-existent partition of a topic, the application hangs indefinitely.
> Debugging the code, I see that it asks for metadata, but when it receives the 
> topic information and doesn't find the requested partition in that metadata, 
> the code simply retries.
> Would it be worth checking whether the partition exists, using the partitionFor 
> method, before calling assign in the seek of the BaseConsumer, and throwing an 
> exception so that an error is printed on the console?
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3129) Console producer issue when request-required-acks=0

2017-06-12 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16047074#comment-16047074
 ] 

Vahid Hashemian commented on KAFKA-3129:


[~pmishra01] I tried this on Ubuntu, Windows 7, and Windows 10 but was not able 
to reproduce it after a few tries.
Please note that the default {{acks}} value has changed from 0 to 1 in 
[this PR|https://github.com/apache/kafka/pull/1795]. So if you'd like to try 
producing with {{acks=0}} you'd have to override the default.
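
Overriding it programmatically is a one-line producer config. A minimal sketch, assuming a local broker and the {{test}} topic from the report:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksZeroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "0"); // override the new default of 1 (fire and forget)
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 1_000_000; i++)
                producer.send(new ProducerRecord<>("test", Integer.toString(i)));
        }
    }
}
{code}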

> Console producer issue when request-required-acks=0
> ---
>
> Key: KAFKA-3129
> URL: https://issues.apache.org/jira/browse/KAFKA-3129
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.0, 0.10.0.0
>Reporter: Vahid Hashemian
>Assignee: Dustin Cote
> Attachments: kafka-3129.mov, server.log.abnormal.txt, 
> server.log.normal.txt
>
>
> I have been running a simple test case in which I have a text file 
> {{messages.txt}} with 1,000,000 lines (lines contain numbers from 1 to 
> 1,000,000 in ascending order). I run the console consumer like this:
> {{$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test}}
> Topic {{test}} is on 1 partition with a replication factor of 1.
> Then I run the console producer like this:
> {{$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < 
> messages.txt}}
> Then the consumer starts receiving the messages. About half the time it 
> goes all the way to 1,000,000; in the other cases, it stops short, usually 
> at 999,735.
> I tried running another console consumer on another machine and both 
> consumers behave the same way. I can't see anything related to this in the 
> logs.
> I also ran the same experiment with a similar file of 10,000 lines, and saw 
> similar behavior. When the consumer does not receive all 10,000 
> messages, it usually stops at 9,864.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5434) Console consumer hangs if not existing partition is specified

2017-06-12 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5434:
--

Assignee: Vahid Hashemian

> Console consumer hangs if not existing partition is specified
> -
>
> Key: KAFKA-5434
> URL: https://issues.apache.org/jira/browse/KAFKA-5434
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Vahid Hashemian
>
> Hi,
> if I specify the --partition option for the console consumer with a 
> non-existent partition of a topic, the application hangs indefinitely.
> Debugging the code, I see that it asks for metadata, but when it receives the 
> topic information and doesn't find the requested partition in that metadata, 
> the code simply retries.
> Would it be worth checking whether the partition exists, using the partitionFor 
> method, before calling assign in the seek of the BaseConsumer, and throwing an 
> exception so that an error is printed on the console?
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5370) Replace uses of old consumer with the new consumer

2017-06-02 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-5370:
--

 Summary: Replace uses of old consumer with the new consumer 
 Key: KAFKA-5370
 URL: https://issues.apache.org/jira/browse/KAFKA-5370
 Project: Kafka
  Issue Type: Improvement
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor


Where possible, use the new consumer in tools and tests instead of the old 
consumer, and remove the deprecation warning.
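
As a reminder of the shape of the replacement, old-consumer usage built around {{kafka.consumer.Consumer.createJavaConsumerConnector(...)}} maps onto the new consumer roughly as below (a sketch; broker address, group id, and topic are placeholders):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // replaces zookeeper.connect
        props.put("group.id", "example-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("%d: %s%n", record.offset(), record.value());
        }
    }
}
{code}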



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace

2017-06-02 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-5359:
---
Status: Patch Available  (was: Open)

> Exceptions from RequestFuture lack parts of the stack trace
> ---
>
> Key: KAFKA-5359
> URL: https://issues.apache.org/jira/browse/KAFKA-5359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Magnus Reftel
>Assignee: Vahid Hashemian
>Priority: Minor
>
> When an exception occurs within a task that reports its result using a 
> RequestFuture, that exception is stored in a field on the RequestFuture using 
> the {{raise}} method. In many places in the code where such futures are 
> completed, that exception is then thrown directly using {{throw 
> future.exception();}} (see e.g. 
> [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]).
> This means that the exception that ends up in client code only has stack 
> traces related to the original exception, but nothing leading up to the 
> completion of the future. The client therefore gets no indication of what was 
> going on in the client code - only that it somehow ended up in the Kafka 
> libraries, and that a task failed at some point.
> One solution to this is to use the exceptions from the future as causes for 
> chained exceptions, so that the client gets a stack trace that shows what the 
> client was doing, in addition to getting the stack traces for the exception 
> in the task.
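
The chaining idea, as a minimal sketch (the {{KafkaException}} wrapper here is just one plausible choice for illustration, not necessarily what an actual fix would use):

{code}
import org.apache.kafka.common.KafkaException;

public class FutureErrors {
    // Instead of rethrowing the stored exception directly:
    //     throw future.exception();
    // wrap it, so the current call site's stack trace is captured and the
    // task's original trace is preserved as the cause.
    static RuntimeException wrap(RuntimeException stored) {
        return new KafkaException("request future failed", stored);
    }
}
{code}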



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace

2017-06-01 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5359:
--

Assignee: Vahid Hashemian

> Exceptions from RequestFuture lack parts of the stack trace
> ---
>
> Key: KAFKA-5359
> URL: https://issues.apache.org/jira/browse/KAFKA-5359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Magnus Reftel
>Assignee: Vahid Hashemian
>Priority: Minor
>
> When an exception occurs within a task that reports its result using a 
> RequestFuture, that exception is stored in a field on the RequestFuture using 
> the {{raise}} method. In many places in the code where such futures are 
> completed, that exception is then thrown directly using {{throw 
> future.exception();}} (see e.g. 
> [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]).
> This means that the exception that ends up in client code only has stack 
> traces related to the original exception, but nothing leading up to the 
> completion of the future. The client therefore gets no indication of what was 
> going on in the client code - only that it somehow ended up in the Kafka 
> libraries, and that a task failed at some point.
> One solution to this is to use the exceptions from the future as causes for 
> chained exceptions, so that the client gets a stack trace that shows what the 
> client was doing, in addition to getting the stack traces for the exception 
> in the task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5348) kafka-consumer-groups.sh refuses to remove groups without ids

2017-05-30 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5348:
--

Assignee: Vahid Hashemian

> kafka-consumer-groups.sh refuses to remove groups without ids
> -
>
> Key: KAFKA-5348
> URL: https://issues.apache.org/jira/browse/KAFKA-5348
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.0
>Reporter: Ivan Babrou
>Assignee: Vahid Hashemian
>
> In zookeeper I have:
> {noformat}
> [zk: foo(CONNECTED) 37] ls /kafka/logs/consumers/console-consumer-4107
> [offsets]
> {noformat}
> This consumer group also shows up when I list consumer groups:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> foo:2181/kafka/logs --list | fgrep console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> console-consumer-4107
> {noformat}
> But I cannot remove this group:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> 36zk1.in.pdx.cfdata.org:2181/kafka/logs --delete --group console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> Error: Delete for group 'console-consumer-4107' failed because group does not 
> exist.
> {noformat}
> I ran tcpdump and it turns out that /ids path is checked:
> {noformat}
> $.e.P.fP...&..<...//kafka/logs/consumers/console-consumer-4107/ids.
> {noformat}
> I think Kafka should not check for /ids; it should check for / instead here.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5336) The required ACL permission for ListGroup is invalid

2017-05-26 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-5336:
--

 Summary: The required ACL permission for ListGroup is invalid
 Key: KAFKA-5336
 URL: https://issues.apache.org/jira/browse/KAFKA-5336
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 0.10.2.1
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor


The {{ListGroup}} API authorizes requests with _Describe_ access to the cluster 
resource:

{code}
  def handleListGroupsRequest(request: RequestChannel.Request) {
    if (!authorize(request.session, Describe, Resource.ClusterResource)) {
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        ListGroupsResponse.fromError(requestThrottleMs,
          Errors.CLUSTER_AUTHORIZATION_FAILED))
    } else {
      ...
{code}

 However, the list of operations (or permissions) allowed for the cluster 
resource does not include _Describe_:
{code}
  val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
    ...
    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs,
      IdempotentWrite, All),
    ...
  )
{code}

Only a user with _All_ cluster permission can successfully call the 
{{ListGroup}} API. No other permission (nor any combination that does not 
include _All_) would let a user use this API.

The bug could be as simple as a typo in the API handler, though it's not 
obvious which permission was actually meant to be used there (perhaps 
_DescribeConfigs_?).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3151) kafka-consumer-groups.sh fail with sasl enabled

2017-05-26 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16026658#comment-16026658
 ] 

Vahid Hashemian commented on KAFKA-3151:


For this to work I had to also add the following line to the properties file 
that [~baluchicken] mentioned:
{code}
sasl.mechanism=PLAIN
{code}
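
For reference, a plausible complete command-config file along these lines (assuming a SASL_PLAINTEXT listener and the PLAIN mechanism; the JAAS login configuration is supplied separately, e.g. via the usual {{java.security.auth.login.config}} system property):

{code}
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
{code}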

> kafka-consumer-groups.sh fail with sasl enabled 
> 
>
> Key: KAFKA-3151
> URL: https://issues.apache.org/jira/browse/KAFKA-3151
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: redhat as6.5
>Reporter: linbao111
>
> ./bin/kafka-consumer-groups.sh --new-consumer  --bootstrap-server 
> slave1.otocyon.com:9092 --list
> Error while executing consumer group command Request METADATA failed on 
> brokers List(Node(-1, slave1.otocyon.com, 9092))
> java.lang.RuntimeException: Request METADATA failed on brokers List(Node(-1, 
> slave1.otocyon.com, 9092))
> at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:73)
> at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:93)
> at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:101)
> at 
> kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:122)
> at 
> kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:126)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:310)
> at 
> kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:61)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> same error for:
> bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --bootstrap-server 
> slave16:9092,app:9092 --describe --group test-consumer-group  --new-consumer
> Error while executing consumer group command Request GROUP_COORDINATOR failed 
> on brokers List(Node(-1, slave16, 9092), Node(-2, app, 9092))
> java.lang.RuntimeException: Request GROUP_COORDINATOR failed on brokers 
> List(Node(-1, slave16, 9092), Node(-2, app, 9092))
> at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:73)
> at kafka.admin.AdminClient.findCoordinator(AdminClient.scala:78)
> at kafka.admin.AdminClient.describeGroup(AdminClient.scala:130)
> at 
> kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:314)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:84)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:302)
> at 
> kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5282) Transactions integration test: Use factory methods to keep track of open producers and consumers and close them all on tearDown

2017-05-25 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-5282:
---
Status: Patch Available  (was: In Progress)

> Transactions integration test: Use factory methods to keep track of open 
> producers and consumers and close them all on tearDown
> ---
>
> Key: KAFKA-5282
> URL: https://issues.apache.org/jira/browse/KAFKA-5282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Vahid Hashemian
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> See: https://github.com/apache/kafka/pull/3093/files#r117354588
> The current transactions integration test creates individual producers and 
> consumers per test, and closes them independently. 
> It would be more robust to create them through a central factory method that 
> keeps track of each instance, and then close those instances on `tearDown`.
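
The factory idea could be sketched as below (class and method names are made up for illustration, not taken from the actual change):

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

// Central factory that remembers every client it hands out, so that
// tearDown() can close them all even when a test fails midway.
public class TrackingClientFactory {
    private final List<AutoCloseable> clients = new ArrayList<>();

    public synchronized <K, V> KafkaProducer<K, V> createProducer(Properties props) {
        KafkaProducer<K, V> producer = new KafkaProducer<>(props);
        clients.add(producer);
        return producer;
    }

    public synchronized <K, V> KafkaConsumer<K, V> createConsumer(Properties props) {
        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props);
        clients.add(consumer);
        return consumer;
    }

    // Called from tearDown(): close everything, ignoring individual failures.
    public synchronized void closeAll() {
        for (AutoCloseable c : clients) {
            try { c.close(); } catch (Exception e) { /* best effort */ }
        }
        clients.clear();
    }
}
{code}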



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4585) Offset fetch and commit requests use the same permissions

2017-05-25 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025569#comment-16025569
 ] 

Vahid Hashemian commented on KAFKA-4585:


[~ewencp] I'll work on a KIP for changing the required permission for offset 
fetch. I am also wondering why the required permission for heartbeat is set to 
Read (vs. Describe), because heartbeat seems non-mutating by itself: during the 
processing of a heartbeat request no change occurs to the group or the 
consumer. Also, cc [~hachikuji]. Thanks.

> Offset fetch and commit requests use the same permissions
> -
>
> Key: KAFKA-4585
> URL: https://issues.apache.org/jira/browse/KAFKA-4585
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Vahid Hashemian
>  Labels: needs-kip
>
> Currently the handling of permissions for consumer groups seems a bit odd 
> because most of the requests use the Read permission on the Group (join, 
> sync, heartbeat, leave, offset commit, and offset fetch). This means you 
> cannot lock down certain functionality for certain users. For this issue I'll 
> highlight a realistic issue since conflating the ability to perform most of 
> these operations may not be a serious issue.
> In particular, if you want tooling for monitoring offsets (i.e. you want to 
> be able to read from all groups) but don't want that tool to be able to write 
> offsets, you currently cannot achieve this. Part of the reason this seems odd 
> to me is that any operation which can mutate state seems like it should be a 
> Write operation (i.e. joining, syncing, leaving, and committing; maybe 
> heartbeat as well). However, [~hachikuji] has mentioned that the use of Read 
> may have been intentional. If that is the case, changing at least offset 
> fetch to be a Describe operation instead would allow isolating the mutating 
> vs non-mutating request types.
> Note that this would require a KIP and would potentially have some 
> compatibility implications. Note, however, that if we went with the Describe 
> option, Describe is allowed by default when Read, Write, or Delete are 
> allowed, so this may not introduce any compatibility issues (if the user 
> previously allowed Read, they'd still have all the same capabilities as 
> before).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4585) Offset fetch and commit requests use the same permissions

2017-05-25 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4585 started by Vahid Hashemian.
--
> Offset fetch and commit requests use the same permissions
> -
>
> Key: KAFKA-4585
> URL: https://issues.apache.org/jira/browse/KAFKA-4585
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Vahid Hashemian
>  Labels: needs-kip
>
> Currently the handling of permissions for consumer groups seems a bit odd 
> because most of the requests use the Read permission on the Group (join, 
> sync, heartbeat, leave, offset commit, and offset fetch). This means you 
> cannot lock down certain functionality for certain users. For this issue I'll 
> highlight a realistic issue since conflating the ability to perform most of 
> these operations may not be a serious issue.
> In particular, if you want tooling for monitoring offsets (i.e. you want to 
> be able to read from all groups) but don't want that tool to be able to write 
> offsets, you currently cannot achieve this. Part of the reason this seems odd 
> to me is that any operation which can mutate state seems like it should be a 
> Write operation (i.e. joining, syncing, leaving, and committing; maybe 
> heartbeat as well). However, [~hachikuji] has mentioned that the use of Read 
> may have been intentional. If that is the case, changing at least offset 
> fetch to be a Describe operation instead would allow isolating the mutating 
> vs non-mutating request types.
> Note that this would require a KIP and would potentially have some 
> compatibility implications. Note, however, that if we went with the Describe 
> option, Describe is allowed by default when Read, Write, or Delete are 
> allowed, so this may not introduce any compatibility issues (if the user 
> previously allowed Read, they'd still have all the same capabilities as 
> before).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5282) Transactions integration test: Use factory methods to keep track of open producers and consumers and close them all on tearDown

2017-05-23 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5282 started by Vahid Hashemian.
--
> Transactions integration test: Use factory methods to keep track of open 
> producers and consumers and close them all on tearDown
> ---
>
> Key: KAFKA-5282
> URL: https://issues.apache.org/jira/browse/KAFKA-5282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Vahid Hashemian
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> See: https://github.com/apache/kafka/pull/3093/files#r117354588
> The current transactions integration test creates individual producers and 
> consumers per test, and closes them independently. 
> It would be more robust to create them through a central factory method that 
> keeps track of each instance, and then close those instances on `tearDown`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5282) Transactions integration test: Use factory methods to keep track of open producers and consumers and close them all on tearDown

2017-05-22 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5282:
--

Assignee: Vahid Hashemian

> Transactions integration test: Use factory methods to keep track of open 
> producers and consumers and close them all on tearDown
> ---
>
> Key: KAFKA-5282
> URL: https://issues.apache.org/jira/browse/KAFKA-5282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Apurva Mehta
>Assignee: Vahid Hashemian
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> See: https://github.com/apache/kafka/pull/3093/files#r117354588
> The current transactions integration test creates individual producers and 
> consumers per test, and closes them independently. 
> It would be more robust to create them through a central factory method that 
> keeps track of each instance, and then close those instances on `tearDown`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5278) kafka-console-consumer: `--value-deserializer` is not working but `--property value.deserializer` does

2017-05-19 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017556#comment-16017556
 ] 

Vahid Hashemian commented on KAFKA-5278:


Looks like the issue reported here is a subset of 
[KAFKA-3982|https://issues.apache.org/jira/browse/KAFKA-3982].
Plus, the correct syntax for providing additional properties directly on the 
command line is {{--consumer-property}}, not {{--property}}.

> kafka-console-consumer: `--value-deserializer` is not working but `--property 
> value.deserializer` does
> --
>
> Key: KAFKA-5278
> URL: https://issues.apache.org/jira/browse/KAFKA-5278
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Yeva Byzek
>Assignee: huxi
>Priority: Minor
>
> kafka-console-consumer: {{--value-deserializer}} is not working but 
> {{--property value.deserializer}} is working
> 1. Does not work
> {noformat}
> $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning 
> --topic TEST1  --value-deserializer 
> org.apache.kafka.common.serialization.LongDeserializer
> [2017-05-18 13:09:41,745] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> java.lang.ClassCastException: java.lang.Long cannot be cast to [B
>   at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:100)
>   at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Processed a total of 0 messages
> {noformat}
> 2. Does work
> {noformat}
> $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning 
> --topic TEST1  --property 
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> 1000
> 2500
> 2000
> 5500
> 8000
> {noformat}
> Without either, the output is
> {noformat}
> $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning 
> --topic TEST1
> ?
>   ?
> ?
> |
> @
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-5278) kafka-console-consumer: `--value-deserializer` is not working but `--property value.deserializer` does

2017-05-19 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017556#comment-16017556
 ] 

Vahid Hashemian edited comment on KAFKA-5278 at 5/19/17 3:38 PM:
-

Looks like the issue reported here is a subset of 
[KAFKA-3982|https://issues.apache.org/jira/browse/KAFKA-3982].
Plus, the correct syntax for providing additional properties directly on the 
command line is {{\-\-consumer-property}}, not {{\-\-property}}.


was (Author: vahid):
Looks like the issue reported here is a subset of 
[KAFKA-3982|https://issues.apache.org/jira/browse/KAFKA-3982].
Plus, the correct syntax for providing additional properties directly on the 
command line is {{--consumer-property}}, not {{--property}}.

> kafka-console-consumer: `--value-deserializer` is not working but `--property 
> value.deserializer` does
> --
>
> Key: KAFKA-5278
> URL: https://issues.apache.org/jira/browse/KAFKA-5278
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Yeva Byzek
>Assignee: huxi
>Priority: Minor
>
> kafka-console-consumer: {{--value-deserializer}} is not working but 
> {{--property value.deserializer}} is working
> 1. Does not work
> {noformat}
> $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning 
> --topic TEST1  --value-deserializer 
> org.apache.kafka.common.serialization.LongDeserializer
> [2017-05-18 13:09:41,745] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> java.lang.ClassCastException: java.lang.Long cannot be cast to [B
>   at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:100)
>   at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Processed a total of 0 messages
> {noformat}
> 2. Does work
> {noformat}
> $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning 
> --topic TEST1  --property 
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> 1000
> 2500
> 2000
> 5500
> 8000
> {noformat}
> Without either, the output is
> {noformat}
> $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning 
> --topic TEST1
> ?
>   ?
> ?
> |
> @
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2017-05-18 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016640#comment-16016640
 ] 

Vahid Hashemian commented on KAFKA-4950:


[~dpostoronca] Still no luck on my side reproducing this error. I ran two 
threads as you suggested. In one thread I run (in a loop) your metric 
collection code above (which leads to calling 
{{PartitionStates.partitionSet()}}); and in the other, I make repeated {{poll}} 
calls (which lead to calling {{PartitionStates.moveToEnd(..)}}). Both threads 
run with the same consumer instance.
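
A sketch of that two-thread setup, for anyone else trying to reproduce (broker address and topic are placeholders; sharing one consumer instance across threads is intentional here, since that is exactly the pattern under test):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Kafka4950Repro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "kafka-4950-repro");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));

        // Thread 1: repeatedly read all metric values (drives partitionSet()).
        Thread metricsThread = new Thread(() -> {
            while (true)
                consumer.metrics().values().forEach(m -> m.value());
        });
        metricsThread.setDaemon(true);
        metricsThread.start();

        // Main thread: repeated polls (drive moveToEnd(..) on the partition states).
        while (true)
            consumer.poll(100);
    }
}
{code}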

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting set is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
> public class KafkaMetricSet implements MetricSet {
>     private final KafkaConsumer<?, ?> client;
>     public KafkaMetricSet(KafkaConsumer<?, ?> client) {
>         this.client = client;
>     }
>     @Override
>     public Map<String, Metric> getMetrics() {
>         final Map<String, Metric> gauges = new HashMap<>();
>         Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
>         for (final Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
>             gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
>                 @Override
>                 public Double getValue() {
>                     return e.getValue().value(); // exception thrown here
>                 }
>             });
>         }
>         return Collections.unmodifiableMap(gauges);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5277) Sticky Assignor should not cache the calculated assignment (KIP-54 follow-up)

2017-05-17 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-5277:
--

 Summary: Sticky Assignor should not cache the calculated 
assignment (KIP-54 follow-up)
 Key: KAFKA-5277
 URL: https://issues.apache.org/jira/browse/KAFKA-5277
 Project: Kafka
  Issue Type: Improvement
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian
Priority: Minor


As a follow-up to KIP-54, remove the Sticky Assignor's dependency on the 
previously calculated assignment. This dependency is not required because each 
consumer participating in the rebalance now notifies the group leader of its 
assignment prior to the rebalance, so the leader can compile the previous 
assignment of the whole group from this information.
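
For context, the assignor under discussion is enabled through the consumer's assignment strategy config; a minimal sketch (broker address and group id are placeholders):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;

public class StickyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sticky-group");
        // Each member reports its current assignment in its subscription
        // metadata, which is what lets the leader reconstruct the previous
        // assignment of the whole group without caching it.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                StickyAssignor.class.getName());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // subscribe/poll as usual; partition assignment will be sticky
        }
    }
}
{code}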



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2017-05-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005658#comment-16005658
 ] 

Vahid Hashemian commented on KAFKA-4950:


[~dpostoronca] Thanks for providing the additional details. I cannot reproduce 
the error when running a simple consumer (that polls and collects the metrics 
in a loop) with the provided {{KafkaMetricSet}} class. I see that both 
{{PartitionStates.partitionSet()}} and the overridden {{getValue()}} methods 
are called but they don't seem to interfere with each other. Perhaps I'm 
missing something?

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting set is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
> public class KafkaMetricSet implements MetricSet {
>     private final KafkaConsumer<?, ?> client;
>     public KafkaMetricSet(KafkaConsumer<?, ?> client) {
>         this.client = client;
>     }
>     @Override
>     public Map<String, Metric> getMetrics() {
>         final Map<String, Metric> gauges = new HashMap<>();
>         Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
>         for (final Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
>             gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
>                 @Override
>                 public Double getValue() {
>                     return e.getValue().value(); // exception thrown here
>                 }
>             });
>         }
>         return Collections.unmodifiableMap(gauges);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-27 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-5016.

Resolution: Not A Bug

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt, KAFKA_5016.java
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue does not occur with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing every heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-26 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985043#comment-15985043
 ] 

Vahid Hashemian commented on KAFKA-5016:


[~domenico74] Thanks for confirming. Can we close the JIRA then?

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt, KAFKA_5016.java
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue does not occur with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing every heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5110) ConsumerGroupCommand error handling improvement

2017-04-24 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15981652#comment-15981652
 ] 

Vahid Hashemian commented on KAFKA-5110:


[~cotedm] Thanks for your response. Yes, hopefully the PR will help better 
pinpoint any potential issues in the future.

> ConsumerGroupCommand error handling improvement
> ---
>
> Key: KAFKA-5110
> URL: https://issues.apache.org/jira/browse/KAFKA-5110
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Dustin Cote
>Assignee: Jason Gustafson
>
> The ConsumerGroupCommand isn't handling partition errors properly. It throws 
> the following:
> {code}
> kafka-consumer-groups.sh --zookeeper 10.10.10.10:2181 --group mygroup 
> --describe
> GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
> Error while executing consumer group command empty.head
> java.lang.UnsupportedOperationException: empty.head
> at scala.collection.immutable.Vector.head(Vector.scala:193)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:197)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:194)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.getLogEndOffset(ConsumerGroupCommand.scala:194)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.kafka$admin$ConsumerGroupCommand$ConsumerGroupService$$describePartition(ConsumerGroupCommand.scala:125)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:107)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:106)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeTopicPartition(ConsumerGroupCommand.scala:106)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeTopicPartition(ConsumerGroupCommand.scala:134)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.kafka$admin$ConsumerGroupCommand$ZkConsumerGroupService$$describeTopic(ConsumerGroupCommand.scala:181)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:166)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describe(ConsumerGroupCommand.scala:134)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4795) Confusion around topic deletion

2017-04-21 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979597#comment-15979597
 ] 

Vahid Hashemian edited comment on KAFKA-4795 at 4/21/17 11:40 PM:
--

[~ecomar] Thanks for your feedback. What I take from your note is that you're 
observing the same behavior as I explained above. I have not been able to 
reproduce a {{\-\-marked for deletion}} with {{\-\-list}} myself, even with a 
background script that keeps listing the topics and outputting them to a file. 
I remember it was very easy to get topics marked for deletion with the 
{{\-\-list}} option.


was (Author: vahid):
[~ecomar] Thanks for your feedback. What I take from your note is that you're 
observing the same behavior as I explained above. I have not been able to 
reproduce a {{--marked for deletion}} with {{--list}} myself, even with a 
background script that keeps listing the topics and outputting them to a file. 
I remember it was very easy to get topics marked for deletion with the 
{{--list}} option.

> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like this in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {{delete.topic.enable=true}} in the config file
> # {{bin/kafka-server-start.sh config/server.properties}} (this does not 
> remove the topic {{test}}, as if the topic was never marked for deletion).
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no 
> topics).
> It seems that the "marked for deletion" state for topics no longer exists.
> I opened this JIRA so I can get confirmation of the expected topic deletion 
> behavior, because in any case I think the user experience could be improved 
> (either there is a bug in the code, or the command's output statement is 
> misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4795) Confusion around topic deletion

2017-04-21 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979597#comment-15979597
 ] 

Vahid Hashemian commented on KAFKA-4795:


[~ecomar] Thanks for your feedback. What I take from your note is that you're 
observing the same behavior as I explained above. I have not been able to 
reproduce a {{--marked for deletion}} with {{--list}} myself, even with a 
background script that keeps listing the topics and outputting them to a file. 
I remember it was very easy to get topics marked for deletion with the 
{{--list}} option.

> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like this in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {{delete.topic.enable=true}} in the config file
> # {{bin/kafka-server-start.sh config/server.properties}} (this does not 
> remove the topic {{test}}, as if the topic was never marked for deletion).
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no 
> topics).
> It seems that the "marked for deletion" state for topics no longer exists.
> I opened this JIRA so I can get confirmation of the expected topic deletion 
> behavior, because in any case I think the user experience could be improved 
> (either there is a bug in the code, or the command's output statement is 
> misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5110) ConsumerGroupCommand error handling improvement

2017-04-21 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979502#comment-15979502
 ] 

Vahid Hashemian commented on KAFKA-5110:


[~cotedm] Could you please share the steps to reproduce the issue? Thanks.

> ConsumerGroupCommand error handling improvement
> ---
>
> Key: KAFKA-5110
> URL: https://issues.apache.org/jira/browse/KAFKA-5110
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Dustin Cote
>Assignee: Jason Gustafson
>
> The ConsumerGroupCommand isn't handling partition errors properly. It throws 
> the following:
> {code}
> kafka-consumer-groups.sh --zookeeper 10.10.10.10:2181 --group mygroup 
> --describe
> GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
> Error while executing consumer group command empty.head
> java.lang.UnsupportedOperationException: empty.head
> at scala.collection.immutable.Vector.head(Vector.scala:193)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:197)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:194)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.getLogEndOffset(ConsumerGroupCommand.scala:194)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.kafka$admin$ConsumerGroupCommand$ConsumerGroupService$$describePartition(ConsumerGroupCommand.scala:125)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:107)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:106)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeTopicPartition(ConsumerGroupCommand.scala:106)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeTopicPartition(ConsumerGroupCommand.scala:134)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.kafka$admin$ConsumerGroupCommand$ZkConsumerGroupService$$describeTopic(ConsumerGroupCommand.scala:181)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:166)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:166)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describe(ConsumerGroupCommand.scala:134)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-21 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979482#comment-15979482
 ] 

Vahid Hashemian commented on KAFKA-5016:


[~domenico74] I think you are seeing the difference in behavior because of 
[KIP-62|https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread],
 which was implemented between the 0.10.0.0 and 0.10.1.0 releases. The config 
{{max.poll.interval.ms}} that you used in your sample code was introduced in 
this KIP, and has no impact when the code runs with an older client. This 
config specifies the maximum amount of time the group waits for a new 
{{poll}} from a consumer; past that, the consumer leaves the group. In older 
versions, {{session.timeout.ms}} specified this timeout, and it defaulted to 
30 seconds. In fact, if you run the code with a 0.10.0.0 client, the second 
consumer's {{read}} takes 30 seconds to finish (time out) and leave the 
group. Setting {{max.poll.interval.ms}} to max int makes the group wait 
forever for subsequent {{poll}}s from consumers (hence the hang). So to me, 
what you are seeing with the configuration you used is the expected behavior. 
Please advise if I'm missing something. Thanks.
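
A minimal sketch of the two configs in play (my own illustration, assuming a 
local broker and a topic named {{test}}; the values are examples, not 
recommendations):
{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollIntervalSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cgroup");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // 0.10.1.0+ (KIP-62): the group waits this long between poll() calls
        // before evicting the consumer. Integer.MAX_VALUE here would make the
        // group effectively wait forever, which is the hang described above.
        props.put("max.poll.interval.ms", "300000");
        // Pre-KIP-62 clients used session.timeout.ms (default 30s) for this.
        props.put("session.timeout.ms", "10000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
{code}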

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt, KAFKA_5016.java
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue can't be found with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing any heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-21 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979030#comment-15979030
 ] 

Vahid Hashemian commented on KAFKA-5016:


[~domenico74] Thanks for confirming. I'll update this thread with my findings.

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt, KAFKA_5016.java
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue can't be found with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing any heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-20 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977499#comment-15977499
 ] 

Vahid Hashemian commented on KAFKA-5016:


[~domenico74] Thanks a lot for sharing the executable code that reproduces the 
issue. I was able to reproduce the hang only when using a 0.10.2.0 broker and 
client. Not sure why it's slightly different from your observations, but I'll 
investigate the behavior change.

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt, KAFKA_5016.java
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue can't be found with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing any heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4291) TopicCommand --describe shows topics marked for deletion as under-replicated and unavailable

2017-04-20 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977214#comment-15977214
 ] 

Vahid Hashemian commented on KAFKA-4291:


[~mimaison] I opened 
[KAFKA-4795|https://issues.apache.org/jira/browse/KAFKA-4795], which is related 
to this JIRA, a while back due to some confusion I had about the expected 
behavior of topic deletion. Could you please take a look and share your opinion?

> TopicCommand --describe shows topics marked for deletion as under-replicated 
> and unavailable
> 
>
> Key: KAFKA-4291
> URL: https://issues.apache.org/jira/browse/KAFKA-4291
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.0.1
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>
> When using kafka-topics.sh --describe with --under-replicated-partitions or 
> --unavailable-partitions, topics marked for deletion are listed.
> While it is debatable whether we want to list such topics this way, the 
> command should at least print that the topic is marked for deletion, as 
> --list does. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-04-20 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977130#comment-15977130
 ] 

Vahid Hashemian commented on KAFKA-4893:


[~soumabrata] Thanks for checking. The solution has not been finalized yet. 
Maybe [~ijuma] can weigh in on [~onurkaraman]'s response regarding fsync.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic deletion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2017-04-20 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977116#comment-15977116
 ] 

Vahid Hashemian commented on KAFKA-4950:


[~dpostoronca] Would you mind sharing a bit more of your client code to avoid 
the compile errors? Thanks.
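
In the meantime, here is a compilable variant of the loop from the issue 
description (quoted below), with the generics that the mail rendering ate 
restored; the {{printMetrics}} wrapper and the single-threaded framing are my 
own assumptions, not the reporter's code:
{code}
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class MetricsSnapshot {
    // Reads each metric value once, on the caller's thread, instead of
    // registering live gauges that sample consumer-internal state later,
    // which is what races with the polling thread in the reported trace.
    static void printMetrics(KafkaConsumer<?, ?> client) {
        Map<MetricName, ? extends Metric> m = client.metrics();
        for (Map.Entry<MetricName, ? extends Metric> e : m.entrySet()) {
            MetricName n = e.getKey();
            System.out.println(n.group() + "." + n.name() + " = "
                + e.getValue().value());
        }
    }
}
{code}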

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting HashMap is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> private final KafkaConsumer<?, ?> client;
> Map<MetricName, ? extends Metric> m = client.metrics();
> for (Map.Entry<MetricName, ? extends Metric> e : m.entrySet()) {
>     gauges.put(name(e.getKey().group(), e.getKey().name(), "count"),
>         new Gauge<Double>() {
>             @Override
>             public Double getValue() {
>                 return e.getValue().value(); // exception thrown here
>             }
>         });
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-12 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966278#comment-15966278
 ] 

Vahid Hashemian commented on KAFKA-5016:


[~domenico74] I have not been able to reproduce this issue with the 0.10.2.0 
code base.
I tried this with the command line producer and consumer, and also coded a 
simple Java-based producer and consumer to run your use case.

Here's what I did:
* Ran a producer that writes 10 messages to a non-existing topic {{test}} (this 
causes the topic {{test}} to be auto-created with a single partition)
* Ran a consumer that belongs to the consumer group {{cgroup}} and consumes 
from the topic {{test}}
* Ran a second consumer similar to the first one.

After this, when I run the consumer group command, I see this:
{code}
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe 
--group cgroup

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST        CLIENT-ID
test   0          10              10              0    consumer-1-1c06c568-7a83-42ea-a2d7-8a53132397bd  /127.0.0.1  consumer-1
-      -          -               -               -    consumer-1-73aaff69-b43f-4be3-8b48-99bf337abc9a  /127.0.0.1  consumer-1
{code}

I also don't see any issues in my server log:
{code}
...
[2017-04-12 10:35:57,976] INFO [Group Metadata Manager on Broker 0]: Finished 
loading offsets from __consumer_offsets-42 in 1 milliseconds. 
(kafka.coordinator.GroupMetadataManager)
[2017-04-12 10:35:57,976] INFO [Group Metadata Manager on Broker 0]: Loading 
offsets and group metadata from __consumer_offsets-45 
(kafka.coordinator.GroupMetadataManager)
[2017-04-12 10:35:57,977] INFO [Group Metadata Manager on Broker 0]: Finished 
loading offsets from __consumer_offsets-45 in 1 milliseconds. 
(kafka.coordinator.GroupMetadataManager)
[2017-04-12 10:35:57,977] INFO [Group Metadata Manager on Broker 0]: Loading 
offsets and group metadata from __consumer_offsets-48 
(kafka.coordinator.GroupMetadataManager)
[2017-04-12 10:35:57,978] INFO [Group Metadata Manager on Broker 0]: Finished 
loading offsets from __consumer_offsets-48 in 1 milliseconds. 
(kafka.coordinator.GroupMetadataManager)
[2017-04-12 10:35:57,984] INFO [GroupCoordinator 0]: Preparing to restabilize 
group cgroup1 with old generation 0 (kafka.coordinator.GroupCoordinator)
[2017-04-12 10:35:57,987] INFO [GroupCoordinator 0]: Stabilized group cgroup1 
generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-12 10:35:57,995] INFO [GroupCoordinator 0]: Assignment received from 
leader for group cgroup1 for generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-12 10:36:10,624] INFO [GroupCoordinator 0]: Preparing to restabilize 
group cgroup1 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-12 10:36:13,116] INFO [GroupCoordinator 0]: Stabilized group cgroup1 
generation 2 (kafka.coordinator.GroupCoordinator)
[2017-04-12 10:36:13,118] INFO [GroupCoordinator 0]: Assignment received from 
leader for group cgroup1 for generation 2 (kafka.coordinator.GroupCoordinator)
{code}

Would you be able to share the code you are using that leads to the issue? 
Thanks.
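
For reference, a minimal sketch of the steps above (my own illustration 
against a local 0.10.2.0 broker with topic auto-creation enabled; this is not 
the attached KAFKA_5016.java):
{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Kafka5016Sketch {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // Writing to a non-existing topic auto-creates it (single partition).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<>("test", "msg-" + i));
        }

        // Run this consumer block twice to get two members of group "cgroup".
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "cgroup");
        c.put("auto.offset.reset", "earliest");
        c.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("test"));
            System.out.println("fetched " + consumer.poll(5000).count() + " records");
            consumer.commitSync();
        }
    }
}
{code}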

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue can't be found with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing any heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2017-04-10 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4950:
--

Assignee: Vahid Hashemian

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting HashMap is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> private final KafkaConsumer<?, ?> client;
> Map<MetricName, ? extends Metric> m = client.metrics();
> for (Map.Entry<MetricName, ? extends Metric> e : m.entrySet()) {
>     gauges.put(name(e.getKey().group(), e.getKey().name(), "count"),
>         new Gauge<Double>() {
>             @Override
>             public Double getValue() {
>                 return e.getValue().value(); // exception thrown here
>             }
>         });
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2017-04-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963308#comment-15963308
 ] 

Vahid Hashemian commented on KAFKA-4950:


[~ijuma] Yup, thanks!

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting HashMap is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> private final KafkaConsumer<?, ?> client;
> Map<MetricName, ? extends Metric> m = client.metrics();
> for (Map.Entry<MetricName, ? extends Metric> e : m.entrySet()) {
>     gauges.put(name(e.getKey().group(), e.getKey().name(), "count"),
>         new Gauge<Double>() {
>             @Override
>             public Double getValue() {
>                 return e.getValue().value(); // exception thrown here
>             }
>         });
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-10 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5016:
--

Assignee: Vahid Hashemian

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
>Assignee: Vahid Hashemian
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue can't be found with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing any heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5016) Consumer hang in poll method while rebalancing is in progress

2017-04-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963299#comment-15963299
 ] 

Vahid Hashemian commented on KAFKA-5016:


[~ijuma] Sure, I'll take a look.
[~domenico74] Thanks for providing the additional details.

> Consumer hang in poll method while rebalancing is in progress
> -
>
> Key: KAFKA-5016
> URL: https://issues.apache.org/jira/browse/KAFKA-5016
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Domenico Di Giulio
> Attachments: Kafka 0.10.2.0 Issue (TRACE) - Server + Client.txt, 
> Kafka 0.10.2.0 Issue (TRACE).txt
>
>
> After moving to Kafka 0.10.2.0, it looks like I'm experiencing a hang in the 
> rebalancing code. 
> This is a test case, not (yet) production code. It does the following with 
> a single-partition topic and two consumers in the same group:
> 1) a topic with one partition is forced to be created (auto-created)
> 2) a producer is used to write 10 messages
> 3) the first consumer reads all the messages and commits
> 4) the second consumer attempts a poll() and hangs indefinitely
> The same issue can't be found with 0.10.0.0.
> See the attached logs at TRACE level. Look for "SERVER HANGS" to see where 
> the hang is found: when this happens, the client keeps failing any heartbeat 
> attempt, as the rebalancing is in progress, and the poll method hangs 
> indefinitely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-16 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929196#comment-15929196
 ] 

Vahid Hashemian commented on KAFKA-4858:


[~wushujames] Thanks for bringing those use cases up. I believe they also need 
to be addressed. Will work on those too.

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.<init>(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic does not get created on disk, but the broker thinks the topic is 
> ready. The broker seems functional for other topics. I can produce/consume 
> to other topics.
> {code}
> $ ./bin/kafka-topics.sh --zookeeper 127.0

[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927303#comment-15927303
 ] 

Vahid Hashemian commented on KAFKA-4893:


[~onurkaraman] Sure, I agree that your suggestion is the long-term fix for the 
issue. I just wanted to mention and document the alternative here :)

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic deletion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927033#comment-15927033
 ] 

Vahid Hashemian edited comment on KAFKA-4893 at 3/15/17 11:55 PM:
--

[~onurkaraman] What you suggested should work. Of course, an easier solution 
would be to decrease the max topic length (again) to 249 - 1 - 32 - 7 = 209.
{code}
.append(".") // len = 1
.append(java.util.UUID.randomUUID.toString.replaceAll("-","")) // len = 32
.append(Log.DeleteDirSuffix) // len = 7
{code}
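
As a throwaway sanity check of the arithmetic (the constant lengths are taken 
from the snippet quoted below; the class itself is my own illustration):
{code}
public class TopicNameBudget {
    public static void main(String[] args) {
        int currentMax = 249;             // kafka.common.Topic.maxNameLength
        int dot = 1;                      // "."
        int uuid = 32;                    // UUID with dashes stripped
        int suffix = "-delete".length();  // Log.DeleteDirSuffix, 7 chars
        System.out.println(currentMax - dot - uuid - suffix); // prints 209
    }
}
{code}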


was (Author: vahid):
[~onurkaraman] What you suggested should work. Of course, an easier solution 
would be to decrease the max topic length (again) to 249 - 1 - 32 - 7 = 219.
{code}
.append(".") // len = 1
.append(java.util.UUID.randomUUID.toString.replaceAll("-","")) // len = 32
.append(Log.DeleteDirSuffix) // len = 7
{code}

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic deletion as incomplete (the 

[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927033#comment-15927033
 ] 

Vahid Hashemian commented on KAFKA-4893:


[~onurkaraman] What you suggested should work. Of course, an easier solution 
would be to decrease the max topic length (again) to 249 - 1 - 32 - 7 = 219.
{code}
.append(".") // len = 1
.append(java.util.UUID.randomUUID.toString.replaceAll("-","")) // len = 32
.append(Log.DeleteDirSuffix) // len = 7
{code}

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic deletion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4893:
--

Assignee: Vahid Hashemian

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905927#comment-15905927
 ] 

Vahid Hashemian commented on KAFKA-4858:


[~jeffwidman] Well, assuming we are using an old client script, we are going to 
have the topic created in ZK (because the ZK path creation happens before the 
broker is triggered). So the PR improves how the broker deals with these ZK 
topic nodes with very long names. Suggestions are welcome on how to improve the 
user experience in such a situation. Thanks.

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.<init>(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java

[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-09 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904026#comment-15904026
 ] 

Vahid Hashemian commented on KAFKA-4858:


[~wushujames] [~jeffwidman] From what I can tell, it all starts inside the 
controller package, where the listener for ZK changes is 
([here|https://github.com/apache/kafka/blob/a786be94788816bbce32a6cd6ffcf8949ed95556/core/src/main/scala/kafka/controller/PartitionStateMachine.scala#L407]).
 I agree that the broker should be able to handle situations like this more 
gracefully. I'll try to find the right spot for adding the check and logging a 
warning message. Thank you both for your feedback.

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.<init>(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaA

[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-09 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903897#comment-15903897
 ] 

Vahid Hashemian commented on KAFKA-4858:


[~wushujames], so the issue is actually the same as what was reported in 
KAFKA-3219; the stack trace seems to be the same. The reason is that the fix 
for KAFKA-3219 was on the client side, so it's not surprising to see the issue 
come back if you use a client that predates that fix.

In the particular case of this JIRA, since the topic length is not checked, 
the topic is created in ZK. Up to this point the broker is not involved 
(AdminClient talks to ZK). This ZK addition triggers the broker's ZK listener, 
and then the process of initializing the leader and ISR for topic partitions 
starts. However, during the process, the log folder for topic partitions 
cannot be created (since the name length is over 255 chars) and the exception 
that you reported is thrown. We added the check in the other JIRA to block the 
whole thing up-front on the client side.

Without the patch for KAFKA-3219 the topic and partitions will be created in 
ZK, and therefore the broker's listener will be invoked. I'm still thinking 
about what a proper fix for the issue would be (and whether we need to fix 
it). Feedback is welcome!
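
For context, the client-side guard added for KAFKA-3219 amounts to an 
up-front length check along these lines (a sketch only; the real validation 
lives in {{kafka.common.Topic}}, not in this hypothetical Java helper):
{code}
public class TopicNameCheck {
    // 0.10.0.0+ limit: 249 chars, leaving room in a 255-char folder name
    // for a dash plus a partition id of up to 5 digits.
    static final int MAX_NAME_LENGTH = 249;

    static void validate(String topic) {
        if (topic.length() > MAX_NAME_LENGTH)
            throw new IllegalArgumentException("topic name is "
                + topic.length() + " characters, max is " + MAX_NAME_LENGTH);
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 255; i++) sb.append('d');
        validate(sb.toString()); // throws: 255 > 249
    }
}
{code}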

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255-character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.<init>(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

[jira] [Assigned] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-07 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4858:
--

Assignee: Vahid Hashemian

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255-character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.<init>(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic does not get created on disk, but the broker thinks the topic is 
> ready. The broker seems functional for other topics; I can produce/consume 
> to other topics.
> {code}
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1 --describe
> Topic:dd

[jira] [Commented] (KAFKA-4795) Confusion around topic deletion

2017-03-07 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900271#comment-15900271
 ] 

Vahid Hashemian commented on KAFKA-4795:


[~ijuma] [~hachikuji] I'd appreciate your clarification on this JIRA.

> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like this in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {{delete.topic.enable=true}} in the config file
> # {{bin/kafka-server-start.sh config/server.properties}} (this does not 
> remove the topic {{test}}, as if the topic was never marked for deletion).
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no 
> topics).
> It seems that the "marked for deletion" state for topics no longer exists.
> I opened this JIRA so I can get a confirmation on the expected topic deletion 
> behavior, because in any case, I think the user experience could be improved 
> (either there is a bug in the code, or the command's output statement is 
> misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-4845.

Resolution: Duplicate

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets 
> the latest offset, while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15899634#comment-15899634
 ] 

Vahid Hashemian commented on KAFKA-4845:


Great, then I'll mark this one as a duplicate of KAFKA-4547. Thanks for 
confirming.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets 
> the latest offset, while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898258#comment-15898258
 ] 

Vahid Hashemian edited comment on KAFKA-4845 at 3/6/17 10:20 PM:
-

[~DanC], The issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], that was fixed 
in 0.10.2.0.
Also, the second code snippet and the comments you added there apply to before 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
 The function {{updateFetchPositions(Set<TopicPartition> partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I misunderstood the defect or am missing something. Thanks.


was (Author: vahid):
[~DanC], The issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], that was fixed 
in 0.10.2.0.
Also, the second code snippet and the comments you added there apply to before 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
 The function {{updateFetchPositions(Set<TopicPartition> partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I'm misunderstood the defect or am missing something. Thanks.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets 
> the latest offset, while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898258#comment-15898258
 ] 

Vahid Hashemian commented on KAFKA-4845:


[~DanC], The issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], that was fixed 
in 0.10.2.0.
Also, the second code snippet and the comments you added there apply to before 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
 The function {{updateFetchPositions(Set<TopicPartition> partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I'm misunderstood the defect or am missing something. Thanks.
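
For anyone hitting this on an affected version (0.10.1.x), a sketch of a 
workaround using only public consumer API: {{endOffsets}} (available since 
0.10.1.0) returns the log-end offsets directly and sidesteps the 
committed-offset fallback inside {{position()}}:

{code}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class LatestOffsets {
    // Returns log-end offsets for the current assignment without calling
    // position(), which on 0.10.1.x can fall back to committed offsets.
    public static Map<TopicPartition, Long> latest(KafkaConsumer<?, ?> consumer) {
        Set<TopicPartition> parts = consumer.assignment();
        return consumer.endOffsets(parts);
    }
}
{code}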

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets 
> the latest offset, while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4845:
--

Assignee: Vahid Hashemian

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets 
> the latest offset, while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897780#comment-15897780
 ] 

Vahid Hashemian commented on KAFKA-4845:


[~ijuma], sure, I'll take a look.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets 
> the latest offset, while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4795) Confusion around topic deletion

2017-03-03 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4795:
---
Description: 
The topic deletion works like this in 0.10.2.0:
# {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
# {{bin/kafka-server-start.sh config/server.properties}} (uses default 
{{server.properties}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication-factor 1 --partitions 1}} (creates the topic {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})

Previously, the last command above returned {{test - marked for deletion}}, 
which matched the output statement of the {{--delete}} topic command.

Continuing with the above scenario,
# stop the broker
# add the broker config {{delete.topic.enable=true}} in the config file
# {{bin/kafka-server-start.sh config/server.properties}} (this does not remove 
the topic {{test}}, as if the topic was never marked for deletion).
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no topics).

It seems that the "marked for deletion" state for topics no longer exists.

I opened this JIRA so I can get a confirmation on the expected topic deletion 
behavior, because in any case, I think the user experience could be improved 
(either there is a bug in the code, or the command's output statement is 
misleading).

  was:
The topic deletion works like in 0.10.2.0:
# {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
# {{bin/kafka-server-start.sh config/server.properties}} (uses default 
{{server.properties}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication-factor 1 --partitions 1}} (creates the topic {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})

Previously, the last command above returned {{test - marked for deletion}}, 
which matched the output statement of the {{--delete}} topic command.

Continuing with the above scenario,
# stop the broker
# add the broker config {{delete.topic.enable=true}} in the config file
# {{bin/kafka-server-start.sh config/server.properties}} (this does not remove 
the topic {{test}}, as if the topic was never marked for deletion).
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no topics).

It seems that the "marked for deletion" state for topics no longer exists.

I opened this JIRA so I can get a confirmation on the expected topic deletion 
behavior, because in any case, I think the user experience could be improved 
(either there is a bug in the code, or the command's output statement is 
misleading).


> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like this in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {

[jira] [Commented] (KAFKA-4795) Confusion around topic deletion

2017-03-03 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895220#comment-15895220
 ] 

Vahid Hashemian commented on KAFKA-4795:


[~omkreddy] Thanks for your comment. I had looked at the JIRA you mentioned 
when I opened this ticket. I'm still not clear on the expected behavior, 
because the inconsistency I explained above still exists. As I mentioned, it 
is as if there is no intermediary "marked for deletion" step anymore: either 
{{\-\-delete}} has no effect at all, or it immediately removes the topic. Even 
if that is the intended behavior, the output statements from {{\-\-delete}} 
are misleading.
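
As a side note for anyone verifying this behavior: the "marked for deletion" 
state, when present, is a child znode under {{/admin/delete_topics}} in 
ZooKeeper, so it can be inspected directly. A minimal sketch with the plain 
ZooKeeper Java client (connection string and timeout are assumptions for a 
local setup):

{code}
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class DeletionMarkerCheck {
    public static void main(String[] args) throws Exception {
        // Connect to the same ensemble the broker uses.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, null);
        try {
            // Topics pending deletion appear as children of this path
            // (throws KeeperException.NoNodeException if the path is absent).
            List<String> pending = zk.getChildren("/admin/delete_topics", false);
            System.out.println("Topics marked for deletion: " + pending);
        } finally {
            zk.close();
        }
    }
}
{code}

If a topic never shows up there (or disappears immediately), that matches the 
"no intermediary state" behavior described above.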

> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {{delete.topic.enable=true}} in the config file
> # {{bin/kafka-server-start.sh config/server.properties}} (this does not 
> remove the topic {{test}}, as if the topic was never marked for deletion).
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no 
> topics).
> It seems that the "marked for deletion" state for topics no longer exists.
> I opened this JIRA so I can get a confirmation on the expected topic deletion 
> behavior, because in any case, I think the user experience could be improved 
> (either there is a bug in the code, or the command's output statement is 
> misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4095) When a topic is deleted and then created with the same name, 'committed' offsets are not reset

2017-02-28 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889127#comment-15889127
 ] 

Vahid Hashemian commented on KAFKA-4095:


[~jeffwidman] This JIRA applies to the old (ZooKeeper-based) consumer, as you 
mentioned. But the old consumer is not deprecated yet; it is expected to 
officially become deprecated in the next release 
([KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation]).
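
For completeness, the reporter's observation (made with the Java client) can 
be checked roughly as follows. This is a sketch, assuming a 0.10.1+ client for 
the Collection-based {{seekToEnd}}; broker address, topic, and group id are 
placeholders:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedAfterRecreate {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group"); // same group id as before the topic was recreated
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // the recreated topic
            consumer.assign(Collections.singletonList(tp));
            OffsetAndMetadata committed = consumer.committed(tp); // may still be the pre-deletion offset
            consumer.seekToEnd(Collections.singletonList(tp));
            long latest = consumer.position(tp); // log-end offset of the new topic
            System.out.printf("committed=%s latest=%d%n", committed, latest);
        }
    }
}
{code}

On affected setups {{committed}} can exceed {{latest}}, which is exactly the 
stale state described above.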

> When a topic is deleted and then created with the same name, 'committed' 
> offsets are not reset
> --
>
> Key: KAFKA-4095
> URL: https://issues.apache.org/jira/browse/KAFKA-4095
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Alex Glikson
>Assignee: Vahid Hashemian
>
> I encountered a very strange behavior of Kafka, which seems to be a bug.
> After deleting a topic and re-creating it with the same name, I produced 
> certain amount of new messages, and then opened a consumer with the same ID 
> that I used before re-creating the topic (with auto.commit=false, 
> auto.offset.reset=earliest). While the latest offsets seemed up to date, the 
> *committed* offset (returned by committed() method) was an *old* offset, from 
> the time before the topic has been deleted and created.
> I would have assumed that when a topic is deleted, all the associated 
> topic-partitions and consumer groups are recycled too.
> I am using the Java client version 0.9, with Kafka server 0.10.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4795) Confusion around topic deletion

2017-02-23 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-4795:
--

 Summary: Confusion around topic deletion
 Key: KAFKA-4795
 URL: https://issues.apache.org/jira/browse/KAFKA-4795
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian


The topic deletion works like in 0.10.2.0:
# {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
# {{bin/kafka-server-start.sh config/server.properties}} (uses default 
{{server.properties}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication-factor 1 --partitions 1}} (creates the topic {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})

Previously, the last command above returned {{test - marked for deletion}}, 
which matched the output statement of the {{--delete}} topic command.

Continuing with the above scenario,
# stop the broker
# add the broker config {{delete.topic.enable=true}} in the config file
# {{bin/kafka-server-start.sh config/server.properties}} (this does not remove 
the topic {{test}}, as if the topic was never marked for deletion).
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no topics).

It seems that the "marked for deletion" state for topics no longer exists.

I opened this JIRA so I can get a confirmation on the expected topic deletion 
behavior, because in any case, I think the user experience could be improved 
(either there is a bug in the code, or the command's output statement is 
misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) KIP-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Summary: KIP-54: Add rebalance with a minimal number of reassignments to 
server-defined strategy list  (was: Kip-54: Add rebalance with a minimal number 
of reassignments to server-defined strategy list)

> KIP-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.
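
Once this lands, opting in should be a one-line consumer configuration; a 
hedged sketch (the assignor class name is an assumption until the KIP-54 
implementation ships):

{code}
import java.util.Properties;

public class StickyAssignorConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // Select the minimal-movement ("sticky") strategy, assuming it is
        // exposed under this class name once KIP-54 is implemented.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }
}
{code}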



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) Kip-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Summary: Kip-54: Add rebalance with a minimal number of reassignments to 
server-defined strategy list  (was: Add rebalance with a minimal number of 
reassignments to server-defined strategy list)

> Kip-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Status: Patch Available  (was: In Progress)

> Add rebalance with a minimal number of reassignments to server-defined 
> strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Labels: kip  (was: kip newbie++ newbiee)

> Add rebalance with a minimal number of reassignments to server-defined 
> strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2017-02-14 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4665:
---
Status: Patch Available  (was: In Progress)

> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
> Fix For: 0.10.3.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3264) Mark the old Scala consumer and related classes as deprecated

2017-02-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3264:
---
Status: Patch Available  (was: Open)

> Mark the old Scala consumer and related classes as deprecated
> -
>
> Key: KAFKA-3264
> URL: https://issues.apache.org/jira/browse/KAFKA-3264
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Vahid Hashemian
> Fix For: 0.10.3.0
>
>
> Once the new consumer is out of beta, we should consider deprecating the old 
> Scala consumers to encourage use of the new consumer and facilitate the 
> removal of the old consumers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2017-02-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2857:
---
Status: Patch Available  (was: Open)

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2017-02-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855052#comment-15855052
 ] 

Vahid Hashemian commented on KAFKA-2857:


Sorry, you are right. This would become a config option for the consumer group 
command (for new consumers only).

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2017-02-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855001#comment-15855001
 ] 

Vahid Hashemian commented on KAFKA-2857:


[~hachikuji] Great. I assume you are suggesting adding a consumer config for 
this timeout. If so, do you think a KIP is required?

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2017-02-01 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15848972#comment-15848972
 ] 

Vahid Hashemian commented on KAFKA-2857:


[~hachikuji] I am thinking about the two options you suggested earlier 
([here|https://issues.apache.org/jira/browse/KAFKA-2857?focusedCommentId=15326653&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15326653])
 and am wondering if retrying in the "coordinator not available" case would 
provide a better user experience. IMHO, the user's goal is to get the group 
details, and they would probably rather not have to issue the command twice to 
get them.
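
A minimal sketch of that retry idea ({{describeGroup}} below stands in for 
whatever the command invokes; attempt count and backoff are illustrative):

{code}
import java.util.function.Supplier;
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;

public final class CoordinatorRetry {
    // Retries the supplied call while the coordinator is unavailable,
    // sleeping between attempts; rethrows once attempts are exhausted.
    public static <T> T call(Supplier<T> describeGroup, int maxAttempts, long backoffMs)
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                return describeGroup.get();
            } catch (GroupCoordinatorNotAvailableException e) {
                if (attempt >= maxAttempts)
                    throw e;
                Thread.sleep(backoffMs);
            }
        }
    }
}
{code}

The command itself is Scala, so this is only the shape of the logic, not a 
drop-in patch.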

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2017-01-26 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15840346#comment-15840346
 ] 

Vahid Hashemian commented on KAFKA-4665:


{quote}
Note also that currently the consumer raises {{KafkaException}} when it 
encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
which is inconsistent with how we usually handle this error. This probably 
doesn't cause any problems currently only because of the inconsistency 
mentioned in the first paragraph above.
{quote}

[~hachikuji] Do you mean we should avoid raising exceptions for partition-level 
errors? 
([here|https://github.com/apache/kafka/blob/254e3b77d656a610f19efd1124802e073dfda4b8/core/src/main/scala/kafka/admin/AdminClient.scala#L130])
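
For illustration, the alternative under discussion would handle errors per 
partition instead of failing the whole response; a sketch with simplified 
types (the map-of-errors shape is an assumption, not the actual AdminClient 
code):

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;

public final class OffsetFetchHandling {
    // Keeps offsets for clean partitions and treats UNKNOWN_TOPIC_OR_PARTITION
    // as "no committed offset" rather than fatal; anything else propagates.
    public static Map<TopicPartition, Long> extract(Map<TopicPartition, Long> offsets,
                                                    Map<TopicPartition, Errors> errors) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            Errors error = errors.getOrDefault(entry.getKey(), Errors.NONE);
            if (error == Errors.NONE)
                result.put(entry.getKey(), entry.getValue());
            else if (error != Errors.UNKNOWN_TOPIC_OR_PARTITION)
                throw error.exception(); // genuinely fatal errors still propagate
            // UNKNOWN_TOPIC_OR_PARTITION: skip silently, no committed offset
        }
        return result;
    }
}
{code}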

> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
> Fix For: 0.10.3.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

