Poseidon, a new 0.8 compatible Ruby client for Kafka

2013-02-11 Thread Bob Potter
Howdy,

I just pushed the initial version of a new Ruby client that implements the
new 0.8 wire protocol and includes a producer which uses the topic metadata
API to distribute messages across a cluster.

https://github.com/bpot/poseidon

It's still very alpha, but I hope to put it through its paces over the next
couple of weeks and work out any issues.

I'd appreciate any feedback, especially on the API.

Thanks,
Bob


Re: InvalidMessageSizeException

2013-02-11 Thread Jun Rao
Another way is to figure out a valid offset close to the current offset and
reset the offset in ZK. You can use the tool DumpLogSegment to print out
valid offsets in a log file.
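
As an illustration only (not an official Kafka tool): once DumpLogSegment has
given you a valid offset, resetting it in ZooKeeper could look roughly like
the sketch below. It assumes the 0.6/0.7-era znode layout
/consumers/<group>/offsets/<topic>/<brokerId-partitionId>, uses the Python
kazoo client, and should only be run with the consumer group shut down; every
name and value here is a placeholder.

    # Hypothetical sketch: reset one consumer offset stored in ZooKeeper.
    from kazoo.client import KazooClient

    GROUP = "my-consumer-group"    # assumption: your consumer group id
    TOPIC = "my-topic"             # assumption: the affected topic
    BROKER_PARTITION = "0-0"       # assumption: "<brokerId>-<partitionId>"
    NEW_OFFSET = 307196            # a valid offset reported by DumpLogSegment

    zk = KazooClient(hosts="zk1:2181")
    zk.start()
    path = "/consumers/%s/offsets/%s/%s" % (GROUP, TOPIC, BROKER_PARTITION)
    # Offsets are stored as the string form of the long value.
    zk.set(path, str(NEW_OFFSET).encode("utf-8"))
    zk.stop()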

0.6 is pretty old though. I recommend that you upgrade to 0.7.

Thanks,

Jun

On Mon, Feb 11, 2013 at 9:31 AM, Manish Khettry  wrote:

> One of our consumers keeps getting an invalid message size exception. I'm
> pretty sure that we don't have a message size this big (1.7G). We have two
> other consumer groups consuming messages from the same Kafka instance
> happily over the last few days.
>
> Since we keep the logs around for a fixed interval and this consumer group
> has fallen pretty far behind, is it possible that somehow log truncation is
> causing this? We are on kafka 0.6 BTW.
>
> At this point, I'm inclined to wipe out the "/consumers//offsets"
> node in zookeeper to get this system going again. Would that be the
> preferred way of getting out of this bad state?
>
> Let me know if there is any other troubleshooting/diagnostics I can run on
> the system before I reboot!
>
> Manish
>
> [$DATE] ERROR k.c.FetcherRunnable [] - error in FetcherRunnable
> kafka.common.InvalidMessageSizeException: invalid message size:1852339316
> only received bytes:307196 at 0 possible causes (1) a single message larger
> than the fetch size; (2) log corruption
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:75) ~[rookery-vacuum.jar:na]
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:61) ~[rookery-vacuum.jar:na]
> at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:58) ~[rookery-vacuum.jar:na]
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:50) ~[rookery-vacuum.jar:na]
> at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:49) ~[rookery-vacuum.jar:na]
> at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:70) ~[rookery-vacuum.jar:na]
> at kafka.consumer.FetcherRunnable$$anonfun$run$4.apply(FetcherRunnable.scala:80) ~[rookery-vacuum.jar:na]
> at kafka.con...
>


Re: Changing replication factors in 0.8.

2013-02-11 Thread Jun Rao
Bob,

We do plan to support changing replication factors online in the future.
This will be a post-0.8 feature, though.

Thanks,

Jun

On Mon, Feb 11, 2013 at 9:32 AM, Bob Jervis  wrote:

> In testing our 0.8 cluster, we started by just using the sample
> server.properties file that ships with 0.8 and tweaking it.  The
> replication factor property did not have an exemplar in the file, so we
> didn’t include it.  Naturally, the cluster did not do replication.
>
> After sending some data through the tests, we then added replication
> factor=2 to the config and restarted the cluster.  After a half-hour it did
> not appear to replicate the topic data we had generated during the first
> test.  We had to erase the topics entirely and start from scratch.
> Replication then happened and when we took down a broker we saw the gaps in
> the topic data get filled in after restarting it.
>
> In the future, what is the correct way to get a Kafka cluster to change
> the replication factor on an existing topic?  Will we have to flush all
> data from the cluster for a given topic, or is there a recommended way to
> convert existing data?
>
> Bob Jervis | Senior Architect
> Seattle | Boston | New York | London
> Phone: 425.957.6075 | Fax: 781.404.5711
>


Re: Producers errors when failing a broker in a replicated 0.8 cluster.

2013-02-11 Thread Jun Rao
Bob,

In 0.8, if you send a set of messages in sync mode, the producer will throw
an exception if at least one message can't be sent to the broker after all
retries. The client won't know which messages were sent successfully and
which were not. We do plan to improve the producer API after 0.8 so that it
can expose more information to the client.
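
In the meantime, the retry/backoff approach Bob describes below (re-send the
whole batch and mark it as a possible duplicate) is the safest client-side
pattern. A rough sketch, purely illustrative: send_batch is a stand-in for
whatever synchronous bulk-send call your client exposes, and messages are
assumed to be dicts.

    import time

    # Illustrative retry/backoff around a synchronous bulk send. Because a
    # 0.8 sync producer only reports that *some* message in the batch failed,
    # the safe move is to re-send the whole batch and flag it as a potential
    # duplicate delivery so downstream consumers can de-duplicate.
    def send_with_retries(send_batch, messages, max_retries=5, backoff_s=1.0):
        for attempt in range(max_retries + 1):
            try:
                send_batch(messages)
                return
            except Exception:
                if attempt == max_retries:
                    raise
                # We cannot tell which messages got through, so mark every
                # message in the batch as a retry (messages assumed dicts).
                messages = [dict(m, retry=True) for m in messages]
                time.sleep(backoff_s * (2 ** attempt))  # exponential backoff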

Thanks,

Jun

On Mon, Feb 11, 2013 at 9:27 AM, Bob Jervis  wrote:

> We are in final testing of Kafka and so far the fail-over tests have been
> pretty encouraging.  If we kill (-9) one of two kafka brokers, with
> replication factor=2 we see a flurry of activity as the producer fails and
> retries its writes (we use a bulk, synchronous send of 1000 messages at a
> time, each message ~1K long).  Sometimes the library finds the newly
> elected leader before returning to the application and sometimes it
> doesn’t.  We added retry/backoff logic to our code and we don’t seem to be
> losing content.
>
> However, we have another app in the pipeline that does a fan-out from one
> Kafka topic to dozens of topics.  We still use a single, synchronous, bulk
> send.
>
> My question is: what are the semantics of a bulk send like that, where one
> broker dies but the topic leaders have been spread across both brokers?
> Do we get any feedback on which messages went through and which were
> dropped because the leader just died?  For our own transactioning we can
> mark messages as ‘retries’ if we suspect there might have been any
> hanky-panky, but if we can reliably avoid extra work by not re-sending
> messages that we know have been delivered we can avoid the extra work on
> the client side.
>
> Thanks for any insight,
>
> Bob Jervis | Senior Architect
> Seattle | Boston | New York | London
> Phone: 425.957.6075 | Fax: 781.404.5711
>


Changing replication factors in 0.8.

2013-02-11 Thread Bob Jervis
In testing our 0.8 cluster, we started by just using the sample 
server.properties file that ships with 0.8 and tweaking it.  The replication 
factor property did not have an exemplar in the file, so we didn't include it.  
Naturally, the cluster did not do replication.
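
For reference, a minimal sketch of the broker-side lines involved; the
property names below are an assumption based on the 0.8-era sample
configuration, so verify them against your own server.properties before
relying on them.

    # Assumed 0.8-era broker settings; check against your server.properties.
    # Replication factor applied to topics the broker creates automatically.
    default.replication.factor=2
    num.partitions=1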

After sending some data through the tests, we then added replication factor=2 
to the config and restarted the cluster.  After a half-hour it did not appear 
to replicate the topic data we had generated during the first test.  We had to 
erase the topics entirely and start from scratch.  Replication then happened 
and when we took down a broker we saw the gaps in the topic data get filled in 
after restarting it.

In the future, what is the correct way to get a Kafka cluster to change the 
replication factor on an existing topic?  Will we have to flush all data from 
the cluster for a given topic, or is there a recommended way to convert 
existing data?

Bob Jervis | Senior Architect

Seattle | Boston | New York | London
Phone: 425.957.6075 | Fax: 781.404.5711





InvalidMessageSizeException

2013-02-11 Thread Manish Khettry
One of our consumers keeps getting an invalid message size exception. I'm
pretty sure that we don't have a message size this big (1.7G). We have two
other consumer groups consuming messages from the same Kafka instance
happily over the last few days.

Since we keep the logs around for a fixed interval and this consumer group
has fallen pretty far behind, is it possible that somehow log truncation is
causing this? We are on kafka 0.6 BTW.

At this point, I'm inclined to wipe out the "/consumers//offsets"
node in zookeeper to get this system going again. Would that be the
preferred way of getting out of this bad state?

Let me know if there is any other troubleshooting/diagnostics I can run on
the system before I reboot!

Manish

[$DATE] ERROR k.c.FetcherRunnable [] - error in FetcherRunnable
kafka.common.InvalidMessageSizeException: invalid message size:1852339316
only received bytes:307196 at 0 possible causes (1) a single message larger
than the fetch size; (2) log corruption
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:75) ~[rookery-vacuum.jar:na]
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:61) ~[rookery-vacuum.jar:na]
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:58) ~[rookery-vacuum.jar:na]
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:50) ~[rookery-vacuum.jar:na]
at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:49) ~[rookery-vacuum.jar:na]
at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:70) ~[rookery-vacuum.jar:na]
at kafka.consumer.FetcherRunnable$$anonfun$run$4.apply(FetcherRunnable.scala:80) ~[rookery-vacuum.jar:na]
at kafka.con...


Producers errors when failing a broker in a replicated 0.8 cluster.

2013-02-11 Thread Bob Jervis
We are in final testing of Kafka and so far the fail-over tests have been 
pretty encouraging.  If we kill (-9) one of two kafka brokers, with replication 
factor=2 we see a flurry of activity as the producer fails and retries its 
writes (we use a bulk, synchronous send of 1000 messages at a time, each 
message ~1K long).  Sometimes the library finds the newly elected leader before 
returning to the application and sometimes it doesn't.  We added retry/backoff 
logic to our code and we don't seem to be losing content.

However, we have another app in the pipeline that does a fan-out from one Kafka 
topic to dozens of topics.  We still use a single, synchronous, bulk send.

My question is: what are the semantics of a bulk send like that, where one
broker dies but the topic leaders have been spread across both brokers? Do we
get any feedback on which messages went through and which were dropped because 
the leader just died?  For our own transactioning we can mark messages as 
'retries' if we suspect there might have been any hanky-panky, but if we can 
reliably avoid extra work by not re-sending messages that we know have been 
delivered we can avoid the extra work on the client side.

Thanks for any insight,

Bob Jervis | Senior Architect

Seattle | Boston | New York | London
Phone: 425.957.6075 | Fax: 781.404.5711





Re: Consumer re-design and Python

2013-02-11 Thread David Arthur

On 1/31/13 3:30 PM, Marc Labbe wrote:

> Hi,
>
> I am fairly new to Kafka and Scala, and I am trying to make sense of the
> consumer re-design changes proposed and implemented for 0.8 and after,
> which will affect implementations in other languages. There are
> documentation pages on the wiki and JIRA issues, but I still can't figure
> out what's already there for 0.8, what will be there in the future, and
> how it affects consumers written in other languages (Python in my case).
>
> For instance, I am looking at
> https://cwiki.apache.org/KAFKA/consumer-client-re-design.html and the very
> well documented
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design
> and I am not sure which parts are in the works, which are done, and which
> are still proposals. I feel some of these changes are already in 0.8, but
> not completely; I am referring especially to KAFKA-364 and KAFKA-264.

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

and

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design

are the current design docs (as far as I know).


> Is this all accurate and up to date? There are talks of a coordinator as
> well but, from what I see, this hasn't been implemented so far.

From my understanding, the client redesign has not been finalized and is
still in progress / to do.


> After all, maybe my question is: other than the wire protocol changes, what
> changes should I expect to make to a SimpleConsumer client written in Python
> for v0.8? What should I do next to implement a high-level consumer
> (ZookeeperConsumerConnector?) which fits with the design proposal?

With 0.8, you will not need to connect to ZooKeeper from the clients.
With KAFKA-657, offsets are centrally managed by the broker. Any broker
can handle these requests.


> Has anyone started making changes to their implementation yet (thinking
> Brod or Samsa)? I'll post that question on GitHub too.

I am working on updating my Python client
(https://github.com/mumrah/kafka-python); still a ways to go yet. The
biggest change (besides centralized offset management) is that each
topic+partition is owned by a specific broker (the leader). When
producing messages, you must send them to the correct leader. This
requires that clients maintain some state of what belongs where, which is
a pain, but such is the cost of replication.
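
To make that leader bookkeeping concrete, here is a hypothetical sketch (this
is not kafka-python's actual API; fetch_metadata and send_to_broker are
stand-ins for the real wire-protocol calls):

    # Hypothetical sketch of leader-aware produce routing in a 0.8-style
    # client. The point is the (topic, partition) -> leader map the client
    # must maintain and refresh when leadership moves.
    class LeaderAwareProducer(object):
        def __init__(self, fetch_metadata, send_to_broker):
            self.fetch_metadata = fetch_metadata    # returns {(topic, partition): broker}
            self.send_to_broker = send_to_broker    # send_to_broker(broker, topic, partition, msgs)
            self.leaders = {}

        def _refresh(self):
            self.leaders = self.fetch_metadata()

        def send(self, topic, partition, messages):
            if (topic, partition) not in self.leaders:
                self._refresh()
            broker = self.leaders[(topic, partition)]
            try:
                self.send_to_broker(broker, topic, partition, messages)
            except Exception:
                # Leadership may have moved (e.g. a not-leader error);
                # refresh metadata once and retry against the new leader.
                self._refresh()
                broker = self.leaders[(topic, partition)]
                self.send_to_broker(broker, topic, partition, messages)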


> Thanks and cheers!
> marc


-David



Re: kafka replication blog

2013-02-11 Thread Michal Haris
Thanks Jun, makes sense.
On Feb 8, 2013 4:00 PM, "Jun Rao"  wrote:

> That's right. If you are partitioning by key, that means you insist a
> message has to go to a certain partition, whether it's available or not.
> So, if a partition is not available, we will drop the message for the
> partition in the async mode and consistently throw an exception to the
> caller in the sync mode.
>
> Thanks,
>
> Jun
>
> > On Fri, Feb 8, 2013 at 1:41 AM, Michal Haris  wrote:
>
> > So if the producers are partitioning by key, we have to have replication
> > if we don't want messages to get lost when a partition goes down, right?
> > Thanks
> > On Feb 8, 2013 5:12 AM, "Jun Rao"  wrote:
> >
> > > We have fixed this issue in 0.8. With replication factor 1, if the
> > > producer doesn't care about partitioning by key, messages will be sent
> > > to partitions that are currently available.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Feb 7, 2013 at 3:11 PM, Michal Haris <michal.ha...@visualdna.com> wrote:
> > >
> > > > Same here, the summary was needed as we have a fairly large ecosystem
> > > > of multiple 0.7.2 clusters and I am planning to test an upgrade to 0.8.
> > > > However, one thing creeping at the back of my mind regarding 0.8 is
> > > > something I spotted in a thread a few weeks ago, namely that the
> > > > rebalance behaviour of producers is not as robust as in 0.7.x without
> > > > replication, and I remember there was no designed solution at the
> > > > time - any news here? Basically, our use case doesn't require
> > > > replication, but the logical offsets and some other things introduced
> > > > would solve some problems.
> > > > On Feb 7, 2013 7:11 PM, "Vaibhav Puranik"  wrote:
> > > >
> > > > > Same here. Thanks a lot Jun.
> > > > >
> > > > > Regards,
> > > > > Vaibhav
> > > > >
> > > > > > On Thu, Feb 7, 2013 at 10:38 AM, Felix GV  wrote:
> > > > >
> > > > > > Thanks Jun!
> > > > > >
> > > > > > I hadn't been following the discussions regarding 0.8 and
> > > > > > replication for a little while and this was a great post to refresh
> > > > > > my memory and get up to speed on the current replication
> > > > > > architecture's design.
> > > > > >
> > > > > > --
> > > > > > Felix
> > > > > >
> > > > > >
> > > > > > > On Tue, Feb 5, 2013 at 2:21 PM, Jun Rao  wrote:
> > > > > >
> > > > > > > I just posted the following blog on Kafka replication. This may
> > > > > > > answer some of the questions that a few people have asked in the
> > > > > > > mailing list before.
> > > > > > >
> > > > > > > http://engineering.linkedin.com/kafka/intra-cluster-replication-apache-kafka
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
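
To illustrate the keyed-partitioning behaviour Jun describes at the top of
this thread, here is a hypothetical sketch (not Kafka's actual producer
code): with a key, the target partition is fixed by the hash, so an
unavailable partition means the message is dropped in async mode or an error
is raised in sync mode; without a key, the producer is free to pick an
available partition.

    import hashlib

    # Hypothetical sketch of 0.8 producer partitioning semantics.
    def choose_partition(key, num_partitions, available_partitions):
        if key is not None:
            # Keyed message: the partition is fixed by the key hash.
            return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_partitions
        # No key: free to route around unavailable partitions.
        return available_partitions[0]

    def send(message, key, num_partitions, available_partitions, sync=True):
        partition = choose_partition(key, num_partitions, available_partitions)
        if partition not in available_partitions:
            if sync:
                raise RuntimeError("partition %d unavailable" % partition)
            return None  # async mode: the message is dropped
        return partition  # hand off to the leader for this partition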