[jira] [Commented] (KAFKA-6375) Follower replicas can never catch up to be ISR due to creating ReplicaFetcherThread failed.

2017-12-15 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293576#comment-16293576
 ] 

huxihx commented on KAFKA-6375:
---

"Unable to establish loopback connection": did you disable the firewall?
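
As background, the Pipe.open() and Selector.open() calls in the stack trace below need a working loopback (127.0.0.1) connection on Windows. A minimal standalone probe like the following (an illustration only, not part of Kafka) can confirm whether a firewall or security product is blocking loopback on the affected host:

{code:java}
import java.nio.channels.Pipe;
import java.nio.channels.Selector;

// Minimal probe for the "Unable to establish loopback connection" failure.
// On Windows, both Selector.open() and Pipe.open() internally connect a
// SocketChannel to a loopback address; if loopback is blocked, they fail
// with the same IOException seen in the broker log below.
public class LoopbackProbe {
    public static void main(String[] args) throws Exception {
        try (Selector selector = Selector.open()) {
            System.out.println("Selector.open() OK");
        }
        Pipe pipe = Pipe.open();
        pipe.sink().close();
        pipe.source().close();
        System.out.println("Pipe.open() OK - loopback connections work");
    }
}
{code}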

> Follower replicas can never catch up to be ISR due to creating 
> ReplicaFetcherThread failed.
> ---
>
> Key: KAFKA-6375
> URL: https://issues.apache.org/jira/browse/KAFKA-6375
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: Windows,  23 brokers KafkaCluster
>Reporter: Rong Tang
>
> Hi, I ran into a case where, on one broker, the out-of-sync replicas never catch up.
> When the broker starts up, it receives LeaderAndIsr requests from the controller, which call createFetcherThread; the thread creation failed with the exceptions below.
> After that, there is no fetcher for these follower replicas, and they stay out of sync forever, unless the broker later receives LeaderAndIsr requests with a higher leader epoch. The broker had 260 out of 330 replicas out of sync for one day, until I restarted it.
> Restarting the broker mitigates the issue.
> I have two questions.
> First, why did creating the new ReplicaFetcherThread fail?
> *Second, should Kafka do something to fail over, instead of leaving the broker in an abnormal state?*
> It is a 23-broker Kafka cluster running on Windows; each broker has 330 replicas.
> [2017-12-13 16:29:21,317] ERROR Error on broker 1000 while processing LeaderAndIsr request with correlationId 1 received from controller 427703487 epoch 22 (state.change.logger)
> org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
>   at org.apache.kafka.common.network.Selector.<init>(Selector.java:124)
>   at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:87)
>   at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:35)
>   at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
>   at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>   at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
>   at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:869)
>   at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:689)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:149)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:83)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to establish loopback connection
>   at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94)
>   at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171)
>   at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
>   at java.nio.channels.Pipe.open(Pipe.java:155)
>   at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
>   at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
>   at java.nio.channels.Selector.open(Selector.java:227)
>   at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
>   ... 16 more
> Caused by: java.net.ConnectException: Connection timed out: connect
>   at sun.nio.ch.Net.connect0(Native Method)
>   at sun.nio.ch.Net.connect(Net.java:454)
>   at sun.nio.ch.Net.connect(Net.java:446)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
>   at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
>   at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
>   at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76)
>   ... 25 more
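
On the second question (failing over instead of staying in an abnormal state), one conceivable mitigation is to retry the fetcher-thread creation with backoff rather than giving up after the first exception. The sketch below is purely illustrative: the helper name and the way it would be wired into AbstractFetcherManager.addFetcherForPartitions are assumptions, not current Kafka behavior.

{code:java}
import java.util.concurrent.Callable;

// Hypothetical retry helper illustrating the "fail over instead of staying in an
// abnormal state" idea: keep retrying fetcher-thread creation with backoff rather
// than leaving the follower partitions without a fetcher.
final class RetryUtil {
    static <T> T retryWithBackoff(Callable<T> action, int maxAttempts, long initialBackoffMs)
            throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {            // e.g. "Unable to establish loopback connection"
                if (attempt >= maxAttempts) {
                    throw e;                   // surface the failure so the broker can react
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 30_000L);
            }
        }
    }
}

// Illustrative usage: RetryUtil.retryWithBackoff(() -> createFetcherThread(id, broker), 10, 500L);
{code}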



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6375) Follower replicas can never catch up to be ISR due to creating ReplicaFetcherThread failed.

2017-12-15 Thread Rong Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Tang updated KAFKA-6375:
-
Description: 
Hi, I ran into a case where, on one broker, the out-of-sync replicas never catch up.
When the broker starts up, it receives LeaderAndIsr requests from the controller, which call createFetcherThread; the thread creation failed with the exceptions below.

After that, there is no fetcher for these follower replicas, and they stay out of sync forever, unless the broker later receives LeaderAndIsr requests with a higher leader epoch. The broker had 260 out of 330 replicas out of sync for one day, until I restarted it.

Restarting the broker mitigates the issue.

I have two questions.
First, why did creating the new ReplicaFetcherThread fail?
*Second, should Kafka do something to fail over, instead of leaving the broker in an abnormal state?*

It is a 23-broker Kafka cluster running on Windows; each broker has 330 replicas.

[2017-12-13 16:29:21,317] ERROR Error on broker 1000 while processing LeaderAndIsr request with correlationId 1 received from controller 427703487 epoch 22 (state.change.logger)
org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:124)
    at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:87)
    at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:35)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
    at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:869)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:689)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:149)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:83)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to establish loopback connection
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94)
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171)
    at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
    at java.nio.channels.Pipe.open(Pipe.java:155)
    at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
    at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
    at java.nio.channels.Selector.open(Selector.java:227)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
    ... 16 more
Caused by: java.net.ConnectException: Connection timed out: connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
    at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76)
    ... 25 more



[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2017-12-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293473#comment-16293473
 ] 

Jun Rao commented on KAFKA-3410:


[~wushujames], yes, KAFKA-1211 should help address this. Do you want to try your setup again now that KAFKA-1211 is fixed?

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?
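
For reference, the setting mentioned above can also be applied per topic rather than broker-wide. The snippet below is only an illustration using the AdminClient alterConfigs API; the topic name "test" and the broker address are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Illustrative only: enable unclean leader election for a single topic ("test")
// via the AdminClient, instead of setting it broker-wide in server.properties.
// This trades durability for availability, exactly the trade-off discussed above.
public class EnableUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            Config config = new Config(Collections.singleton(
                    new ConfigEntry("unclean.leader.election.enable", "true")));
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}
{code}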



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2017-12-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293467#comment-16293467
 ] 

Matthias J. Sax commented on KAFKA-6265:


[~Yohan123] Please assign tickets to yourself when you work on them. Thx.

> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Assignee: Richard Yu
>  Labels: beginner, needs-kip, newbie
> Fix For: 1.1.0
>
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing
> from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4999) Add convenience overload for seek* methods

2017-12-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4999:
--

Assignee: Richard Yu

> Add convenience overload for seek* methods
> --
>
> Key: KAFKA-4999
> URL: https://issues.apache.org/jira/browse/KAFKA-4999
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Timo Meijer
>Assignee: Richard Yu
>  Labels: Quickfix
>
> The most common use case when using the seek* methods is to work on the currently assigned partitions. This behavior is supported by passing an empty list, but this is not very intuitive.
> Adding a parameterless overload of each seek* method with the same behavior (operating on the currently assigned partitions) would improve the API and user experience.
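
To make the contrast concrete, here is a small sketch: the first call is today's workaround (an empty collection means "all currently assigned partitions"), and the commented signature is the proposed parameterless overload, which does not exist yet:

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class SeekExamples {
    // Today: an empty collection means "all currently assigned partitions";
    // it works, but the intent is not obvious from the call site.
    static void seekAllToBeginningToday(KafkaConsumer<String, String> consumer) {
        consumer.seekToBeginning(Collections.emptyList());
    }

    // Proposed convenience overload (does NOT exist yet) with the same behavior:
    // void seekToBeginning();   // operate on consumer.assignment()
}
{code}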



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2017-12-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6265:
--

Assignee: Richard Yu

> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Assignee: Richard Yu
>  Labels: beginner, needs-kip, newbie
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing
> from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293412#comment-16293412
 ] 

ASF GitHub Bot commented on KAFKA-6265:
---

GitHub user ConcurrencyPractitioner opened a pull request:

https://github.com/apache/kafka/pull/4334

[KAFKA-6265] GlobalKTable missing #queryableStoreName()




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConcurrencyPractitioner/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4334


commit 9264592828fe7bc38c660588c07526de17a5b761
Author: RichardYuSTUG 
Date:   2017-12-15T23:40:54Z

[KAFKA-6265] GlobalKTable missing #queryableStoreName()




> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>  Labels: beginner, needs-kip, newbie
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing
> from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6302) Topic can not be recreated after it is deleted

2017-12-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6302:
--

Assignee: Matthias J. Sax

> Topic can not be recreated after it is deleted
> --
>
> Key: KAFKA-6302
> URL: https://issues.apache.org/jira/browse/KAFKA-6302
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients
>Affects Versions: 1.0.0
>Reporter: kic
>Assignee: Matthias J. Sax
>
> I use an embedded Kafka for unit tests. My application relies on the ability to recreate topics programmatically. Currently it is not possible to re-create a topic after it has been deleted.
> {code}
> // needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
>
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
>
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>
>   "The admin client" should "be able to create, delete and re-create topics" in {
>     props.setProperty("bootstrap.servers", "localhost:10001")
>     props.setProperty("delete.enable.topic", "true")
>     props.setProperty("group.id", "test-client")
>     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
>     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>     props.setProperty("clinet.id", "test-client")
>     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
>     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>
>     EmbeddedKafaJavaWrapper.start(10001, 10002, props)
>     try {
>       implicit val admin = AdminClient.create(props)
>
>       // create topic and confirm it exists
>       createTopic(testTopic)
>       val topics = listTopics()
>       info(s"topics: $topics")
>       topics should contain(testTopic)
>
>       // now we should be able to send something to this topic
>       // TODO create producer and send something
>
>       // delete topic
>       deleteTopic(testTopic)
>       listTopics() shouldNot contain(testTopic)
>
>       // recreate topic
>       createTopic(testTopic)
>       // listTopics() should contain(testTopic)
>
>       // and finally consume from the topic and expect to get 0 entries
>       // TODO create consumer and poll once
>     } finally {
>       EmbeddedKafaJavaWrapper.stop()
>     }
>   }
>
>   def listTopics()(implicit admin: AdminClient) = admin.listTopics().names().get()
>   def createTopic(topic: String)(implicit admin: AdminClient) = admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
>   def deleteTopic(topic: String)(implicit admin: AdminClient) = admin.deleteTopics(Seq("test-topic").asJava).all().get()
> }
> {code}
> Btw, what happens to connected producers/consumers when I delete a topic? 
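
One detail that may be relevant here (an assumption on my part, not a conclusion from this ticket): topic deletion is asynchronous, so an immediate re-create can race with the delete. A sketch of waiting for the deletion to finish before re-creating, using the plain Java AdminClient:

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

final class TopicRecreation {
    // Wait for the asynchronous delete to complete before re-creating the topic.
    static void recreateTopic(AdminClient admin, String topic) throws Exception {
        admin.deleteTopics(Collections.singleton(topic)).all().get();
        while (admin.listTopics().names().get().contains(topic)) {
            Thread.sleep(100);   // the controller removes the topic asynchronously
        }
        admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1))).all().get();
    }
}
{code}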



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6302) Topic can not be recreated after it is deleted

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293400#comment-16293400
 ] 

ASF GitHub Bot commented on KAFKA-6302:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/4332

KAFKA-6302: Improve AdmintClient JavaDocs




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-6302-update-admin-client-javadoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4332.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4332


commit c07ab77b7897fd1036b8111a2353e77837fe0f4b
Author: Matthias J. Sax 
Date:   2017-12-15T23:20:05Z

KAFKA-6302: Improve AdmintClient JavaDocs




> Topic can not be recreated after it is deleted
> --
>
> Key: KAFKA-6302
> URL: https://issues.apache.org/jira/browse/KAFKA-6302
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients
>Affects Versions: 1.0.0
>Reporter: kic
>
> I use an embedded kafka for unit test. My application relies on the ability 
> to recreate topics programmatically. Currently it is not possible to 
> re-create a topic after it has been deleted.
> {code}
> // needs compile time depedency 
> 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 
> 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>   "The admin client" should "be able to create, delete and re-create topics" 
> in {
> props.setProperty("bootstrap.servers", "localhost:10001")
> props.setProperty("delete.enable.topic", "true")
> props.setProperty("group.id", "test-client")
> props.setProperty("key.deserializer", 
> "org.apache.kafka.common.serialization.LongDeserializer")
> props.setProperty("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer")
> props.setProperty("clinet.id", "test-client")
> props.setProperty("key.serializer", 
> "org.apache.kafka.common.serialization.LongSerializer")
> props.setProperty("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer")
> EmbeddedKafaJavaWrapper.start(10001, 10002, props)
> try {
>   implicit val admin = AdminClient.create(props)
>   // create topic and confirm it exists
>   createTopic(testTopic)
>   val topics = listTopics()
>   info(s"topics: $topics")
>   topics should contain(testTopic)
>   // now we should be able to send something to this topic
>   // TODO create producer and send something
>   // delete topic
>   deleteTopic(testTopic)
>   listTopics() shouldNot contain(testTopic)
>   // recreate topic
>   createTopic(testTopic)
>   // listTopics() should contain(testTopic)
>   // and finally consume from the topic and expect to get 0 entries
>   // TODO create consumer and poll once
> } finally {
>   EmbeddedKafaJavaWrapper.stop()
> }
>   }
>   def listTopics()(implicit admin: AdminClient) =
> admin.listTopics().names().get()
>   def createTopic(topic: String)(implicit admin: AdminClient) =
> admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
>   def deleteTopic(topic: String)(implicit admin: AdminClient) =
> admin.deleteTopics(Seq("test-topic").asJava).all().get()
> }
> {code}
> Btw, what happens to connected producers/consumers when I delete a topic? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5473) handle ZK session expiration properly when a new session can't be established

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293384#comment-16293384
 ] 

ASF GitHub Bot commented on KAFKA-5473:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3990


> handle ZK session expiration properly when a new session can't be established
> -
>
> Key: KAFKA-5473
> URL: https://issues.apache.org/jira/browse/KAFKA-5473
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Prasanna Gautam
> Fix For: 1.1.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-2405, we changed the logic for handling ZK session expiration a bit. If a new ZK session can't be established after session expiration, we just log an error and continue. However, this can leave the broker in a bad state, since it's up but not registered from the controller's perspective. Replicas on this broker may never be in sync.
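
A rough sketch of the requested behavior (illustrative only; the supplier and runnable hooks are placeholders, and this is not the actual patch): keep retrying to establish a session and re-register the broker, and shut down if that never succeeds.

{code:java}
import java.util.function.BooleanSupplier;

// Hedged sketch: after a ZooKeeper session expires, retry establishing a new session
// and re-registering the broker instead of logging once and carrying on unregistered;
// if recovery never succeeds, shut down rather than staying up but invisible.
final class ZkSessionRecovery {
    static void recoverOrShutdown(BooleanSupplier tryCreateSession,   // attempts a new ZK session
                                  Runnable registerBroker,            // re-registers the broker znode
                                  Runnable shutdownBroker,            // last resort
                                  int maxAttempts,
                                  long backoffMs) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryCreateSession.getAsBoolean()) {
                registerBroker.run();
                return;
            }
            Thread.sleep(backoffMs);
        }
        shutdownBroker.run();
    }
}
{code}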



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6375) Follower replicas can never catch up to be ISR due to creating ReplicaFetcherThread failed.

2017-12-15 Thread Rong Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Tang updated KAFKA-6375:
-
Affects Version/s: (was: 0.10.2.0)
   0.10.1.0

> Follower replicas can never catch up to be ISR due to creating 
> ReplicaFetcherThread failed.
> ---
>
> Key: KAFKA-6375
> URL: https://issues.apache.org/jira/browse/KAFKA-6375
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: Windows,  23 brokers KafkaCluster
>Reporter: Rong Tang
>
> Hi, I ran into a case where, on one broker, the out-of-sync replicas never catch up.
> When the broker starts up, it receives LeaderAndIsr requests from the controller, which call createFetcherThread; the thread creation failed with the exceptions below.
> After that, there is no fetcher for these follower replicas, and they stay out of sync forever, unless the broker later receives LeaderAndIsr requests with a higher leader epoch. The broker had 260 out of 330 replicas out of sync for one day, until I restarted it.
> Restarting the broker mitigates the issue.
> I have two questions.
> First, why did creating the new ReplicaFetcherThread fail?
> *Second, shouldn't Kafka do something to fail over, instead of leaving the broker in an abnormal state?*
> It is a 23-broker Kafka cluster running on Windows; each broker has 330 replicas.
> [2017-12-13 16:29:21,317] ERROR Error on broker 1000 while processing LeaderAndIsr request with correlationId 1 received from controller 427703487 epoch 22 (state.change.logger)
> org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
>   at org.apache.kafka.common.network.Selector.<init>(Selector.java:124)
>   at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:87)
>   at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:35)
>   at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
>   at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>   at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
>   at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:869)
>   at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:689)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:149)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:83)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to establish loopback connection
>   at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94)
>   at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171)
>   at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
>   at java.nio.channels.Pipe.open(Pipe.java:155)
>   at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
>   at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
>   at java.nio.channels.Selector.open(Selector.java:227)
>   at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
>   ... 16 more
> Caused by: java.net.ConnectException: Connection timed out: connect
>   at sun.nio.ch.Net.connect0(Native Method)
>   at sun.nio.ch.Net.connect(Net.java:454)
>   at sun.nio.ch.Net.connect(Net.java:446)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
>   at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
>   at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
>   at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76)
>   ... 25 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2017-12-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293344#comment-16293344
 ] 

Guozhang Wang commented on KAFKA-6265:
--

That's a good point. 

{{KStreamBuilder}} is a deprecated class that has been replaced by {{StreamsBuilder}}, but it always passes a non-null {{storeSupplier.name()}} to the {{KTableSourceValueGetterSupplier}} parameter of the {{GlobalKTableImpl}} constructor. If this name is generated internally, as in {{final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);}}, then we should not return it from {{queryableStoreName()}}. This is the same logic as in the new class, where internal store names of the form {{nameProvider.newStoreName(generatedStorePrefix)}} should not be returned.
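
In other words, whether queryableStoreName() exposes the name could simply depend on whether the name was user-supplied. A minimal sketch of that rule (illustrative, not the actual Kafka Streams code):

{code:java}
// Illustrative sketch of the rule discussed above, not the actual implementation:
// only a user-supplied store name should be reported as queryable; internally
// generated names (newStoreName(...)) should yield null.
final class GlobalTableStoreName {
    private final String storeName;
    private final boolean userSupplied;   // false when the name was generated internally

    GlobalTableStoreName(String storeName, boolean userSupplied) {
        this.storeName = storeName;
        this.userSupplied = userSupplied;
    }

    String queryableStoreName() {
        return userSupplied ? storeName : null;   // null means "not queryable"
    }
}
{code}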

> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>  Labels: beginner, needs-kip, newbie
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing
> from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6336) when using assign() with kafka consumer the KafkaConsumerGroup command doesnt show those consumers

2017-12-15 Thread Neerja Khattar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293343#comment-16293343
 ] 

Neerja Khattar commented on KAFKA-6336:
---

[~hachikuji] I did try using --describe, and that didn't work either; it didn't give any result. I can try again if you want, but I remember that the groups which were listed by the consumer-group command did return a result with --describe.

> when using assign() with kafka consumer the KafkaConsumerGroup command doesnt 
> show those consumers
> --
>
> Key: KAFKA-6336
> URL: https://issues.apache.org/jira/browse/KAFKA-6336
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neerja Khattar
>
> The issue is that when using assign rather than subscribe for Kafka consumers that commit offsets, you are not able to get the lag using the ConsumerGroup command. It doesn't even list those groups.
> The JMX tool also doesn't show lag properly.
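
One thing that may be worth checking (an assumption, not a confirmed diagnosis): with assign() there is no group membership, so the tooling can only see whatever offsets the consumer commits under its group.id. A minimal sketch of an assigned consumer that commits explicitly; the broker address, topic, and group name are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: a consumer that uses assign() never joins the group, so the group is only
// visible through the offsets it commits under its group.id. Committing explicitly
// at least gives the offset-based tooling something to compute lag against.
public class AssignedConsumerWithCommits {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assigned-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            consumer.poll(100);          // fetch something
            consumer.commitSync();       // commit so the group shows up in offset queries
        }
    }
}
{code}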



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6375) Follower replicas can never catch up to be ISR due to creating ReplicaFetcherThread failed.

2017-12-15 Thread Rong Tang (JIRA)
Rong Tang created KAFKA-6375:


 Summary: Follower replicas can never catch up to be ISR due to 
creating ReplicaFetcherThread failed.
 Key: KAFKA-6375
 URL: https://issues.apache.org/jira/browse/KAFKA-6375
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.2.0
 Environment: Windows,  23 brokers KafkaCluster
Reporter: Rong Tang


Hi, I ran into a case where, on one broker, the out-of-sync replicas never catch up.
When the broker starts up, it receives LeaderAndIsr requests from the controller, which call createFetcherThread; the thread creation failed with the exceptions below.

After that, there is no fetcher for these follower replicas, and they stay out of sync forever, unless the broker later receives LeaderAndIsr requests with a higher leader epoch.

Restarting the broker mitigates the issue.

I have two questions.
First, why did creating the new ReplicaFetcherThread fail?
*Second, shouldn't Kafka do something to fail over, instead of leaving the broker in an abnormal state?*

It is a 23-broker Kafka cluster running on Windows; each broker has 330 replicas.

[2017-12-13 16:29:21,317] ERROR Error on broker 1000 while processing LeaderAndIsr request with correlationId 1 received from controller 427703487 epoch 22 (state.change.logger)
org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:124)
    at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:87)
    at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:35)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
    at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:869)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:689)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:149)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:83)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to establish loopback connection
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94)
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171)
    at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
    at java.nio.channels.Pipe.open(Pipe.java:155)
    at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
    at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
    at java.nio.channels.Selector.open(Selector.java:227)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
    ... 16 more
Caused by: java.net.ConnectException: Connection timed out: connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
    at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76)
    ... 25 more




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2017-12-15 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293320#comment-16293320
 ] 

Richard Yu commented on KAFKA-6265:
---

I am not sure whether this is a second way to instantiate it (I found it in {{KStreamBuilder}}):

{code:java}
@SuppressWarnings("unchecked")
private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
                                                final Serde<V> valSerde,
                                                final TimestampExtractor timestampExtractor,
                                                final String topic,
                                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
    try {
        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
        final String processorName = newName(KTableImpl.SOURCE_NAME);
        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());

        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();

        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
    } catch (final org.apache.kafka.streams.errors.TopologyException e) {
        throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
    }
}
{code}

The constructor for {{GlobalKTableImpl}} is called here. Does this affect our 
approach in any way?
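
For what it's worth, here is a rough, purely illustrative sketch (not the actual patch; the class and field names below are made up) of the shape such a change could take: the implementation would simply hold on to the store name it already receives (e.g. {{storeSupplier.name()}} above) and expose it, mirroring {{KTable#queryableStoreName()}}.

{code:java}
// Illustrative sketch only; names are assumed, this is not Kafka's real class hierarchy.
interface SketchGlobalKTable<K, V> {
    /** Name of the underlying state store, or null if the table is not queryable. */
    String queryableStoreName();
}

class SketchGlobalKTableImpl<K, V> implements SketchGlobalKTable<K, V> {
    private final String storeName; // would be storeSupplier.name() in doGlobalTable above

    SketchGlobalKTableImpl(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public String queryableStoreName() {
        return storeName;
    }
}
{code}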

> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>  Labels: beginner, needs-kip, newbie
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing 
> from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2017-12-15 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6265:
--
Comment: was deleted

(was: I am not sure if this is a second way to instantiate it: (I found it in 
{{KStreamBuilder}})

{code:java}
@SuppressWarnings("unchecked")
private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
                                                final Serde<V> valSerde,
                                                final TimestampExtractor timestampExtractor,
                                                final String topic,
                                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
    try {
        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
        final String processorName = newName(KTableImpl.SOURCE_NAME);
        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());

        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();

        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor,
            keyDeserializer, valueDeserializer, topic, processorName, tableSource);
        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
    } catch (final org.apache.kafka.streams.errors.TopologyException e) {
        throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
    }
}
{code}

The constructor for {{GlobalKTableImpl}} is called here. Does this affect our 
approach in any way?)

> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>  Labels: beginner, needs-kip, newbie
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing 
> from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2017-12-15 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293291#comment-16293291
 ] 

James Cheng edited comment on KAFKA-3955 at 12/15/17 9:45 PM:
--

[~ijuma], I said in my comment above that:
{quote}
Note that it is only safe to do this if the partition is online (being led by 
someone other than the affected broker). If the partition is offline and the 
preferred replica for it is the one with this problem, then deleting the log 
directory on the leader will cause 
https://issues.apache.org/jira/browse/KAFKA-3410 to happen (because the leader 
will now have no data, but followers do have data, and so follower will be 
ahead of leader).

{quote}

If this happens on the offline partition leader, then what are my options? I 
think that I will need to first delete all the logs for this partition from 
the offline partition leader's log.dir, and then I think I will need to turn on 
unclean.leader.election for that topic (in order to prevent KAFKA-3410).

Does that sound right?


was (Author: wushujames):
[~ijuma], I said in my comment that:
{quote}
Note that it is only safe to do this if the partition is online (being led by 
someone other than the affected broker). If the partition is offline and the 
preferred replica for it is the one with this problem, then deleting the log 
directory on the leader will cause 
https://issues.apache.org/jira/browse/KAFKA-3410 to happen (because the leader 
will now have no data, but followers do have data, and so follower will be 
ahead of leader).

{quote}

If this happens on the offline partition leader, then what are my options? I 
think that I will need to first delete all the logs for this partition from 
the offline partition leader's log.dir, and then I think I will need to turn on 
unclean.leader.election for that topic (in order to prevent KAFKA-3410).

Does that sound right?

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Ismael Juma
>Priority: Critical
>  Labels: reliability
> Fix For: 1.1.0
>
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the toplevel exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories as of yet) as to how 
> this log segment got into thi

[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2017-12-15 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293294#comment-16293294
 ] 

James Cheng commented on KAFKA-3410:


Do we think KAFKA-1211 resolves this? If so, should we set Fix Version and 
Resolution on this JIRA?

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2017-12-15 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293291#comment-16293291
 ] 

James Cheng commented on KAFKA-3955:


[~ijuma], I said in my comment that:
{quote}
Note that it is only safe to do this if the partition is online (being led by 
someone other than the affected broker). If the partition is offline and the 
preferred replica for it is the one with this problem, then deleting the log 
directory on the leader will cause 
https://issues.apache.org/jira/browse/KAFKA-3410 to happen (because the leader 
will now have no data, but followers do have data, and so follower will be 
ahead of leader).

{quote}

If this happens on the offline partition leader, then what are my options? I 
think that I will need to first delete all the logs for this partition from 
the offline partition leader's log.dir, and then I think I will need to turn on 
unclean.leader.election for that topic (in order to prevent KAFKA-3410).

Does that sound right?
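
For reference only (this is not from the thread): on brokers new enough to support 
the AlterConfigs API (0.11.0+), the per-topic override could be applied roughly as 
in the sketch below; the bootstrap address and topic name are assumed, and the same 
change can be made with the kafka-configs.sh tool instead.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // assumed topic
            Config config = new Config(Collections.singleton(
                    new ConfigEntry("unclean.leader.election.enable", "true")));
            // Note: alterConfigs replaces the full set of overrides for the resource,
            // so any existing topic-level overrides would need to be included as well.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}
{code}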

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Ismael Juma
>Priority: Critical
>  Labels: reliability
> Fix For: 1.1.0
>
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the toplevel exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories as of yet) as to how 
> this log segment got into this state with non-monotonic offsets. This segment 
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
> recovery exists in trunk, but I'm unsure if the new handling around 
> compressed messages (KIP-31) means the bug where non-monotonic offsets get 
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> yourself (delete all .index/.log files including and after the one with the 
> bad offset). However, kafka should (and can) handle this case well - with 
> replication we can truncate in broker startup.
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetEx

[jira] [Updated] (KAFKA-6374) Constant Consumer Errors after replacing a broker

2017-12-15 Thread Aravind Velamur Srinivasan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aravind Velamur Srinivasan updated KAFKA-6374:
--
Description: 
We had to replace one of the brokers for maintenance reasons. We already did 
this a couple of times earlier without any issues but ran into one this time. 
We did the following to replace the broker:
(1) Gracefully stop the Kafka broker (id: 48)
(2) Make sure producers/consumers were fine (the consumers coordinated by this 
broker now were managed by another broker and things were fine)
(3) Spin up a new instance with the same IP
(4) Make sure the new instance's config is the same as old with the same broker 
ID.
(5) Bring the new one back up.

It took ~35 to 40 mins to do this. But once the broker came back up, the 
consumer groups coordinated by this broker were getting constant errors that 
this CG is not coordinated by this broker for nearly 30 to 40 mins until I 
stopped the broker again.

Looks like the metadata kept returning that the coordinator for this CG is the 
same old broker (id 48) even after the client kept on asking for the 
coordinator.

(1) Are there any known issues/recent fixes for this?
(2) Why didn't the metadata refresh? Any ideas on what could be happening?

We were constantly getting errors when trying to fetch the coordinator like 
this:
'
sarama-logger : client/coordinator requesting coordinator for consumergroup 
from  (some other broker)
sarama-logger : client/coordinator coordinator for consumergroup is #48
kafka- Error: kafka server: Request was for a consumer group that is not 
coordinated by this broker.
'

In the Kafka broker logs we saw lots of errors like this:
'
[2017-12-13 00:38:49,559] ERROR [ReplicaFetcherThread-0-48], Error for 
partition [__consumer_offsets,37] to broker 
48:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
'

Is it running into the stale metadata like this:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Co-ordinatorfailoverorconnectionlosstotheco-ordinator

  was:
We had to replace one of the brokers for maintenance reasons. We did the 
following to replace the broker:
(1) Gracefully stop the Kafka broker (id: 48)
(2) Make sure producers/consumers were fine (the consumers coordinated by this 
broker now were managed by another broker and things were fine)
(3) Spin up a new instance with the same IP
(4) Make sure the new instance's config is the same as old with the same broker 
ID.
(5) Bring the new one back up.

It took ~35 to 40 mins to do this. But once the broker came back up, the 
consumer groups coordinated by this broker were getting constant errors that 
this CG is not coordinated by this broker for nearly 30 to 40 mins until I 
stopped the broker again.

Looks like the metadata kept returning that the coordinator for this CG is the 
same old broker (id 48) even after the client kept on asking for the 
coordinator.

(1) Are there any known issues/recent fixes for this?
(2) Why didn't the metadata refresh? Any ideas on what could be happening?

We were constantly getting errors when trying to fetch the coordinator like 
this:
'
sarama-logger : client/coordinator requesting coordinator for consumergroup 
from  (some other broker)
sarama-logger : client/coordinator coordinator for consumergroup is #48
kafka- Error: kafka server: Request was for a consumer group that is not 
coordinated by this broker.
'

In the Kafka broker logs we saw lots of errors like this:
'
[2017-12-13 00:38:49,559] ERROR [ReplicaFetcherThread-0-48], Error for 
partition [__consumer_offsets,37] to broker 
48:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
'

Is it running into the stale metadata like this:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Co-ordinatorfailoverorconnectionlosstotheco-ordinator


> Constant Consumer Errors after replacing a broker
> -
>
> Key: KAFKA-6374
> URL: https://issues.apache.org/jira/browse/KAFKA-6374
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
> Environment: OS: linux
> Broker Instances: EC2 (r4.xlarge)
> Storage: EBS (HDD st1 - 16T size)
> Client: golang (sarama and sarama-cluster libraries)
> Cluster Size: 5 nodes
> Kafka Version: 0.10.2.1
> ZooKeeper: 3 nodes (separate from the brokers)
>Reporter: Aravind Velamur Srinivasan
>
> We had to replace one of the brokers for maintenance reasons. We already did 
> this a couple of times earlier without any issues but ran into one this time. 
> We did th

[jira] [Created] (KAFKA-6374) Constant Consumer Errors after replacing a broker

2017-12-15 Thread Aravind Velamur Srinivasan (JIRA)
Aravind Velamur Srinivasan created KAFKA-6374:
-

 Summary: Constant Consumer Errors after replacing a broker
 Key: KAFKA-6374
 URL: https://issues.apache.org/jira/browse/KAFKA-6374
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.2.1
 Environment: OS: linux
Broker Instances: EC2 (r4.xlarge)
Storage: EBS (HDD st1 - 16T size)
Client: golang (sarama and sarama-cluster libraries)
Cluster Size: 5 nodes
Kafka Version: 0.10.2.1
ZooKeeper: 3 nodes (separate from the brokers)
Reporter: Aravind Velamur Srinivasan


We had to replace one of the brokers for maintenance reasons. We did the 
following to replace the broker:
(1) Gracefully stop the Kafka broker (id: 48)
(2) Make sure producers/consumers were fine (the consumers coordinated by this 
broker now were managed by another broker and things were fine)
(3) Spin up a new instance with the same IP
(4) Make sure the new instance's config is the same as old with the same broker 
ID.
(5) Bring the new one back up.

It took ~35 to 40 mins to do this. But once the broker came back up, the 
consumer groups coordinated by this broker were getting constant errors that 
this CG is not coordinated by this broker for nearly 30 to 40 mins until I 
stopped the broker again.

Looks like the metadata kept returning that the coordinator for this CG is the 
same old broker (id 48) even after the client kept on asking for the 
coordinator.

(1) Are there any known issues/recent fixes for this?
(2) Why didn't the metadata refresh? Any ideas on what could be happening?

We were constantly getting errors when trying to fetch the coordinator like 
this:
'
sarama-logger : client/coordinator requesting coordinator for consumergroup 
from  (some other broker)
sarama-logger : client/coordinator coordinator for consumergroup is #48
kafka- Error: kafka server: Request was for a consumer group that is not 
coordinated by this broker.
'

In the Kafka broker logs we saw lots of errors like this:
'
[2017-12-13 00:38:49,559] ERROR [ReplicaFetcherThread-0-48], Error for 
partition [__consumer_offsets,37] to broker 
48:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
'

Is it running into the stale metadata like this:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Co-ordinatorfailoverorconnectionlosstotheco-ordinator



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6373) Log end offset of input table changing during restore

2017-12-15 Thread Charles Crain (JIRA)
Charles Crain created KAFKA-6373:


 Summary: Log end offset of input table changing during restore
 Key: KAFKA-6373
 URL: https://issues.apache.org/jira/browse/KAFKA-6373
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
 Environment: Client 1.0.0, Brokers 1.0.0 with 1.0.0 message format and 
inter-broker protocol
Reporter: Charles Crain


I am receiving a confusing error from a Kafka Streams application.  Most of the 
time when I try to bring up just a single replica of the task for the first 
time, I get this:

{noformat}
Detected a task that got migrated to another thread. This implies that this 
thread missed a rebalance and dropped out of the consumer group. Trying to 
rejoin the consumer group now.
org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of [Name 
of Topic]-36 should not change while restoring: old end offset 37559, current 
offset 37561
{noformat}

The confusing thing is that [Name of Topic] is *not* a change log topic created 
by the stream app.  Rather it is a topic published from a completely different 
service.  And since that other service is publishing to that topic actively, of 
course the end offset is constantly changing.

Here is a rough view of my stream topology.  I'll call the topic that's showing 
up in the above error "ExternalTableTopic".


{noformat}
externalTable = table(ExternalTableTopic)

stream(ExternalStreamTopic)
  .leftJoin(externalTable, joiner)
  .aggregate(aggregator, SomeQueryableStoreName)
{noformat}


...and that's it.  If I take out the left join this appears not to happen.  Is 
it illegal to join a table to a stream if that table is being published from 
somewhere else?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6336) when using assign() with kafka consumer the KafkaConsumerGroup command doesnt show those consumers

2017-12-15 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293091#comment-16293091
 ] 

Jason Gustafson commented on KAFKA-6336:


[~huxi_2b] Good point. Since the protocol type is undefined for a group using 
manual assignment, it seems we would filter it. Maybe the coordinator ought to 
assume it is a consumer group if there are committed offsets. Either that or 
the --list output ought to include empty groups even if the protocol type is 
undefined.

[~neerjakhattar] Can you clarify if it is only the --list option that is 
failing? Have you tried using --describe on the group name explicitly?
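
For context, the "standalone" setup under discussion looks roughly like this 
(illustrative only; broker address, topic, partition and group name are assumed): 
offsets are committed under a group.id, but because {{assign()}} is used instead of 
{{subscribe()}}, the group never registers a protocol type with the coordinator.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assigned-group");          // assumed
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no subscribe(), so no JoinGroup and no protocol type.
            consumer.assign(Collections.singletonList(new TopicPartition("test", 0)));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.printf("fetched %d records%n", records.count());
            // Offsets are still committed under "assigned-group", so lag exists for the
            // group even though it has no members in the coordinator's sense.
            consumer.commitSync();
        }
    }
}
{code}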

> when using assign() with kafka consumer the KafkaConsumerGroup command doesnt 
> show those consumers
> --
>
> Key: KAFKA-6336
> URL: https://issues.apache.org/jira/browse/KAFKA-6336
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neerja Khattar
>
> The issue is that when using assign rather than subscribe for kafka consumers 
> that commit offsets, we are not able to get the lag using the ConsumerGroup 
> command. It doesn't even list those groups.
> The JMX tool also doesn't show lag properly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6372) Trogdor should use LogContext for log messages

2017-12-15 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-6372:
--

 Summary: Trogdor should use LogContext for log messages
 Key: KAFKA-6372
 URL: https://issues.apache.org/jira/browse/KAFKA-6372
 Project: Kafka
  Issue Type: Sub-task
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe
Priority: Minor


Trogdor should use LogContext for log messages, rather than manually prefixing 
log messages with the context.
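
For reference, a minimal sketch of the intended pattern (the worker class and prefix 
below are made up for illustration): a {{LogContext}} is created once with the context 
string, and the {{Logger}} it hands out prefixes every message automatically.

{code:java}
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class ExampleWorker {
    private final Logger log;

    public ExampleWorker(String workerId) {
        // Every message logged through `log` carries the prefix automatically,
        // instead of each call site prepending the context to its format string.
        LogContext logContext = new LogContext("[Worker id=" + workerId + "] ");
        this.log = logContext.logger(ExampleWorker.class);
    }

    public void start() {
        log.info("Starting worker"); // logged as "[Worker id=...] Starting worker"
    }
}
{code}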



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5526) KIP-175: ConsumerGroupCommand no longer shows output for consumer groups which have not committed offsets

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292990#comment-16292990
 ] 

ASF GitHub Bot commented on KAFKA-5526:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4271


> KIP-175: ConsumerGroupCommand no longer shows output for consumer groups 
> which have not committed offsets
> -
>
> Key: KAFKA-5526
> URL: https://issues.apache.org/jira/browse/KAFKA-5526
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ryan P
>Assignee: Vahid Hashemian
>  Labels: kip
> Fix For: 1.1.0
>
>
> It would appear that the latest iteration of the ConsumerGroupCommand no 
> longer outputs information about group membership when no offsets have been 
> committed. It would be nice if the output generated by these tools maintained 
> some form of consistency across versions as some users have grown to depend 
> on them. 
> 0.9.x output:
> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --new-consumer 
> --describe --group console-consumer-34885
> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
> console-consumer-34885, test, 0, unknown, 0, unknown, consumer-1_/192.168.1.64
> 0.10.2 output:
> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --new-consumer 
> --describe --group console-consumer-34885
> Note: This will only show information about consumers that use the Java 
> consumer API (non-ZooKeeper-based consumers).
> TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6359) Work for KIP-236

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292978#comment-16292978
 ] 

ASF GitHub Bot commented on KAFKA-6359:
---

GitHub user tombentley opened a pull request:

https://github.com/apache/kafka/pull/4330

[WIP] KAFKA-6359: KIP-236 interruptible reassignments

This is a WIP for KIP-236. All the existing tests (via the 
`/admin/reassign_partitions` path) still pass, and I've added a couple of tests 
for the new path (via `/admin/reassignment_requests`), but it needs a lot more 
tests of the new path.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tombentley/kafka 
KIP-236-interruptible-reassignments

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4330


commit eb856a3009e5cc3d293305fe6a3add0d21267095
Author: Tom Bentley 
Date:   2017-12-06T15:18:17Z

Split /admin/reassign_partitions

commit 6114752becb964041da88e874ce289968c2d2abc
Author: Tom Bentley 
Date:   2017-12-07T09:40:42Z

KIP-236 wip

commit 5caf4e3b6e9fc5ff4be3755ed77057598438017d
Author: Tom Bentley 
Date:   2017-12-13T09:21:33Z

Jun's idea of using separate path for notification

commit 325569c1adfb220eaef3e6deb642d55fbab3d5ce
Author: Tom Bentley 
Date:   2017-12-13T13:22:04Z

Tidy up

commit 6c3a922fde6a2f1b0c6acca80f8c175948b165a9
Author: Tom Bentley 
Date:   2017-12-14T10:14:26Z

More work

commit bb9e3874590cc71825fea37a92f77b6b426744f9
Author: Tom Bentley 
Date:   2017-12-14T10:33:28Z

Fix to match KIP

commit 6096ef2a713e18f13de96d771244bc8e99742322
Author: Tom Bentley 
Date:   2017-12-14T12:54:15Z

WIP on algo improvment

commit 8c60b304283ed13e839b9ad3aee026f23dbaeb72
Author: Tom Bentley 
Date:   2017-12-14T13:21:54Z

Fixup

commit c0b6ac97e7bc8e168123ff6b6f467a035a73a02a
Author: Tom Bentley 
Date:   2017-12-15T09:26:19Z

Treat [1,2,3]->[1,3,2] as a no-op reassignment

Since only the first element in the list is distinguished.

commit b97521d81a3c7482f526d6574b3da34e1cfb9ff8
Author: Tom Bentley 
Date:   2017-12-15T09:28:02Z

Fixes

commit 52d61af3191d338222147395f3fe89b9f0015c80
Author: Tom Bentley 
Date:   2017-12-15T09:29:12Z

WIP: Change onPartitionReassignment() algo to cope with re-reassignment

commit 698cf3ef2b253347cddb0a49d5464fcd20781c05
Author: Tom Bentley 
Date:   2017-12-15T10:52:36Z

Refactor test

commit 117a1af116296795cfbd38ef85abd48d4b2643d0
Author: Tom Bentley 
Date:   2017-12-15T16:38:42Z

Improved algo




> Work for KIP-236
> 
>
> Key: KAFKA-6359
> URL: https://issues.apache.org/jira/browse/KAFKA-6359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>
> This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292887#comment-16292887
 ] 

Guozhang Wang commented on KAFKA-6363:
--

Currently {{MockKafkaAdminClientEnv}} is used in these tests: 
{{KafkaAdminClientTest}}, {{StreamsResetterTest}}, {{TopicAdminTest}} and 
{{MockClientSupplier}}. The point is that besides {{KafkaAdminClientTest}} all 
other unit tests would be better using {{MockAdminClient}} directly. This JIRA 
is for that purpose primarily.

As for the {{MockKafkaAdminClientEnv}} class, I think it is either fine to 
rename and keep it, or just "in-line" its logic in {{KafkaAdminClient}} to 
allow users to pass in a {{KafkaClient}} which can be a {{MockClient}}.

> Use MockAdminClient for any unit tests that depend on AdminClient
> -
>
> Key: KAFKA-6363
> URL: https://issues.apache.org/jira/browse/KAFKA-6363
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>  Labels: newbie
>
> Today we have a few unit tests other than KafkaAdminClientTest that rely on 
> MockKafkaAdminClientEnv.
> About this class and MockKafkaAdminClientEnv, my thoughts:
> 1. MockKafkaAdminClientEnv is actually using a MockClient for the inner 
> KafkaClient; it should be only used for the unit test of KafkaAdminClient 
> itself.
> 2. For any other unit tests on classes that depend on AdminClient, we should 
> be using the MockAdminClient that mocks the whole AdminClient.
> So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in 
> KafkaAdminClientTest use MockClient and add a new static constructor that 
> takes a KafkaClient; 3) remove the MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-15 Thread Lev Gorodinski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292559#comment-16292559
 ] 

Lev Gorodinski commented on KAFKA-6365:
---

Hi [~hachikuji], just following up on this. I'm logged in, but I don't seem to 
have edit privileges on the page 
(https://cwiki.apache.org/confluence/display/KAFKA/Clients). Maybe I'm not 
looking in the right place, but the edit button which is usually there on 
Confluence, isn't shown. 

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292480#comment-16292480
 ] 

ASF GitHub Bot commented on KAFKA-6086:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4165


> Provide for custom error handling when Kafka Streams fails to produce
> -
>
> Key: KAFKA-6086
> URL: https://issues.apache.org/jira/browse/KAFKA-6086
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matt Farmer
>  Labels: kip
> Fix For: 1.1.0
>
>
> This is an issue related to the following KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6371) FetchMetadata creates unneeded Strings on instantiation

2017-12-15 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292268#comment-16292268
 ] 

huxihx commented on KAFKA-6371:
---

Seems it's already fixed in 
[KAFKA-5121|https://issues.apache.org/jira/browse/KAFKA-5121]?  See commit: 
https://github.com/apache/kafka/commit/e71dce89c0da50f3eccc47d0fc050c92d5a99b88

Do you want to backport to 0.10.* codebase?

> FetchMetadata creates unneeded Strings on instantiation
> ---
>
> Key: KAFKA-6371
> URL: https://issues.apache.org/jira/browse/KAFKA-6371
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1, 0.10.1.1, 0.10.2.1
>Reporter: Maytee Chinavanichkit
>Assignee: Maytee Chinavanichkit
>Priority: Minor
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2
>
>
> My colleague and I were taking a heap dump to investigate the memory usage of 
> a broker. From the dump, we saw a number of object strings with the message 
> {{onlyCommitted: }} and {{partitionStatus: ]}}. Upon 
> investigation, these objects were being instantiated when the 
> {{FetchMetadata}} object is constructed. The toString method here is 
> malformed and the last two lines are executed as a block instead of being 
> concatenated. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6371) FetchMetadata creates unneeded Strings on instantiation

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292242#comment-16292242
 ] 

ASF GitHub Bot commented on KAFKA-6371:
---

GitHub user mayt opened a pull request:

https://github.com/apache/kafka/pull/4328

KAFKA-6371 Fix DelayedFetch toString




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mayt/kafka KAFKA-6371

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4328.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4328






> FetchMetadata creates unneeded Strings on instantiation
> ---
>
> Key: KAFKA-6371
> URL: https://issues.apache.org/jira/browse/KAFKA-6371
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1, 0.10.1.1, 0.10.2.1
>Reporter: Maytee Chinavanichkit
>Assignee: Maytee Chinavanichkit
>Priority: Minor
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2
>
>
> My colleague and I were taking a heap dump to investigate the memory usage of 
> a broker. From the dump, we saw a number of object strings with the message 
> {{onlyCommitted: }} and {{partitionStatus: ]}}. Upon 
> investigation, these objects were being instantiated when the 
> {{FetchMetadata}} object is constructed. The toString method here is 
> malformed and the last two lines are executed as a block instead of being 
> concatenated. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6371) FetchMetadata creates unneeded Strings on instantiation

2017-12-15 Thread Maytee Chinavanichkit (JIRA)
Maytee Chinavanichkit created KAFKA-6371:


 Summary: FetchMetadata creates unneeded Strings on instantiation
 Key: KAFKA-6371
 URL: https://issues.apache.org/jira/browse/KAFKA-6371
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.1, 0.10.1.1, 0.10.0.1
Reporter: Maytee Chinavanichkit
Assignee: Maytee Chinavanichkit
Priority: Minor
 Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2


My colleague and I were taking a heap dump to investigate the memory usage of a 
broker. From the dump, we saw a number of object strings with the message 
{{onlyCommitted: }} and {{partitionStatus: ]}}. Upon 
investigation, these objects were being instantiated when the {{FetchMetadata}} 
object is constructed. The toString method here is malformed and the last two 
lines are executed as a block instead of being concatenated. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292172#comment-16292172
 ] 

ASF GitHub Bot commented on KAFKA-6370:
---

GitHub user huxihx opened a pull request:

https://github.com/apache/kafka/pull/4327

KAFKA-6370: KafkaMetricsGroup.toScope should filter out tags null value

KafkaMetricsGroup.toScope should filter out tags with value of `null` to 
avoid NullPointerException thrown.
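
Purely as an illustration of the pattern (the real code is Scala in 
KafkaMetricsGroup.toScope; the names below are assumed), the idea is to drop 
tags whose value is null before formatting them into the metric scope:

{code:java}
import java.util.Map;
import java.util.stream.Collectors;

public class ScopeFormatter {
    // Illustrative Java equivalent of the fix: skip null-valued tags entirely
    // so the format/replace step never sees a null value.
    public static String toScope(Map<String, String> tags) {
        return tags.entrySet().stream()
                .filter(e -> e.getValue() != null) // the fix: ignore null values
                .map(e -> e.getKey() + "." + e.getValue().replaceAll("\\.", "_"))
                .collect(Collectors.joining("."));
    }
}
{code}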

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huxihx/kafka KAFKA-6370

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4327


commit 9eec15c5f231cb6c5ec17cc62783031775d69b74
Author: huxihx 
Date:   2017-12-15T08:05:52Z

KAFKA-6370: KafkaMetricsGroup.toScope should filter out tags with value of 
`null` to avoid NullPointerException thrown.




> MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to 
> NullPointerException
> ---
>
> Key: KAFKA-6370
> URL: https://issues.apache.org/jira/browse/KAFKA-6370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/
>  :
> {code}
> java.lang.NullPointerException
>   at 
> scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at scala.collection.immutable.StringLike.format(StringLike.scala:351)
>   at scala.collection.immutable.StringLike.format$(StringLike.scala:350)
>   at scala.collection.immutable.StringOps.format(StringOps.scala:29)
>   at 
> kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170)
>   at scala.collection.immutable.List.map(List.scala:283)
>   at 
> kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51)
>   at 
> kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42)
>   at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80)
>   at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352)
>   at kafka.network.RequestMetrics.<init>(RequestChannel.scala:364)
>   at 
> kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at kafka.network.RequestChannel$Metrics.<init>(RequestChannel.scala:56)
>   at kafka.network.RequestChannel.<init>(RequestChannel.scala:243)
>   at kafka.network.SocketServer.<init>(SocketServer.scala:71)
>

[jira] [Assigned] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException

2017-12-15 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-6370:
-

Assignee: huxihx

> MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to 
> NullPointerException
> ---
>
> Key: KAFKA-6370
> URL: https://issues.apache.org/jira/browse/KAFKA-6370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/
>  :
> {code}
> java.lang.NullPointerException
>   at 
> scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at scala.collection.immutable.StringLike.format(StringLike.scala:351)
>   at scala.collection.immutable.StringLike.format$(StringLike.scala:350)
>   at scala.collection.immutable.StringOps.format(StringOps.scala:29)
>   at 
> kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170)
>   at scala.collection.immutable.List.map(List.scala:283)
>   at 
> kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51)
>   at 
> kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42)
>   at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80)
>   at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352)
>   at kafka.network.RequestMetrics.<init>(RequestChannel.scala:364)
>   at 
> kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at kafka.network.RequestChannel$Metrics.<init>(RequestChannel.scala:56)
>   at kafka.network.RequestChannel.<init>(RequestChannel.scala:243)
>   at kafka.network.SocketServer.<init>(SocketServer.scala:71)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:238)
>   at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
>   at 
> kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:93)
> {code}
> Here is the code from KafkaMetricsGroup.scala :
> {code}
> .map { case (key, value) => "%s.%s".format(key, 
> value.replaceAll("\\.", "_"))}
> {code}
> It seems (some) value was null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)