KafkaServer in integration test not properly assigning leaders to partitions
I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I found on GitHub) so that I can run an end-to-end test ingesting data through a Kafka topic, with consumers in Spark Streaming pushing to Accumulo. Thus far, my code is doing this:

1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers from the mini Accumulo cluster)
2) Creating a topic using AdminUtils
3) Starting up a Spark streaming context using a Kafka stream that puts all data into Accumulo
4) Creating a producer and sending a message to the Kafka topic.

Looking at the topic metadata in ZooKeeper after the topic is created (let's say testtopic), I never see the metadata for a leader show up under /brokers/topics/testtopic/partitions. If I understand correctly, creating a topic does this:

1) Adds a persistent node into ZooKeeper with some JSON data denoting the topic's name, the partitions, and the list of broker ids for each partition.
2) I am still in the process of digging into this part, but I think the first item in the list of replicas for each partition is used as the initial leader; that broker is notified via a watcher and told to create an ephemeral node, so that another leader can be assigned when that node goes down.

If I'm correct about #2, it seems like that watcher is never being invoked. Any attempt to produce to the topic just returns an error to the producer saying that no leader was selected.

Any advice would be much appreciated. I really would like to get our stack tested fully through automated testing, and Kafka is the last piece we need to assemble.
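For reference, here is roughly what a healthy single-broker topic looks like in ZooKeeper on 0.8.x. The replica assignment lives on the topic znode, while the leader lives in a per-partition state znode written by the controller after leader election; the values below are illustrative, not pulled from a real cluster:

```text
# /brokers/topics/testtopic  (written at topic creation)
{"version":1,"partitions":{"0":[0]}}

# /brokers/topics/testtopic/partitions/0/state  (written by the controller
# once a leader is elected; this is the part that never shows up here)
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
```

If the first znode exists but the state znode never appears, the controller's topic-change watcher either never fired or failed while processing the assignment.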
Re: KafkaServer in integration test not properly assigning leaders to partitions
I raised the log levels to try to figure out what happens. I see log statements on the broker stating:

New topic creation callback for
New partition creation callback for
Invoking state change to NewPartition for partitions
Invoking state change to OnlinePartitions for partitions
Error while fetching metadata for partition [testtopic,0]
kafka.common.LeaderNotAvailableException: No leader exists for partition 0...

I'm not sure what's happening between the time I create my topic and the time the broker sees that it needs to add the partition assignment to ZooKeeper with itself as the leader, but it's strange that the log messages above seem to be missing their data. "New topic creation callback for" looks like it should be listing a topic, not coming up blank. Any ideas?
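One general pattern that helps with flakiness in embedded-broker tests, independent of the bug above: leader election happens asynchronously after topic creation, so a test should poll for leader metadata before producing. A minimal stdlib-only sketch; `waitFor`, the `Supplier` condition, and the timeout are all hypothetical test-helper names here, not Kafka API:

```java
import java.util.function.Supplier;

public class WaitFor {

    // Generic poll-until helper for integration tests. The condition
    // Supplier stands in for whatever check your ZooKeeper client
    // exposes, e.g. "does the partition state znode exist yet".
    static boolean waitFor(Supplier<Boolean> condition, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.get()) {
                return true;
            }
            try {
                Thread.sleep(50); // back off briefly between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.get();
    }

    public static void main(String[] args) {
        System.out.println(waitFor(() -> true, 1000)); // prints "true"
    }
}
```

That won't fix a leader that never gets elected, but it separates "the broker is slow" from "the broker is broken" when debugging.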
Re: KafkaServer in integration test not properly assigning leaders to partitions
I think I figured out what the problem is, though I'm not sure how to fix it. I've managed to debug through the embedded broker's callback for TopicChangeListener#handleChildChange() in the PartitionStateMachine class. The line from that function that's failing looks like this:

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

Inside getReplicaAssignmentForTopics(), it pulls back a JSON blob from the /brokers/topics/testtopic znode's data, and the JSON blob appears to have some extra bytes at the beginning that make it unparseable once pulled from ZooKeeper. Any ideas what this could be? I'm using 0.8.2.0; this is really what's holding me back right now from getting my tests functional.
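For what it's worth, a few leading garbage bytes on znode data are the telltale sign of a Java-serialized string: ZkClient's default serializer runs strings through ObjectOutputStream, which prepends a binary stream header before the UTF-8 payload. A quick stdlib-only sketch to see those bytes (the JSON payload is just an example):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class ZkSerializerDemo {

    // Serialize a string the way a default-configured ZkClient does
    // (via ObjectOutputStream) and return the first n bytes as hex.
    static String headerHex(String s, int n) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(s);
            oos.close();
            byte[] bytes = bos.toByteArray();
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < n; i++) {
                if (i > 0) sb.append(' ');
                sb.append(String.format("%02x", bytes[i]));
            }
            return sb.toString();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"version\":1,\"partitions\":{\"0\":[0]}}";
        // ac ed = stream magic, 00 05 = stream version, 74 = TC_STRING tag
        System.out.println(headerHex(json, 5)); // prints "ac ed 00 05 74"
    }
}
```

If the bytes in front of the znode JSON start with ac ed, the writer was Java-serializing while the broker expects a raw UTF-8 string.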
Re: KafkaServer in integration test not properly assigning leaders to partitions
The JSON-encoded blob definitely appears to be going in as a JSON string. The partition assignment JSON seems to be the only thing being prefixed by these bytes. Any ideas?
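If only one writer's data carries the prefix, it's worth checking whether the ZkClient instance handed to AdminUtils was constructed with kafka.utils.ZKStringSerializer; without it, ZkClient falls back to Java serialization for strings, which would produce exactly this mismatch. The round trip below demonstrates the symptom with plain stdlib code: a reader that expects raw string bytes chokes on the header, while a matching ObjectInputStream strips it cleanly:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public class ZnodeRoundTrip {

    // Write a string the way a default-configured ZkClient does.
    static byte[] javaSerialize(String s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(s);
            oos.close();
            return bos.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Naive read, as a string-expecting reader would do: the binary
    // stream header survives decoding, which is what breaks JSON parsing.
    static String naiveRead(byte[] bytes) {
        return new String(bytes, StandardCharsets.ISO_8859_1);
    }

    // Matching read: ObjectInputStream strips the header again.
    static String matchingRead(byte[] bytes) {
        try {
            ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes));
            return (String) ois.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"version\":1,\"partitions\":{\"0\":[0]}}";
        byte[] onTheWire = javaSerialize(json);
        System.out.println(naiveRead(onTheWire).startsWith(json)); // false: header in front
        System.out.println(matchingRead(onTheWire).equals(json));  // true: round-trips cleanly
    }
}
```

The real fix is on the write side: make every client that touches broker znodes serialize strings the same way the broker deserializes them, rather than decoding around the header on reads.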
Horizontally scaling a topic
I have a cluster of 3 nodes and I've created a topic with some number of partitions and some number of replicas, let's say 10 and 2, respectively. Later, after I've got my 3 nodes fairly consumed with data in the 10 partitions, I want to add 2 more nodes to the mix to help balance out the partitions/replicas of my topic across 5 physical nodes instead of just 3. I was assuming Kafka would just notice the new node and auto-replicate partitions to it but research is telling me that this probably isn't the case. Let's say I want no data loss and I want Kafka to spread my 10 partitions across all 5 nodes. How would I do this currently?
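As far as I know, in 0.8.x a new broker receives nothing automatically; you trigger the move yourself with the kafka-reassign-partitions.sh tool (--generate for a candidate plan, --execute to start it, --verify to poll progress). The plan is a JSON file mapping each partition to its new replica list; the topic name and the broker ids 3 and 4 below are hypothetical stand-ins for the two new nodes:

```json
{
  "version": 1,
  "partitions": [
    {"topic": "mytopic", "partition": 0, "replicas": [3, 4]},
    {"topic": "mytopic", "partition": 1, "replicas": [4, 0]},
    {"topic": "mytopic", "partition": 2, "replicas": [0, 1]}
  ]
}
```

The tool fully replicates each partition to its new brokers before retiring the old replicas, so a correctly executed reassignment should not lose data.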
Re: Batching at the socket layer
Thanks Jiangjie! So what version is considered the new API? Is that the javaapi in version 0.8.2?

On Mon, Mar 9, 2015 at 2:29 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

The stickiness of partition only applies to the old producer. In the new producer we have round robin for each message. Batching in the new producer is per topic partition; the batch size is controlled by both the max batch size and the linger time config.

Jiangjie (Becket) Qin
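For reference, in the new producer (org.apache.kafka.clients.producer, shipped with 0.8.2) the two knobs Jiangjie mentions are plain producer configs; the values below are examples, not recommendations:

```properties
# Per-partition batching in the new producer (0.8.2)
batch.size=16384   # max bytes accumulated per partition batch before a send
linger.ms=5        # wait up to 5 ms for more records to share a batch
```

This is why per-message round robin still batches well: records pile up per partition until either the batch fills or the linger timer fires.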
Fwd: Versioning
I'm new to Kafka and I'm trying to understand the version semantics. We want to use Kafka with Spark, but our version of Spark is tied to 0.8.0. We were wondering what guarantees are made about backwards compatibility across 0.8.x.x. At first glance, given the 3 digits used for versions, I figured 0.8.x would be bugfix releases and fully compatible, but I'm noticing newer releases with 4 digits, which leads me to believe there are fewer guarantees between 0.8.0, 0.8.1.x, and 0.8.2.x.
Fwd: Batching at the socket layer
I'm curious what type of batching Kafka producers do at the socket layer. For instance, if I have a partitioner that round-robins n messages to different partitions, am I guaranteed to get n different messages sent over the socket, or is there some micro-batching going on underneath? I am trying to understand the semantics of the default partitioner and why it sticks to a partition for 10 minutes. If I were to lower that interval to 1 sec, would I achieve better batching than I would by completely round-robining each message to a different partition?