[
https://issues.apache.org/jira/browse/KAFKA-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151673#comment-14151673
]
James Lent edited comment on KAFKA-1387 at 9/29/14 2:43 PM:
------------------------------------------------------------
In case anyone is interested in the complete code for the new class I am
testing with:
{noformat}
/**
 * Watches a single ephemeral ZooKeeper node and re-creates it when it disappears.
 *
 * `recreateNode` is invoked in two situations, and in both only when
 * `zkClient.exists(path)` reports that the node is absent:
 *  - the data listener is told the node at `path` was deleted;
 *  - the state listener is told a new ZooKeeper session was established.
 * Checking for existence first means that a repeated new-session notification
 * does not try to create a node that an earlier notification already re-created.
 *
 * Call [[start]] to subscribe the listeners and [[close]] to unsubscribe them.
 *
 * @param zkClient     client used to subscribe the listeners and to check node existence
 * @param path         path of the ephemeral node being monitored
 * @param recreateNode callback that re-creates the node at `path`
 */
class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreateNode: () => Unit) extends Logging {
  val dataListener = new DataListener()
  val stateListener = new StateListener()

  /** Subscribes the session-state listener and the data listener for `path`. */
  def start(): Unit = {
    zkClient.subscribeStateChanges(stateListener)
    zkClient.subscribeDataChanges(path, dataListener)
  }

  /** Unsubscribes both listeners registered by [[start]]. */
  def close(): Unit = {
    zkClient.unsubscribeStateChanges(stateListener)
    zkClient.unsubscribeDataChanges(path, dataListener)
  }

  /** Logs data changes on the monitored node and re-creates it after deletion. */
  class DataListener() extends IZkDataListener {
    // Last data value seen, so that only actual changes are logged.
    var oldData: scala.Any = null

    def handleDataChange(dataPath: String, newData: scala.Any): Unit = {
      // `!=` on Any is null-safe, so transitions to or from null are also detected.
      if (newData != oldData) {
        oldData = newData
        info("Ephemeral node %s has new data [%s]".format(dataPath, newData))
      }
    }

    def handleDataDeleted(dataPath: String): Unit = {
      // NOTE(review): exists() followed by recreateNode() is not atomic; if the node is
      // created by someone else in between, recreateNode() has to tolerate the node
      // already existing — confirm against the callback's implementation.
      if (zkClient.exists(path)) {
        info("Ephemeral node %s was deleted, but, has already been recreated".format(dataPath))
      } else {
        info("Ephemeral node %s was deleted, recreate it".format(dataPath))
        recreateNode()
      }
    }
  }

  /** Re-creates the node, if it is missing, whenever a new session is established. */
  class StateListener() extends IZkStateListener {
    // Plain state transitions are ignored; only new sessions matter here.
    def handleStateChanged(state: KeeperState): Unit = {}

    def handleNewSession(): Unit = {
      // The existence check makes a duplicated new-session callback harmless.
      if (zkClient.exists(path)) {
        info("New session started, but, ephemeral %s already/still exists".format(path))
      } else {
        info("New session started, recreate ephemeral node %s".format(path))
        recreateNode()
      }
    }
  }
}
{noformat}
was (Author: jwlent55):
In case anyone is interested in the complete code for the new class I am
testing with:
{noformat}
/**
 * Watches a single ephemeral ZooKeeper node and re-creates it when it disappears.
 *
 * `recreateNode` is invoked in two situations, and in both only when
 * `zkClient.exists(path)` reports that the node is absent:
 *  - the data listener is told the node at `path` was deleted;
 *  - the state listener is told a new ZooKeeper session was established.
 * Checking for existence first means that a repeated new-session notification
 * does not try to create a node that an earlier notification already re-created.
 *
 * Call [[start]] to subscribe the listeners and [[close]] to unsubscribe them.
 *
 * @param zkClient     client used to subscribe the listeners and to check node existence
 * @param path         path of the ephemeral node being monitored
 * @param recreateNode callback that re-creates the node at `path`
 */
