The JSON-encoded blob definitely appears to be going in as a JSON string. The partition assignment JSON seems to be the only thing being prefixed by these bytes. Any ideas?
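One thing that could produce exactly this symptom is the znode being written by a client that Java-serializes strings instead of writing their UTF-8 bytes. This is an assumption; nothing in the thread confirms it. For example, an I0Itec ZkClient constructed without Kafka's ZKStringSerializer falls back to Java serialization by default. The sketch below only shows what that prefix looks like; the class name, helper names, and sample JSON are made up for illustration and are not taken from Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: compares Java serialization of a String
// with its plain UTF-8 encoding.
class ZnodePrefixDemo {

    // Encode the string the way a generic Java object serializer would.
    static byte[] javaSerialize(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Encode the string as plain UTF-8 bytes, which is what a JSON parser expects.
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Render the first `count` bytes as hex for eyeballing.
    static String hex(byte[] bytes, int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(count, bytes.length); i++) {
            sb.append(String.format("%02X ", bytes[i] & 0xFF));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Illustrative payload only, shaped like a partition assignment blob.
        String json = "{\"version\":1,\"partitions\":{\"0\":[0]}}";

        byte[] serialized = javaSerialize(json);
        byte[] plain = utf8(json);

        // Java serialization starts with the stream header AC ED 00 05,
        // followed by the string type tag 74 and a two-byte length.
        System.out.println("serialized starts with: " + hex(serialized, 5));
        System.out.println("utf-8 starts with:      " + hex(plain, 5));
        System.out.println("extra bytes: " + (serialized.length - plain.length));
    }
}
```

If this is what is happening, dumping the raw bytes of /brokers/topics/testtopic should show them starting with AC ED 00 05 rather than with `{`.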
On Thu, May 14, 2015 at 5:17 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I think I figured out what the problem is, though I'm not sure how to fix
> it.
>
> I've managed to debug through the embedded broker's callback for
> TopicChangeListener#handleChildChange() in the PartitionStateMachine class.
>
> The line in that function that's failing looks like this:
>
> val addedPartitionReplicaAssignment =
>   ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
>
> Inside getReplicaAssignmentForTopics(), it pulls back a JSON blob from
> the /brokers/topics/testtopic znode's data, and it appears the JSON blob
> has some extra bytes at the beginning that make it unparseable once
> pulled from ZooKeeper.
>
> Any ideas as to what this could be? I'm using 0.8.2.0. This is really
> what's holding me back right now from getting my tests functional.
>
> On Thu, May 14, 2015 at 4:29 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I raised the log levels to try to figure out what happens. I see log
>> statements on the broker stating:
>>
>> "New topic creation callback for "
>> "New partition creation callback for "
>> "Invoking state change to NewPartition for partitions "
>> "Invoking state change to OnlinePartitions for partitions "
>> "Error while fetching metadata for partition [testtopic, 0]
>> kafka.common.LeaderNotAvailableException: No leader exists for partition
>> 0..."
>>
>> I'm not sure what's happening between the time I create my topic and the
>> time the broker sees that it needs to add the partition assignment to
>> ZooKeeper with itself as the leader, but it's strange that the log
>> messages above seem to be missing their data. "New topic creation
>> callback for " seems like it should be listing a topic, not a blank.
>>
>> Any ideas?
>>
>> On Thu, May 14, 2015 at 1:00 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
>>> found on GitHub) so that I can run an end-to-end test ingesting data
>>> through a Kafka topic with consumers in Spark Streaming pushing to
>>> Accumulo.
>>>
>>> Thus far, my code is doing this:
>>>
>>> 1) Creating a MiniAccumuloCluster and KafkaServer (using the ZooKeepers
>>> from the mini Accumulo cluster)
>>> 2) Creating a topic using AdminUtils
>>> 3) Starting up a Spark streaming context using a Kafka stream that puts
>>> all data into Accumulo
>>> 4) Creating a producer and sending a message to the Kafka topic.
>>>
>>> Looking at the topic metadata in ZooKeeper after the topic (let's say
>>> "testtopic") is created, I never see the metadata for a leader show up
>>> in /brokers/topics/testtopic/partitions. If I understand correctly,
>>> creating a topic does this:
>>>
>>> 1) Adds a persistent node into ZooKeeper with some JSON data to denote
>>> the topic's name as well as the partitions and the list of broker ids
>>> for each partition.
>>> 2) I am still in the process of digging into this part, but I think the
>>> first item in the list of replicas for each partition is used to define
>>> the initial leader, and the leader is notified via a watcher and told to
>>> create an ephemeral node so that it can know when that node goes down to
>>> assign another.
>>>
>>> If I'm correct about #2, it seems like that watcher is never being
>>> invoked. Any attempt to produce to the topic just returns an error back
>>> to the producer that says there was no leader selected.
>>>
>>> Any advice would be much appreciated. I really would like to get our
>>> stack tested fully through automated testing, and Kafka is the last
>>> piece we need to assemble.