[ 
https://issues.apache.org/jira/browse/KAFKA-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151673#comment-14151673
 ] 

James Lent edited comment on KAFKA-1387 at 9/29/14 2:14 PM:
------------------------------------------------------------

In case anyone is interested in the complete code for the new class I am 
testing with:

{noformat}
/**
 * Monitors a single ZooKeeper node (intended for ephemeral nodes) and invokes a
 * callback to recreate it when it is found to be missing.
 *
 * Two listeners are registered on `zkClient` by [[start]]:
 *  - a data listener on `path`, which logs data changes and reacts to deletion;
 *  - a state listener, which re-checks the node whenever a new session starts.
 *
 * @param zkClient     client used for the subscriptions and the existence checks
 * @param path         path of the monitored node
 * @param recreateNode callback invoked when the node does not exist
 */
class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreateNode: () => Unit) extends Logging {

  val dataListener = new DataListener()
  val stateListener = new StateListener()

  /** Subscribes the state listener and the data listener (on `path`) to `zkClient`. */
  def start(): Unit = {
    zkClient.subscribeStateChanges(stateListener)
    zkClient.subscribeDataChanges(path, dataListener)
  }

  /** Unsubscribes both listeners from `zkClient`. */
  def close(): Unit = {
    zkClient.unsubscribeStateChanges(stateListener)
    zkClient.unsubscribeDataChanges(path, dataListener)
  }

  /** Logs changes of the node's data and triggers recreation when the node is deleted. */
  class DataListener() extends IZkDataListener {

    // Payload seen by the most recent handleDataChange call; null before the first
    // call or when that call reported no data.
    var oldData: scala.Any = null

    /**
     * Logs only when the payload differs from the previously seen one, so repeated
     * notifications carrying the same data stay quiet.
     */
    def handleDataChange(dataPath: String, newData: scala.Any): Unit = {
      if (newData != null) {
        if (!newData.equals(oldData)) {
          info("Ephemeral node %s has new data [%s]".format(dataPath, newData))
        }
      } else if (oldData != null) {
        info("Ephemeral node %s now has no data".format(dataPath))
      }

      oldData = newData
    }

    /**
     * Recreates the node unless it already exists again by the time this callback runs.
     * NOTE(review): the existence check and the recreate call are not atomic; presumably
     * `recreateNode` tolerates the node having been created in between -- confirm.
     */
    def handleDataDeleted(dataPath: String): Unit = {
      if (zkClient.exists(path)) {
        info("Ephemeral node %s was deleted, but, has already been recreated".format(dataPath))
      } else {
        info("Ephemeral node %s was deleted, recreate it".format(dataPath))
        recreateNode()
      }
    }
  }

  /** Re-checks the node on every new session and recreates it when it is missing. */
  class StateListener() extends IZkStateListener {

    /** Connection state changes are ignored; only new sessions matter here. */
    def handleStateChanged(state: KeeperState): Unit = {}

    /**
     * Checks for the node first, so a callback that runs after the node has already
     * been created does not try to create it a second time.
     */
    def handleNewSession(): Unit = {
      if (zkClient.exists(path)) {
        info("New session started, but, ephemeral %s already/still exists".format(path))
      } else {
        info("New session started, recreate ephemeral node %s".format(path))
        recreateNode()
      }
    }
  }
}
{noformat}


was (Author: jwlent55):
In case anyone is interested in the complete code for the new class I am 
testing with:

{noformat}
/**
 * Earlier revision of the monitor: watches a single ZooKeeper node (intended for
 * ephemeral nodes) and invokes a callback to recreate it when it is missing.
 * This revision tracks the node's data by its string form.
 *
 * @param zkClient     client used for the subscriptions and the existence checks
 * @param path         path of the monitored node
 * @param recreateNode callback invoked when the node does not exist
 */
class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreateNode: () => Unit) extends Logging {

  val dataListener = new DataListener()
  val stateListener = new StateListener()

  /** Subscribes the state listener and the data listener (on `path`) to `zkClient`. */
  def start(): Unit = {
    zkClient.subscribeStateChanges(stateListener)
    zkClient.subscribeDataChanges(path, dataListener)
  }

  /** Unsubscribes both listeners from `zkClient`. */
  def close(): Unit = {
    zkClient.unsubscribeStateChanges(stateListener)
    zkClient.unsubscribeDataChanges(path, dataListener)
  }

  /** Logs changes of the node's data and triggers recreation when the node is deleted. */
  class DataListener() extends IZkDataListener {

    // String form of the payload last logged; null before the first change or when
    // the last reported payload was null.
    var oldData: String = null

    /**
     * Logs and remembers the payload when its string form differs from the last one.
     * A null payload is compared as null instead of being dereferenced, which
     * previously threw a NullPointerException.
     */
    def handleDataChange(dataPath: String, newData: scala.Any): Unit = {
      val newText: String = if (newData == null) null else newData.toString
      // `!=` on String is null-safe and otherwise delegates to equals.
      if (newText != oldData) {
        oldData = newText
        info("Ephemeral node %s has new data [%s]".format(dataPath, newData))
      }
    }

    /** Recreates the node unless it already exists again by the time this callback runs. */
    def handleDataDeleted(dataPath: String): Unit = {
      if (zkClient.exists(path)) {
        info("Ephemeral node %s was deleted, but, has already been recreated".format(dataPath))
      } else {
        info("Ephemeral node %s was deleted, recreate it".format(dataPath))
        recreateNode()
      }
    }
  }

  /** Re-checks the node on every new session and recreates it when it is missing. */
  class StateListener() extends IZkStateListener {

    /** Connection state changes are ignored; only new sessions matter here. */
    def handleStateChanged(state: KeeperState): Unit = {}

    /** Recreates the node only when it does not exist at the time of the callback. */
    def handleNewSession(): Unit = {
      if (zkClient.exists(path)) {
        info("New session started, but, ephemeral %s already/still exists".format(path))
      } else {
        info("New session started, recreate ephemeral node %s".format(path))
        recreateNode()
      }
    }
  }
}
{noformat}

> Kafka getting stuck creating ephemeral node it has already created when two 
> zookeeper sessions are established in a very short period of time
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1387
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1387
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Fedor Korotkiy
>
> Kafka broker re-registers itself in zookeeper every time handleNewSession() 
> callback is invoked.
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
>  
> Now imagine the following sequence of events.
> 1) Zookeeper session reestablishes. handleNewSession() callback is queued by 
> the zkClient, but not invoked yet.
> 2) Zookeeper session reestablishes again, queueing callback second time.
> 3) First callback is invoked, creating /broker/[id] ephemeral path.
> 4) Second callback is invoked and it tries to create /broker/[id] path using 
> createEphemeralPathExpectConflictHandleZKBug() function. But the path 
> already exists, so createEphemeralPathExpectConflictHandleZKBug() gets 
> stuck in an infinite loop.
> It seems the controller election code has the same issue.
> I am able to reproduce this issue on the 0.8.1 branch from GitHub using the 
> following configs.
> # zookeeper
> tickTime=10
> dataDir=/tmp/zk/
> clientPort=2101
> maxClientCnxns=0
> # kafka
> broker.id=1
> log.dir=/tmp/kafka
> zookeeper.connect=localhost:2101
> zookeeper.connection.timeout.ms=100
> zookeeper.sessiontimeout.ms=100
> Just start kafka and zookeeper and then pause zookeeper several times using 
> Ctrl-Z.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to