class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreateNode: () => Unit) extends Logging {
  val dataListener = new DataListener()
  val stateListener = new StateListener()

  /** Subscribes the session-state listener and the data listener for `path`. */
  def start(): Unit = {
    zkClient.subscribeStateChanges(stateListener)
    zkClient.subscribeDataChanges(path, dataListener)
  }

  /** Unsubscribes both listeners registered by [[start]]. */
  def close(): Unit = {
    zkClient.unsubscribeStateChanges(stateListener)
    zkClient.unsubscribeDataChanges(path, dataListener)
  }

  /** Logs data changes on the monitored node and re-creates it after deletion. */
  class DataListener() extends IZkDataListener {
    // Last data value seen, so that only actual changes are logged.
    var oldData: scala.Any = null

    def handleDataChange(dataPath: String, newData: scala.Any): Unit = {
      if (newData == null) {
        // Only report the loss of data once, on the transition from some value to null.
        if (oldData != null) {
          info("Ephemeral node %s now has no data".format(dataPath))
        }
      } else if (!newData.equals(oldData)) {
        info("Ephemeral node %s has new data [%s]".format(dataPath, newData))
      }
      // Always remember the latest value, including null.
      oldData = newData
    }

    def handleDataDeleted(dataPath: String): Unit = {
      // NOTE(review): exists() followed by recreateNode() is not atomic; if the node is
      // created by someone else in between, recreateNode() has to tolerate the node
      // already existing — confirm against the callback's implementation.
      if (zkClient.exists(path)) {
        info("Ephemeral node %s was deleted, but, has already been recreated".format(dataPath))
      } else {
        info("Ephemeral node %s was deleted, recreate it".format(dataPath))
        recreateNode()
      }
    }
  }

  /** Re-creates the node, if it is missing, whenever a new session is established. */
  class StateListener() extends IZkStateListener {
    // Plain state transitions are ignored; only new sessions matter here.
    def handleStateChanged(state: KeeperState): Unit = {}

    def handleNewSession(): Unit = {
      // The existence check makes a duplicated new-session callback harmless.
      if (zkClient.exists(path)) {
        info("New session started, but, ephemeral %s already/still exists".format(path))
      } else {
        info("New session started, recreate ephemeral node %s".format(path))
        recreateNode()
      }
    }
  }
}
{noformat}
> Kafka getting stuck creating ephemeral node it has already created when two
> zookeeper sessions are established in a very short period of time
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1387
> URL: https://issues.apache.org/jira/browse/KAFKA-1387
> Project: Kafka
> Issue Type: Bug
> Reporter: Fedor Korotkiy
>
> The Kafka broker re-registers itself in ZooKeeper every time the handleNewSession()
> callback is invoked.
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
>
> Now imagine the following sequence of events.
> 1) The ZooKeeper session is re-established. The handleNewSession() callback is queued by
> the zkClient but not yet invoked.
> 2) The ZooKeeper session is re-established again, queueing the callback a second time.
> 3) The first callback is invoked, creating the /broker/[id] ephemeral path.
> 4) The second callback is invoked and tries to create the /broker/[id] path using
> the createEphemeralPathExpectConflictHandleZKBug() function. But the path
> already exists, so createEphemeralPathExpectConflictHandleZKBug() gets
> stuck in an infinite loop.
> It seems the controller election code has the same issue.
> I am able to reproduce this issue on the 0.8.1 branch from GitHub using the
> following configs:
> # zookeeper
> tickTime=10
> dataDir=/tmp/zk/
> clientPort=2101
> maxClientCnxns=0
> # kafka
> broker.id=1
> log.dir=/tmp/kafka
> zookeeper.connect=localhost:2101
> zookeeper.connection.timeout.ms=100
> zookeeper.sessiontimeout.ms=100
> Just start Kafka and ZooKeeper, then pause ZooKeeper several times using
> Ctrl-Z.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)