Json encoded blob definitely appears to be going in as a json string. The
partition assignment json seems to be the only thing that is being prefixed
by these bytes. Any ideas?

On Thu, May 14, 2015 at 5:17 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I think I figured out what the problem is, though I'm not sure how to fix
> it.
>
>
> I've managed to debug through the embedded broker's callback for the
> TopicChangeListener#handleChildChange() int he PartitionStateMachine class.
>
> The following line from that function that's failing look this:
>
> val addedPartitionReplicaAssignment =
> ZKUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
>
> Inside the getReplicaAssignmentForTopics() it is pulling back a json blob
> from the /brokers/topics/testtopic znode's data and it appears the json
> blob has some extra bytes @ the beginning of it that are making it
> unparseable once pulled from zookeeper.
>
> Any ideas to what this could be? I'm using 0.8.2.0- this is really what's
> holding me back right now from getting my tests functional.
>
>
> On Thu, May 14, 2015 at 4:29 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I raised the log levels to try to figure out what happens. I see log
>> statements on the broker stating:
>>
>> "New topic creation callback for "
>> "New partition creation callback for "
>> "Invoking state change to NewPartition for partitions "
>> "Invoking state change to OnlinePartitions for partitions "
>> "Error while fetching metadata for partition [testtopic, 0]
>> kafka.common.LeaderNotAvailableExzception: No leader exists for partition
>> 0..."
>>
>> I'm not sure what's happening between the time I create my topic and the
>> time the broker sees that it needs to add the partition assignment to
>> zookeeper with itself as the leader but it's strange that the log messages
>> above seem like they are missing the data. "New topic creation callback for
>> " seems like it should be listing a topic and not blank.
>>
>> Any ideas?
>>
>> On Thu, May 14, 2015 at 1:00 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
>>> found on Github) so that I can run an end-to-end test ingesting data
>>> through a kafka topic with consumers in Spark Streaming pushing to
>>> Accumulo.
>>>
>>> Thus far, my code is doing this:
>>>
>>> 1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
>>> from the mini Accumulo cluster)
>>> 2) Creating a topic using AdminUtil
>>> 3) Starting up a Spark streaming context using a Kafka stream that puts
>>> all data into Accumulo
>>> 4) Creating a producer and sending a message to the Kafka topic.
>>>
>>>
>>> Looking @ the topic metadata in zookeeper after the topic is created,
>>> let's say "testtopic", I never see the metadata for a leader in
>>> /brokers/topics/testtopic/partitions show up. If I understand correctly,
>>> creating a topic does this:
>>>
>>> 1) Adds a persistent node into Zookeeper with some json data to denote
>>> the topic's name as well as the partitions and the list of each broker id
>>> for each partition.
>>> 2) I am still in the process of digging into this part but I think the
>>> first item in the list of replicas for each partition is used to define the
>>> initial leader and the leader is notified via a watcher and told to create
>>> an ephemeral node so that it can know when that node goes down to assign
>>> another.
>>>
>>> If I'm correct about #2, it seems like that watcher is never being
>>> invoked. Any attempt to produce to the topic just returns an error back to
>>> the producer that says there was no leader selected.
>>>
>>> Anything advice would be much appreciated. I really would like to get
>>> our stack tested fully through automated testing and Kafka is the last
>>> piece we need to assemble.
>>>
>>>
>>>
>>
>

Reply via email to