KafkaServer in integration test not properly assigning to leaders to partitions

2015-05-14 Thread Corey Nolet
I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
found on GitHub) so that I can run an end-to-end test ingesting data
through a Kafka topic with consumers in Spark Streaming pushing to
Accumulo.

Thus far, my code is doing this:

1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
from the mini Accumulo cluster)
2) Creating a topic using AdminUtils
3) Starting up a Spark streaming context using a Kafka stream that puts all
data into Accumulo
4) Creating a producer and sending a message to the Kafka topic.


Looking at the topic metadata in ZooKeeper after the topic is created, let's
say testtopic, I never see the metadata for a leader show up in
/brokers/topics/testtopic/partitions. If I understand correctly,
creating a topic does this:

1) Adds a persistent node into ZooKeeper with some JSON data to denote the
topic's name as well as the partitions and the list of broker ids for
each partition.
2) I am still in the process of digging into this part, but I think the
first item in the list of replicas for each partition is used to define the
initial leader. The leader is then notified via a watcher and told to create
an ephemeral node, so that when that node goes down another leader can be
assigned.
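
As a rough illustration of step 2 above, here is a minimal sketch of taking
the first replica in each partition's list as the initial leader. The names
InitialLeaderSketch and initialLeaders and the sample broker ids are
hypothetical, and this is a simplification for illustration, not Kafka's
actual controller code.

```scala
object InitialLeaderSketch {
  // Partition id -> ordered list of replica broker ids, mirroring the
  // assignment stored in the topic znode (sample data is hypothetical).
  type ReplicaAssignment = Map[Int, Seq[Int]]

  // Pick the first replica of each partition as its initial leader.
  // Partitions with an empty replica list get no leader.
  def initialLeaders(assignment: ReplicaAssignment): Map[Int, Int] =
    assignment.collect {
      case (partition, replicas) if replicas.nonEmpty => partition -> replicas.head
    }

  def main(args: Array[String]): Unit = {
    val assignment: ReplicaAssignment = Map(0 -> Seq(0, 1), 1 -> Seq(1, 0))
    initialLeaders(assignment).toSeq.sortBy(_._1).foreach { case (p, leader) =>
      println(s"partition $p -> leader $leader")
    }
  }
}
```

If the assignment for a topic cannot be read at all, a function like this
has nothing to work with, which would be consistent with no leader ever
appearing.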

If I'm correct about #2, it seems like that watcher is never being invoked.
Any attempt to produce to the topic just returns an error back to the
producer that says there was no leader selected.

Any advice would be much appreciated. I really would like to get our
stack tested fully through automated testing and Kafka is the last piece we
need to assemble.


Re: KafkaServer in integration test not properly assigning to leaders to partitions

2015-05-14 Thread Corey Nolet
I raised the log levels to try to figure out what happens. I see log
statements on the broker stating:

New topic creation callback for 
New partition creation callback for 
Invoking state change to NewPartition for partitions 
Invoking state change to OnlinePartitions for partitions 
Error while fetching metadata for partition [testtopic, 0]
kafka.common.LeaderNotAvailableException: No leader exists for partition
0...

I'm not sure what's happening between the time I create my topic and the
time the broker sees that it needs to add the partition assignment to
ZooKeeper with itself as the leader, but it's strange that the log messages
above seem to be missing their data. "New topic creation callback for"
seems like it should be listing a topic rather than being blank.
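
One way a message like this can come out blank, assuming the broker builds
it by joining a collection of names (an assumption about the logging code,
not something confirmed in this thread), is that the collection is empty.
A minimal sketch with a hypothetical callbackMessage helper:

```scala
object BlankLogSketch {
  // Hypothetical helper that formats a log line from a set of topic names.
  def callbackMessage(topics: Set[String]): String =
    "New topic creation callback for %s".format(topics.mkString(","))

  def main(args: Array[String]): Unit = {
    // Brackets make the trailing blank visible in the output.
    println("[" + callbackMessage(Set("testtopic")) + "]")
    println("[" + callbackMessage(Set.empty) + "]")
  }
}
```

With an empty set the message ends right after "for", which matches the
shape of the log lines above.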

Any ideas?


Re: KafkaServer in integration test not properly assigning to leaders to partitions

2015-05-14 Thread Corey Nolet
I think I figured out what the problem is, though I'm not sure how to fix
it.


I've managed to debug through the embedded broker's callback for
TopicChangeListener#handleChildChange() in the PartitionStateMachine class.

The line in that function that's failing looks like this:

val addedPartitionReplicaAssignment =
ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

Inside getReplicaAssignmentForTopics(), it is pulling back a JSON blob
from the /brokers/topics/testtopic znode's data, and it appears the JSON
blob has some extra bytes at the beginning of it that are making it
unparseable once pulled from ZooKeeper.

Any ideas as to what this could be? I'm using 0.8.2.0. This is really what's
holding me back right now from getting my tests functional.
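
One possible source of leading bytes like these, offered as an assumption
rather than a confirmed diagnosis, is that the znode was written with Java
object serialization instead of as a plain UTF-8 string. ZkClient uses a
Java-serialization-based serializer when none is supplied, so a client
created without a string serializer could produce this. The sketch below
uses only the JDK to compare the two encodings; the sample JSON and the
helper names are illustrative.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

object SerializedPrefixSketch {
  // Sample payload shaped like a topic's partition assignment (illustrative).
  val json = """{"version":1,"partitions":{"0":[0]}}"""

  // Plain UTF-8 encoding: the bytes start directly with '{'.
  def utf8Bytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // Java object serialization: a stream header, a type tag and a length
  // come before the text itself.
  def javaSerializedBytes(s: String): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(s)
    out.close()
    buffer.toByteArray
  }

  // Render bytes as space-separated two-digit hex values.
  def hex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString(" ")

  def main(args: Array[String]): Unit = {
    println("utf-8:      " + hex(utf8Bytes(json).take(8)))
    println("serialized: " + hex(javaSerializedBytes(json).take(8)))
  }
}
```

The serialized form begins with the stream header ac ed 00 05 before any
JSON text. If that matches the extra bytes seen in the znode, it may be
worth creating the ZkClient used for topic creation with
kafka.utils.ZKStringSerializer.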



Re: KafkaServer in integration test not properly assigning to leaders to partitions

2015-05-14 Thread Corey Nolet
The JSON-encoded blob definitely appears to be going in as a JSON string. The
partition assignment JSON seems to be the only thing that is being prefixed
by these bytes. Any ideas?
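
A quick way to test one guess about the prefix (that it is a Java
serialization header, which is not confirmed here) is to compare the first
four bytes of the raw znode data against the stream magic AC ED 00 05. The
helper name looksJavaSerialized and the sample inputs are hypothetical; in
a real check the bytes would come from reading the znode's raw data.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

object ZnodePrefixCheck {
  // Java serialization stream header: magic 0xACED, then version 0x0005.
  private val header: Array[Byte] = Array(0xAC, 0xED, 0x00, 0x05).map(_.toByte)

  // True if the data starts with the Java serialization stream header.
  def looksJavaSerialized(data: Array[Byte]): Boolean =
    data.length >= header.length && Arrays.equals(data.take(header.length), header)

  def main(args: Array[String]): Unit = {
    val plain = """{"version":1,"partitions":{"0":[0]}}""".getBytes(StandardCharsets.UTF_8)
    val prefixed = header ++ plain // stand-in for a znode with a leading header
    println("plain json:    " + looksJavaSerialized(plain))
    println("with prefix:   " + looksJavaSerialized(prefixed))
  }
}
```

If only the topic znode trips this check, that would point at whichever
client wrote that node rather than at the broker.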
