RE: Partition key not working properly

2014-11-25 Thread Haoming Zhang
Hi Svante,

Thanks for your reply!

As you said, my purpose is to have "all messages with the same key go to the same
partition", but in my actual case, even if I hard-code the same partition
key (let's say the key is "1") for three messages, the messages still go
to different partitions.

Regards,
Haoming

> Date: Wed, 26 Nov 2014 08:03:04 +0100
> Subject: Re: Partition key not working properly
> From: s...@csi.se
> To: users@kafka.apache.org
> 
> By default, the partition key is hashed and the message is placed in the
> partition that owns the corresponding part of the hashed keyspace.
> 
> If you have three physical partitions and you give the partition key "5",
> it has nothing to do with physical partition 5 (which does not exist);
> rather, the physical partition = hash("5") mod 3.
> 
> 
> The only guarantee is that all messages with the same key go to the same
> partition. This is useful to make sure that, for example, all logs from the
> same IP go to the same partition, which means that they can be read by the
> same consumer.
> 
> /svante
> 
> 
> 
> 2014-11-26 2:42 GMT+01:00 Haoming Zhang :
> 
> >
> >
> >
> > Hi all,
> >
> > I'm struggling with how to use the partition key mechanism properly. My
> > logic is to set the partition number to 3, create three partition keys
> > "0", "1", "2", and use them to create three KeyedMessages such as
> > KeyedMessage(topic, "0", message),
> > KeyedMessage(topic, "1", message),
> > KeyedMessage(topic, "2", message)
> >
> > After this, I create a producer instance to send out all the KeyedMessages.
> >
> > I expect each KeyedMessage to go to a different partition according to its
> > partition key, which means
> > KeyedMessage(topic, "0", message) goes to Partition 0,
> > KeyedMessage(topic, "1", message) goes to Partition 1,
> > KeyedMessage(topic, "2", message) goes to Partition 2
> >
> > I'm using Kafka-web-console to watch the topic status, but the result is
> > not what I'm expecting. The KeyedMessages still go to partitions seemingly
> > at random; sometimes two KeyedMessages end up in the same partition even
> > though they have different partition keys.
> >
> > I'm not sure whether my logic is incorrect or I misunderstood the
> > partition key mechanism. Any sample code or explanation would be great!
> >
> > Thanks,
> > Haoming
> >
> >
  

Re: Partition key not working properly

2014-11-25 Thread svante karlsson
By default, the partition key is hashed and the message is placed in the
partition that owns the corresponding part of the hashed keyspace.

If you have three physical partitions and you give the partition key "5",
it has nothing to do with physical partition 5 (which does not exist);
rather, the physical partition = hash("5") mod 3.


The only guarantee is that all messages with the same key go to the same
partition. This is useful to make sure that, for example, all logs from the
same IP go to the same partition, which means that they can be read by the
same consumer.

/svante
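
To illustrate the behaviour described above, here is a toy sketch (not Kafka's
actual partitioner class, just the same idea) of how a key maps onto one of N
physical partitions:

import java.util.Arrays;

// Toy illustration: the key is hashed and reduced modulo the partition count,
// so key "5" has no relation to a physical partition numbered 5.
public class PartitionKeyDemo {
    static int partitionFor(String key, int numPartitions) {
        // mask the sign bit so the result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : Arrays.asList("0", "1", "2", "5", "5")) {
            System.out.println("key=" + key + " -> partition " + partitionFor(key, numPartitions));
        }
        // Both entries with key "5" print the same partition, which is the only
        // guarantee key-based partitioning gives.
    }
}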



2014-11-26 2:42 GMT+01:00 Haoming Zhang :

>
>
>
> Hi all,
>
> I'm struggling with how to use the partition key mechanism properly. My
> logic is to set the partition number to 3, create three partition keys
> "0", "1", "2", and use them to create three KeyedMessages such as
> KeyedMessage(topic, "0", message),
> KeyedMessage(topic, "1", message),
> KeyedMessage(topic, "2", message)
>
> After this, I create a producer instance to send out all the KeyedMessages.
>
> I expect each KeyedMessage to go to a different partition according to its
> partition key, which means
> KeyedMessage(topic, "0", message) goes to Partition 0,
> KeyedMessage(topic, "1", message) goes to Partition 1,
> KeyedMessage(topic, "2", message) goes to Partition 2
>
> I'm using Kafka-web-console to watch the topic status, but the result is
> not what I'm expecting. The KeyedMessages still go to partitions seemingly
> at random; sometimes two KeyedMessages end up in the same partition even
> though they have different partition keys.
>
> I'm not sure whether my logic is incorrect or I misunderstood the
> partition key mechanism. Any sample code or explanation would be great!
>
> Thanks,
> Haoming
>
>


Can Mirroring Preserve Every Topic's Partition?

2014-11-25 Thread Alex Melville
Howdy friends,


I'd like to mirror the topics on several clusters to a central cluster, and
I'm looking at using the default Mirrormaker to do so. I've already done
some basic testing on the Mirrormaker found here:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

and managed to successfully copy a topic's partitions on a source cluster
to a topic on a target cluster. So I'm able to mirror correctly. However
for my particular use case I need to ensure that when I copy a topic's
partitions from source cluster to target cluster, a partition created on
the target cluster contains data in the exact same order as the data on the
corresponding partition on the source cluster.

I'm thinking of writing a Simple Consumer so I can manually compare the
events in a source cluster's partition with the corresponding partition on
the target cluster, but I'm not 100% sure if I'll be able to verify my
guarantee if I do it this way. Can anyone here verify that partitions
copied over to the target cluster by the default Mirrormaker are an exact
copy of those on the source cluster?


Thanks in advance,

Alex Melville
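
For the manual check described above, a rough sketch along the lines of the
standard 0.8 SimpleConsumer example is shown below. The hosts, topic, partition
and offsets are placeholders, and in real use the fetch has to go to the
current leader of the partition (found via a TopicMetadataRequest) and should
check FetchResponse.hasError(); both are omitted here for brevity.

import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

// Dumps one chunk of a single partition so the same partition can be fetched
// from the source and target clusters and compared offset by offset.
public class PartitionDump {

    static void dump(String leaderHost, String topic, int partition, long startOffset) {
        SimpleConsumer consumer =
                new SimpleConsumer(leaderHost, 9092, 100000, 64 * 1024, "mirror-check");
        try {
            FetchRequest request = new FetchRequestBuilder()
                    .clientId("mirror-check")
                    .addFetch(topic, partition, startOffset, 1024 * 1024)
                    .build();
            ByteBufferMessageSet messages = consumer.fetch(request).messageSet(topic, partition);
            for (MessageAndOffset mo : messages) {
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(mo.offset() + "\t" + new String(bytes, "UTF-8"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        dump("source-broker", "my-topic", 0, 0L);  // placeholder source-cluster leader
        dump("target-broker", "my-topic", 0, 0L);  // placeholder target-cluster leader
    }
}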


Re: dynamically changing log level on a running broker?

2014-11-25 Thread Jun Rao
You can do this through jconsole via the log4j MBean.

Thanks,

Jun
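
As a command-line-friendly alternative to jconsole, a hedged sketch of a small
JMX client is below. It assumes remote JMX is enabled on the broker (e.g. via
JMX_PORT=9999) and that the broker exposes the Log4j controller MBean added by
KAFKA-429; the object name and operation signature used here are assumptions
that should be double-checked in jconsole first.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Connects to a running broker's JMX port and asks the Log4j controller MBean
// to change one logger's level; the MBean name below is an assumption to verify.
public class SetBrokerLogLevel {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");  // placeholder host/port
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName logController = new ObjectName("kafka:type=kafka.Log4jController");
            mbs.invoke(logController, "setLogLevel",
                    new Object[] { "kafka.request.logger", "DEBUG" },
                    new String[] { "java.lang.String", "java.lang.String" });
        } finally {
            connector.close();
        }
    }
}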

On Tue, Nov 25, 2014 at 2:56 AM, ben fleis  wrote:

> Hello hello,
>
> From what I see at KAFKA-16 <
> https://issues.apache.org/jira/browse/KAFKA-16>
> and KAFKA-429 , it should
> be possible to change log levels on a running broker process.  The patches
> appears (to my naive eyes: I speak no Java/MBeans, and very little Scala)
> to provide the capability, but it's not clear to me -- how can I actually
> do this from the command line on a running system?  Anybody have a simple
> example of this?  Have I simply missed it?
>
> Thanks!
>
> b
>


Re: Does Kafka Producer service ?

2014-11-25 Thread Jun Rao
Assuming that you use a single producer in async mode, the Kafka overhead
should be limited to a single thread. Using a cheaper compression codec such
as snappy will also help reduce the CPU load.

Thanks,

Jun
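
For reference, a minimal sketch of an old-producer (0.8) configuration along
those lines: one shared async producer with snappy compression. The broker
addresses are placeholders.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Single shared async producer with snappy compression, so the Kafka overhead
// inside the web application stays on one background thread.
public class SharedProducer {
    private static final Producer<String, String> PRODUCER = createProducer();

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder hosts
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");        // batch sends on a background thread
        props.put("compression.codec", "snappy");   // cheaper on CPU than gzip
        return new Producer<String, String>(new ProducerConfig(props));
    }

    public static void send(String topic, String message) {
        PRODUCER.send(new KeyedMessage<String, String>(topic, message));
    }
}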

On Tue, Nov 25, 2014 at 12:02 PM, Krishna Raj 
wrote:

> Hi Jun,
>
> Thanks for replying back on this. Appreciated.
>
> I do understand that the Kafka client just needs protocol compatibility
> with the application that is producing the messages.
>
> To clarify a bit more:
>
> I witnessed a scenario where a large-scale website uses the Kafka library
> in its web application. So in this case, the Kafka libraries are tied to
> the application, which is served by web servers.
>
> So, when there was a CPU-usage issue caused by Kafka, the team wanted to
> apply a patch. In order to do that, they had to create a new WAR package
> and deploy it again to the web servers, which is a significant effort.
>
> I totally understand that having a layer like a logging service in between
> Kafka and the application would defeat the purpose of Kafka.
>
> I would love your advice on how best to handle this type of maintenance.
>
> Thanks,
> Krishna Raj
>
>
>
>
>
> On Tue, Nov 25, 2014 at 10:58 AM, Jun Rao  wrote:
>
>> Could you be a bit more specific about the issue? As long as there is
>> protocol compatibility btw the Kafka client and the broker, upgrading the
>> Kafka client library should be easy, right?
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj 
>> wrote:
>>
>>> Hello Amazing Kafka Creators & User,
>>>
> >>> I have learnt and used Kafka in our production system, so you can count
> >>> my understanding as intermediate.
> >>>
> >>> With the statement that "Kafka has solved the scalability and
> >>> availability needs for a large-scale message publish/subscribe system", I
> >>> understand that having a producer service sitting between the
> >>> application and the producer defeats one major purpose of Kafka.
> >>>
> >>> So, my question is: how do I loosely couple Kafka with my production
> >>> application?
> >>>
> >>> The reason being, I wish to do all producer-code and Kafka-library
> >>> maintenance without affecting my large-scale production system. It's not
> >>> easy to get a window for these types of changes on a large-scale
> >>> production application :)
> >>>
> >>> Any advice on how this can be achieved (even moderately) would greatly
> >>> help.
>>>
>>> Thanks,
>>> Krishna Raj
>>>
>>
>>
>


Partition key not working properly

2014-11-25 Thread Haoming Zhang



Hi all,

I'm struggling with how to use the partition key mechanism properly. My logic
is to set the partition number to 3, create three partition keys "0", "1",
"2", and use them to create three KeyedMessages such as
KeyedMessage(topic, "0", message),
KeyedMessage(topic, "1", message),
KeyedMessage(topic, "2", message)

After this, I create a producer instance to send out all the KeyedMessages.

I expect each KeyedMessage to go to a different partition according to its
partition key, which means
KeyedMessage(topic, "0", message) goes to Partition 0,
KeyedMessage(topic, "1", message) goes to Partition 1,
KeyedMessage(topic, "2", message) goes to Partition 2

I'm using Kafka-web-console to watch the topic status, but the result is not
what I'm expecting. The KeyedMessages still go to partitions seemingly at
random; sometimes two KeyedMessages end up in the same partition even though
they have different partition keys.

I'm not sure whether my logic is incorrect or I misunderstood the partition
key mechanism. Any sample code or explanation would be great!

Thanks,
Haoming
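
By way of sample code: with the default partitioner, the only guarantee is
that equal keys land in the same partition, not that key "n" lands in
partition n. If an exact key-to-partition mapping is wanted, an explicit
partitioner in the style of the 0.8 producer example can be plugged in. A
sketch, with an illustrative class name:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Maps key "0" to partition 0, "1" to partition 1, and so on, so the expected
// key -> partition assignment from the question can be observed directly.
// Register it with props.put("partitioner.class", "com.example.DirectPartitioner").
public class DirectPartitioner implements Partitioner {

    public DirectPartitioner(VerifiableProperties props) {
        // a constructor taking VerifiableProperties is required by the producer
    }

    @Override
    public int partition(Object key, int numPartitions) {
        return Integer.parseInt((String) key) % numPartitions;
    }
}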

  

Re: How many messages does each broker have?

2014-11-25 Thread Palur Sandeep
Hi Jiangjie,

This is what I have understood. Please correct me if I am wrong.

I don't use the partitioner class at all (KeyedMessage<String, String> data =
new KeyedMessage<String, String>(topic_name, new_mes)). It partitions
messages randomly across partitions, and I don't see it sticking to any
broker for 10 minutes, so I guess it follows some random partitioning logic.
I am using version 0.8.1.1.

MessageAndMetadata on the consumer side prints the following message. Can you
help me find the metadata regarding the partition number?

*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*

Thanks
Sandeep

On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin 
wrote:

> Palur,
>
> Just adding to what Guozhang said, the answer to your question might
> depend on which producer you are using.
> Assuming you are producing messages without keys to the same topic, in new
> producer(KafkaProducer), the messages will go to brokers in a round robin
> way, so the messages will end up in brokers evenly distributed. Whereas in
> old producer, it actually sticks to a particular broker for 10 min (by
> default) then switch to another random partition. In that case, if you
> send messages fast enough, you might see uneven distribution in brokers.
>
> For the consumer, if you are using high level consumer, when reading from
> KafkaStream, you will get MessageAndMetadata, the topic and partition
> information is included in it as well as the raw message.
>
> Jiangjie (Becket) Qin
>
>
>
> On 11/25/14, 10:01 AM, "Guozhang Wang"  wrote:
>
> >Palur,
> >
> >If the 8 partitions are hosted on each one of the nodes, assuming
> >replication factor 1 then each node will get roughly 10 / 8 messages
> >due to the random partitioner. If you want to know exactly how many
> >messages is on each broker then you can use a simple consumer which allows
> >you to specify the partition id you want to consume from.
> >
> >In the new consumer (0.9), each of the consumed message will contain the
> >partition id as part of its message metadata.
> >
> >Guozhang
> >
> >On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep 
> >wrote:
> >
> >> Dear Developers,
> >>
> >> I am using the default partitioning logic(Random Partitioning) to
> >>produce
> >> messages into brokers. That is I don't use a partitioner.class.
> >>
> >> My requirement is If I produce 10 messages using the below code for
> >>a
> >> broker that has 8 partitions across 8 nodes. How many messages will each
> >> partition have? Is there any API that can help me find the broker id of
> >>the
> >> each message I consume from the consumer side?
> >>
> >> PS: I dont want to use partitioner.class. I want use the kafka's default
> >> partitioning logic.
> >>
> >>   KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, new_mes);
> >>
> >> producer.send(data);
> >>
> >> --
> >> Regards,
> >> Sandeep Palur
> >> Data-Intensive Distributed Systems Laboratory, CS/IIT
> >> Department of Computer Science, Illinois Institute of Technology (IIT)
> >> Phone : 312-647-9833
> >> Email : psand...@hawk.iit.edu 
> >>
> >
> >
> >
> >--
> >-- Guozhang
>
>


-- 
Regards,
Sandeep Palur
Data-Intensive Distributed Systems Laboratory, CS/IIT
Department of Computer Science, Illinois Institute of Technology (IIT)
Phone : 312-647-9833
Email : psand...@hawk.iit.edu 


Re: How many messages does each broker have?

2014-11-25 Thread Jiangjie Qin
Palur,

Just adding to what Guozhang said, the answer to your question might
depend on which producer you are using.
Assuming you are producing messages without keys to the same topic, in new
producer(KafkaProducer), the messages will go to brokers in a round robin
way, so the messages will end up in brokers evenly distributed. Whereas in
old producer, it actually sticks to a particular broker for 10 min (by
default) then switch to another random partition. In that case, if you
send messages fast enough, you might see uneven distribution in brokers.

For the consumer, if you are using high level consumer, when reading from
KafkaStream, you will get MessageAndMetadata, the topic and partition
information is included in it as well as the raw message.

Jiangjie (Becket) Qin
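
To make that concrete, a small high-level-consumer sketch (placeholder
ZooKeeper and topic names) that prints the partition and offset carried in
MessageAndMetadata:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// Prints the partition (and offset) of every consumed message using the
// metadata carried by MessageAndMetadata in the high-level consumer.
public class PartitionPrinter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181");   // placeholder
        props.put("group.id", "partition-printer");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        String topic = "my-topic";                        // placeholder
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap(topic, 1));
        ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> m = it.next();
            System.out.println("topic=" + m.topic() + " partition=" + m.partition()
                    + " offset=" + m.offset() + " bytes=" + m.message().length);
        }
    }
}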



On 11/25/14, 10:01 AM, "Guozhang Wang"  wrote:

>Palur,
>
>If the 8 partitions are hosted on each one of the nodes, assuming
>replication factor 1 then each node will get roughly 10 / 8 messages
>due to the random partitioner. If you want to know exactly how many
>messages is on each broker then you can use a simple consumer which allows
>you to specify the partition id you want to consume from.
>
>In the new consumer (0.9), each of the consumed message will contain the
>partition id as part of its message metadata.
>
>Guozhang
>
>On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep 
>wrote:
>
>> Dear Developers,
>>
>> I am using the default partitioning logic(Random Partitioning) to
>>produce
>> messages into brokers. That is I don't use a partitioner.class.
>>
>> My requirement is If I produce 10 messages using the below code for
>>a
>> broker that has 8 partitions across 8 nodes. How many messages will each
>> partition have? Is there any API that can help me find the broker id of
>>the
>> each message I consume from the consumer side?
>>
>> PS: I dont want to use partitioner.class. I want use the kafka's default
>> partitioning logic.
>>
>>   KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, new_mes);
>>
>> producer.send(data);
>>
>> --
>> Regards,
>> Sandeep Palur
>> Data-Intensive Distributed Systems Laboratory, CS/IIT
>> Department of Computer Science, Illinois Institute of Technology (IIT)
>> Phone : 312-647-9833
>> Email : psand...@hawk.iit.edu 
>>
>
>
>
>-- 
>-- Guozhang



Re: logging agent based on fuse and kafka: first release

2014-11-25 Thread Neha Narkhede
Great. Thanks for sharing. I added it to our ecosystem wiki.

On Tue, Nov 25, 2014 at 9:58 AM, yazgoo  wrote:

> Hi,
>
> First I'd like to thank kafka developers for writing kafka.
>
> This is an announcement for the first release of a file system logging
> agent based on kafka.
>
> It is written for collecting logs from servers running all kind of
> software,
> as a generic way to collect logs without needing to know about each logger.
>
> Home:
> https://github.com/yazgoo/fuse_kafka
>
> Here are some of its features:
>
>- sends all writes to given directories to kafka
>- passes through FS syscalls to underlying directory
>- captures the pid, gid, uid, user, group, command line doing the write
>- you can add metadata to identify from where the message comes from
>(e.g. ip-address, ...)
>- you can configure kafka destination cluster either by giving a broker
>list or a zookeeper list
>- you can specify a bandwidth quota: fuse_kafka won't send data if a
>file is written more than a given size per second (useful for preventing
>floods caused by core files dumped or log rotations in directories
> watched
>by fuse_kafka)
>
> It is based on:
>
>- FUSE (filesystem in userspace), to capture writes done under a given
>directory
>- kafka (messaging queue), as the event transport system
>- logstash: events are written to kafka in logstash format (except
>messages and commands which are stored in base64)
>
> It is written in C and python.
>
> Packages are provided for various distros, see installing section in
> README.md.
> FUSE adds an overhead, so it should not be used on filesystems where high
> throughput is necessary.
> Here are benchmarks:
>
> http://htmlpreview.github.io/?https://raw.githubusercontent.com/yazgoo/fuse_kafka/master/benchs/benchmarks.html
>
> Contributions are welcome, of course!
>
> Regards
>


MetadataResponse error code handling

2014-11-25 Thread Evan Huus
Hi folks,

I was wondering in which cases the PartitionMetadata section of a
MetadataResponse [1] can contain useful information?

I had been working under the assumption that when the PartitionErrorCode
was 0 (NoError) then the rest of the data would be legitimate, and that
when the error was non-0 then the rest of the data would be blank/empty/etc.

However, I recently discovered a JIRA ticket [2] which suggests otherwise,
namely that there are certain error codes for which parts of the remainder
of the metadata may still be filled in. I have not found any explicit
documentation for which error codes this is true. Does anybody know?

Thanks,
Evan

P.S. I'm not 100% sure if this kind of question should go on users@ or dev@.
Please let me know if I'm in the wrong place.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
[2] https://issues.apache.org/jira/browse/KAFKA-1609


Re: How many messages does each broker have?

2014-11-25 Thread Palur Sandeep
Thank you Guozhang. I don't find the 0.9 version on the following page:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1
I am looking for the Kafka jar, version 0.9.

Can you also tell me how to turn off "flushing messages to disk" in Kafka?
I never want this to happen.

Thank you
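
On the flush question, a hedged sketch of the relevant broker settings: Kafka
relies on replication for durability and by default largely leaves fsync to
the OS page cache. The properties below control how often a forced flush
happens; their exact defaults depend on the broker version, so treat this as a
sketch to verify against the documentation rather than as authoritative.

# server.properties (sketch): forced flushes to disk are governed by these.
# Very large values effectively leave flushing to the operating system.
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=9223372036854775807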




On Tue, Nov 25, 2014 at 12:01 PM, Guozhang Wang  wrote:

> Palur,
>
> If the 8 partitions are hosted on each one of the nodes, assuming
> replication factor 1 then each node will get roughly 10 / 8 messages
> due to the random partitioner. If you want to know exactly how many
> messages is on each broker then you can use a simple consumer which allows
> you to specify the partition id you want to consume from.
>
> In the new consumer (0.9), each of the consumed message will contain the
> partition id as part of its message metadata.
>
> Guozhang
>
> On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep 
> wrote:
>
> > Dear Developers,
> >
> > I am using the default partitioning logic(Random Partitioning) to produce
> > messages into brokers. That is I don't use a partitioner.class.
> >
> > My requirement is If I produce 10 messages using the below code for a
> > broker that has 8 partitions across 8 nodes. How many messages will each
> > partition have? Is there any API that can help me find the broker id of
> the
> > each message I consume from the consumer side?
> >
> > PS: I dont want to use partitioner.class. I want use the kafka's default
> > partitioning logic.
> >
> >   KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, new_mes);
> >
> > producer.send(data);
> >
> > --
> > Regards,
> > Sandeep Palur
> > Data-Intensive Distributed Systems Laboratory, CS/IIT
> > Department of Computer Science, Illinois Institute of Technology (IIT)
> > Phone : 312-647-9833
> > Email : psand...@hawk.iit.edu 
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Sandeep Palur
Data-Intensive Distributed Systems Laboratory, CS/IIT
Department of Computer Science, Illinois Institute of Technology (IIT)
Phone : 312-647-9833
Email : psand...@hawk.iit.edu 


Re: Does Kafka Producer service ?

2014-11-25 Thread Krishna Raj
Hi Jun,

Thanks for replying back on this. Appreciated.

I do understand that the Kafka client just needs protocol compatibility
with the application that is producing the messages.

To clarify a bit more:

I witnessed a scenario where a large-scale website uses the Kafka library
in its web application. So in this case, the Kafka libraries are tied to
the application, which is served by web servers.

So, when there was a CPU-usage issue caused by Kafka, the team wanted to
apply a patch. In order to do that, they had to create a new WAR package
and deploy it again to the web servers, which is a significant effort.

I totally understand that having a layer like a logging service in between
Kafka and the application would defeat the purpose of Kafka.

I would love your advice on how best to handle this type of maintenance.

Thanks,
Krishna Raj
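
One common way to get the loose coupling asked about here is to hide the Kafka
client behind a thin internal interface, so the dependency is confined to one
place. A sketch with illustrative names (this is not an established API):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Application code depends only on EventPublisher, so the Kafka client library
// and its configuration can be patched or upgraded behind it without touching
// business code scattered through the web application.
public interface EventPublisher {
    void publish(String topic, String message);
    void close();

    // Kafka-backed implementation kept in one place (it could live in its own jar).
    class KafkaEventPublisher implements EventPublisher {
        private final Producer<String, String> producer;

        public KafkaEventPublisher(String brokerList) {
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            this.producer = new Producer<String, String>(new ProducerConfig(props));
        }

        @Override public void publish(String topic, String message) {
            producer.send(new KeyedMessage<String, String>(topic, message));
        }

        @Override public void close() { producer.close(); }
    }
}

Upgrading the Kafka client library then touches only the implementing class
(or its jar); the WAR that bundles it still needs a redeploy, but the change
stays confined to one module.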





On Tue, Nov 25, 2014 at 10:58 AM, Jun Rao  wrote:

> Could you be a bit more specific about the issue? As long as there is
> protocol compatibility btw the Kafka client and the broker, upgrading the
> Kafka client library should be easy, right?
>
> Thanks,
>
> Jun
>
> On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj 
> wrote:
>
>> Hello Amazing Kafka Creators & User,
>>
>> I have learnt and used Kafka in our production system, so you can count my
>> understanding as intermediate.
>>
>> With the statement that "Kafka has solved the scalability and
>> availability needs for a large-scale message publish/subscribe system", I
>> understand that having a producer service sitting between the
>> application and the producer defeats one major purpose of Kafka.
>>
>> So, my question is: how do I loosely couple Kafka with my production
>> application?
>>
>> The reason being, I wish to do all producer-code and Kafka-library
>> maintenance without affecting my large-scale production system. It's not
>> easy to get a window for these types of changes on a large-scale
>> production application :)
>>
>> Any advice on how this can be achieved (even moderately) would greatly
>> help.
>>
>> Thanks,
>> Krishna Raj
>>
>
>


Re: Does Kafka Producer service ?

2014-11-25 Thread Jun Rao
Could you be a bit more specific about the issue? As long as there is
protocol compatibility btw the Kafka client and the broker, upgrading the
Kafka client library should be easy, right?

Thanks,

Jun

On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj 
wrote:

> Hello Amazing Kafka Creators & User,
>
> I have learnt and used Kafka in our production system, so you can count my
> understanding as intermediate.
>
> With the statement that "Kafka has solved the scalability and availability
> needs for a large-scale message publish/subscribe system", I understand
> that having a producer service sitting between the application and
> the producer defeats one major purpose of Kafka.
>
> So, my question is: how do I loosely couple Kafka with my production
> application?
>
> The reason being, I wish to do all producer-code and Kafka-library
> maintenance without affecting my large-scale production system. It's not
> easy to get a window for these types of changes on a large-scale
> production application :)
>
> Any advice on how this can be achieved (even moderately) would greatly help.
>
> Thanks,
> Krishna Raj
>


Re: How many messages does each broker have?

2014-11-25 Thread Guozhang Wang
Palur,

If the 8 partitions are hosted one on each of the nodes, then assuming a
replication factor of 1, each node will get roughly 10 / 8 messages
due to the random partitioner. If you want to know exactly how many
messages are on each broker, you can use a simple consumer, which allows
you to specify the partition id you want to consume from.

In the new consumer (0.9), each of the consumed message will contain the
partition id as part of its message metadata.

Guozhang
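
Counting messages per partition with the simple consumer usually means asking
for the earliest and latest offsets and taking the difference. A sketch in the
style of the 0.8 SimpleConsumer example (placeholder host/topic; the request
must go to the partition's leader broker):

import java.util.Collections;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Approximate message count of one partition = latest offset - earliest offset,
// queried from the partition's leader broker with the simple consumer.
public class PartitionMessageCount {
    static long offsetBefore(SimpleConsumer consumer, String topic, int partition, long time) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                Collections.singletonMap(tp, new PartitionOffsetRequestInfo(time, 1)),
                kafka.api.OffsetRequest.CurrentVersion(), "count-client");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        // "leader-host" is a placeholder; the request must go to the partition leader.
        SimpleConsumer consumer = new SimpleConsumer("leader-host", 9092, 100000, 64 * 1024, "count-client");
        try {
            long earliest = offsetBefore(consumer, "my-topic", 0, kafka.api.OffsetRequest.EarliestTime());
            long latest = offsetBefore(consumer, "my-topic", 0, kafka.api.OffsetRequest.LatestTime());
            System.out.println("partition 0 holds ~" + (latest - earliest) + " messages");
        } finally {
            consumer.close();
        }
    }
}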

On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep 
wrote:

> Dear Developers,
>
> I am using the default partitioning logic(Random Partitioning) to produce
> messages into brokers. That is I don't use a partitioner.class.
>
> My requirement is If I produce 10 messages using the below code for a
> broker that has 8 partitions across 8 nodes. How many messages will each
> partition have? Is there any API that can help me find the broker id of the
> each message I consume from the consumer side?
>
> PS: I dont want to use partitioner.class. I want use the kafka's default
> partitioning logic.
>
>   KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, new_mes);
>
> producer.send(data);
>
> --
> Regards,
> Sandeep Palur
> Data-Intensive Distributed Systems Laboratory, CS/IIT
> Department of Computer Science, Illinois Institute of Technology (IIT)
> Phone : 312-647-9833
> Email : psand...@hawk.iit.edu 
>



-- 
-- Guozhang


logging agent based on fuse and kafka: first release

2014-11-25 Thread yazgoo
Hi,

First I'd like to thank kafka developers for writing kafka.

This is an announcement for the first release of a file system logging
agent based on kafka.

It is written for collecting logs from servers running all kinds of software,
as a generic way to collect logs without needing to know about each logger.

Home:
https://github.com/yazgoo/fuse_kafka

Here are some of its features:

   - sends all writes to given directories to kafka
   - passes through FS syscalls to underlying directory
   - captures the pid, gid, uid, user, group, command line doing the write
   - you can add metadata to identify where the message comes from
   (e.g. ip-address, ...)
   - you can configure kafka destination cluster either by giving a broker
   list or a zookeeper list
   - you can specify a bandwidth quota: fuse_kafka won't send data if a
   file is written more than a given size per second (useful for preventing
   floods caused by core files dumped or log rotations in directories watched
   by fuse_kafka)

It is based on:

   - FUSE (filesystem in userspace), to capture writes done under a given
   directory
   - kafka (messaging queue), as the event transport system
   - logstash: events are written to kafka in logstash format (except
   messages and commands which are stored in base64)

It is written in C and python.

Packages are provided for various distros, see installing section in
README.md.
FUSE adds an overhead, so it should not be used on filesystems where high
throughput is necessary.
Here are benchmarks:
http://htmlpreview.github.io/?https://raw.githubusercontent.com/yazgoo/fuse_kafka/master/benchs/benchmarks.html

Contributions are welcome, of course!

Regards


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jun Rao
Bhavesh,

This api change doesn't mean you need to change the format of the encoded
data. It simply moves the serialization logic from the application to a
pluggable serializer. As long as you preserve the serialization logic, the
consumer should still see the same bytes.

If you are talking about how to evolve the data schema over time, that's a
separate story. Serialization libraries like Avro have better support on
schema evolution.

Thanks,

Jun
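
To illustrate that point, here is a sketch of a serializer written against the
Serializer/Configurable shape proposed in this thread (an assumption, not the
final API): the application's existing UTF-8 encoding simply moves into the
pluggable class, so the bytes the consumer sees are unchanged.

import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch against the Serializer interface proposed in this thread: the
// application's existing encoding logic becomes a pluggable class, so the
// bytes on the wire (and what consumers see) stay exactly the same.
public class Utf8StringSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs) { }   // Configurable hook, unused here

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}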

On Tue, Nov 25, 2014 at 8:41 AM, Bhavesh Mistry 
wrote:

> How will a mixed bag work on the consumer side?  The entire site cannot be
> rolled at once, so the consumer will have to deal with new and old serialized
> bytes?  This could be the app team's responsibility.  Are you guys targeting
> the 0.8.2 release, which may break customers who are already using the new
> producer API (beta version)?
>
> Thanks,
>
> Bhavesh
>
> On Tue, Nov 25, 2014 at 8:29 AM, Manikumar Reddy 
> wrote:
>
> > +1 for this change.
> >
> > what about de-serializer  class in 0.8.2?  Say i am using new producer
> with
> > Avro and old consumer combination.
> > then i need to give custom Decoder implementation for Avro right?.
> >
> > On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein  wrote:
> >
> > > The serializer is an expected use of the producer/consumer now and
> think
> > we
> > > should continue that support in the new client. As far as breaking the
> > API
> > > it is why we released the 0.8.2-beta to help get through just these
> type
> > of
> > > blocking issues in a way that the community at large could be involved
> in
> > > easier with a build/binaries to download and use from maven also.
> > >
> > > +1 on the change now prior to the 0.8.2 release.
> > >
> > > - Joe Stein
> > >
> > >
> > > On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian <
> > > srsubraman...@linkedin.com.invalid> wrote:
> > >
> > > > Looked at the patch. +1 from me.
> > > >
> > > > On 11/24/14 8:29 PM, "Gwen Shapira"  wrote:
> > > >
> > > > >As one of the people who spent too much time building Avro
> > repositories,
> > > > >+1
> > > > >on bringing serializer API back.
> > > > >
> > > > >I think it will make the new producer easier to work with.
> > > > >
> > > > >Gwen
> > > > >
> > > > >On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps 
> > wrote:
> > > > >
> > > > >> This is admittedly late in the release cycle to make a change. To
> > add
> > > to
> > > > >> Jun's description the motivation was that we felt it would be
> better
> > > to
> > > > >> change that interface now rather than after the release if it
> needed
> > > to
> > > > >> change.
> > > > >>
> > > > >> The motivation for wanting to make a change was the ability to
> > really
> > > be
> > > > >> able to develop support for Avro and other serialization formats.
> > The
> > > > >> current status is pretty scattered--there is a schema repository
> on
> > an
> > > > >>Avro
> > > > >> JIRA and another fork of that on github, and a bunch of people we
> > have
> > > > >> talked to have done similar things for other serialization
> systems.
> > It
> > > > >> would be nice if these things could be packaged in such a way that
> > it
> > > > >>was
> > > > >> possible to just change a few configs in the producer and get rich
> > > > >>metadata
> > > > >> support for messages.
> > > > >>
> > > > >> As we were thinking this through we realized that the new api we
> > were
> > > > >>about
> > > > >> to introduce was kind of not very compatable with this since it
> was
> > > just
> > > > >> byte[] oriented.
> > > > >>
> > > > >> You can always do this by adding some kind of wrapper api that
> wraps
> > > the
> > > > >> producer. But this puts us back in the position of trying to
> > document
> > > > >>and
> > > > >> support multiple interfaces.
> > > > >>
> > > > >> This also opens up the possibility of adding a MessageValidator or
> > > > >> MessageInterceptor plug-in transparently so that you can do other
> > > custom
> > > > >> validation on the messages you are sending which obviously
> requires
> > > > >>access
> > > > >> to the original object not the byte array.
> > > > >>
> > > > >> This api doesn't prevent using byte[] by configuring the
> > > > >> ByteArraySerializer it works as it currently does.
> > > > >>
> > > > >> -Jay
> > > > >>
> > > > >> On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao 
> wrote:
> > > > >>
> > > > >> > Hi, Everyone,
> > > > >> >
> > > > >> > I'd like to start a discussion on whether it makes sense to add
> > the
> > > > >> > serializer api back to the new java producer. Currently, the new
> > > java
> > > > >> > producer takes a byte array for both the key and the value.
> While
> > > this
> > > > >> api
> > > > >> > is simple, it pushes the serialization logic into the
> application.
> > > > >>This
> > > > >> > makes it hard to reason about what type of data is being sent to
> > > Kafka
> > > > >> and
> > > > >> > also makes it hard to share an implementation of the serializer.
> > For
> > > > >> > example, to support Avro, the serialization logic could be q

Re: benchmark kafka on 10GbE network

2014-11-25 Thread Jay Kreps
Yeah, neither of those are simple to optimize. The CRC is already the
optimized java crc we stole from Hadoop. It may be possible to make that
faster still but probably not easy. It might be possible to optimize out
some of the interrupt calls, though I'm not exactly sure.

One thing, though, is that the crc work should be highly parallelizable.
How many partitions/topics do you have? I suspect more writer threads (and
ensuring there are multiple partitions) would also improve throughput.
Basically we need to serialize writes per partition, which also involves
waiting on the crc, but this lock is per partition, so if you have N
partitions, N threads can all be working at once.
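
A rough sketch of what that looks like with the new (0.8.2) producer discussed
in this digest: several writer threads sharing one producer instance against a
multi-partition topic. Broker and topic names are placeholders, and error
handling, awaiting termination and closing the producer are omitted.

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Several writer threads sharing one (thread-safe) producer, sending keyless
// records to a multi-partition topic so per-partition CRC/serialization work
// can proceed in parallel.
public class ParallelSendBench {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder

        final KafkaProducer producer = new KafkaProducer(props);
        final byte[] payload = new byte[100];
        int writerThreads = 4;

        ExecutorService pool = Executors.newFixedThreadPool(writerThreads);
        for (int t = 0; t < writerThreads; t++) {
            pool.submit(new Runnable() {
                public void run() {
                    for (int i = 0; i < 1000000; i++) {
                        // null key: records are spread across partitions
                        producer.send(new ProducerRecord("bench-topic", null, payload));
                    }
                }
            });
        }
        pool.shutdown();
        // ... await termination and close the producer in real code
    }
}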

On Tue, Nov 25, 2014 at 3:03 AM, Manu Zhang  wrote:

> Thanks for the explanation.
>
> Here are some stats for I/O thread.
>
> *io-ratio 0.155*
> *io-time-ns-avg 16418.5*
> *io-wait-ratio 0.59*
> *io-wait-time-ns-avg  62881*
>
> It seems to confirm that IO spent much more time waiting than doing real
> work.
>
> Given the above stats, how could I trace down and pinpoint the bottleneck ?
> I guess computing crc32s can not be avoided.
>
> On Fri, Nov 21, 2014 at 12:34 PM, Jay Kreps  wrote:
>
> > So I suspect that the bottleneck is actually in the writer thread (the
> one
> > calling send()), not the I/O thread. You could verify this by checking
> the
> > JMX stats which will give the amount of time the I/O thread spends
> waiting.
> > But since epollWait shows up first that is the I/O thread waiting for
> work.
> >
> > It looks like the big bottleneck is computing the crc32s for the
> messages.
> > The next big hit after that is signaling the I/O thread to wake-up and do
> > work.
> >
> > Here is an annotated version of those traces:
> >
> > These two are bogus and are just background JMX things I think:
> >  1 39.30% 39.30%   79585 300923 java.net.SocketInputStream.socketRead0
> >2 20.62% 59.92%   41750 300450 java.net.PlainSocketImpl.socketAccept
> >
> > This is the I/O thread waiting for work to do
> >3  9.52% 69.45%   19287 300660 sun.nio.ch.EPollArrayWrapper.epollWait
> >
> > These are the real problems:
> >4  9.50% 78.94%   19234 300728 org.apache.kafka.common.
> > record.Record.computeChecksum
> >5  4.14% 83.08%8377 300777 sun.nio.ch.EPollArrayWrapper.interrupt
> >
> > I/O thread doing a write
> >6  2.30% 85.38%4662 300708 sun.nio.ch.FileDispatcherImpl.writev0
> >
> > This is a one time thing when fetching metadata on startup
> >7  1.61% 86.99%3260 300752
> > org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
> >
> > These are all in the I/O thread so not relevant:
> >8  1.24% 88.23%2501 300804 sun.nio.ch.EPollArrayWrapper.epollWait
> >9  1.08% 89.31%2187 300734
> > org.apache.kafka.clients.producer.internals.RecordBatch.done
> >   10  0.98% 90.29%1991 300870
> > org.apache.kafka.common.protocol.types.Type$6.write
> >   11  0.97% 91.26%1961 300789
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.ready
> >   12  0.96% 92.22%1951 300726
> >
> > -Jay
> >
> > On Thu, Nov 20, 2014 at 5:42 PM, Manu Zhang 
> > wrote:
> >
> > > Ok, here is the hrpof output
> > >
> > > CPU SAMPLES BEGIN (total = 202493) Fri Nov 21 08:07:51 2014
> > > rank   self  accum   count trace method
> > >1 39.30% 39.30%   79585 300923
> java.net.SocketInputStream.socketRead0
> > >2 20.62% 59.92%   41750 300450 java.net.PlainSocketImpl.socketAccept
> > >3  9.52% 69.45%   19287 300660
> sun.nio.ch.EPollArrayWrapper.epollWait
> > >4  9.50% 78.94%   19234 300728
> > > org.apache.kafka.common.record.Record.computeChecksum
> > >5  4.14% 83.08%8377 300777
> sun.nio.ch.EPollArrayWrapper.interrupt
> > >6  2.30% 85.38%4662 300708 sun.nio.ch.FileDispatcherImpl.writev0
> > >7  1.61% 86.99%3260 300752
> > > org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
> > >8  1.24% 88.23%2501 300804
> sun.nio.ch.EPollArrayWrapper.epollWait
> > >9  1.08% 89.31%2187 300734
> > > org.apache.kafka.clients.producer.internals.RecordBatch.done
> > >   10  0.98% 90.29%1991 300870
> > > org.apache.kafka.common.protocol.types.Type$6.write
> > >   11  0.97% 91.26%1961 300789
> > > org.apache.kafka.clients.producer.internals.RecordAccumulator.ready
> > >   12  0.96% 92.22%1951 300726
> > > org.apache.kafka.common.record.MemoryRecords.append
> > >   13  0.89% 93.12%1809 300829 java.nio.Bits.copyFromArray
> > >   14  0.75% 93.86%1510 300722 java.nio.HeapByteBuffer.
> > >   15  0.54% 94.41%1100 300730
> > > org.apache.kafka.common.record.Compressor.put
> > >   16  0.54% 94.95%1094 300749
> > > org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> > >   17  0.38% 95.33% 771 300755
> > > org.apache.kafka.clients.producer.KafkaProducer.send
> > >   18  0.36% 95.69% 736 300830
> > > org.apache.kafka.common.metrics.Sensor.record
> > >   19  0.35% 96.04% 709 300848 sun.nio.ch.IOUtil.drain
> > > 

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Bhavesh Mistry
How will a mixed bag work on the consumer side?  The entire site cannot be
rolled at once, so the consumer will have to deal with new and old serialized
bytes?  This could be the app team's responsibility.  Are you guys targeting
the 0.8.2 release, which may break customers who are already using the new
producer API (beta version)?

Thanks,

Bhavesh

On Tue, Nov 25, 2014 at 8:29 AM, Manikumar Reddy 
wrote:

> +1 for this change.
>
> What about the de-serializer class in 0.8.2?  Say I am using the new producer
> with Avro and the old consumer combination; then I need to give a custom
> Decoder implementation for Avro, right?
>
> On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein  wrote:
>
> > The serializer is an expected use of the producer/consumer now and think
> we
> > should continue that support in the new client. As far as breaking the
> API
> > it is why we released the 0.8.2-beta to help get through just these type
> of
> > blocking issues in a way that the community at large could be involved in
> > easier with a build/binaries to download and use from maven also.
> >
> > +1 on the change now prior to the 0.8.2 release.
> >
> > - Joe Stein
> >
> >
> > On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian <
> > srsubraman...@linkedin.com.invalid> wrote:
> >
> > > Looked at the patch. +1 from me.
> > >
> > > On 11/24/14 8:29 PM, "Gwen Shapira"  wrote:
> > >
> > > >As one of the people who spent too much time building Avro
> repositories,
> > > >+1
> > > >on bringing serializer API back.
> > > >
> > > >I think it will make the new producer easier to work with.
> > > >
> > > >Gwen
> > > >
> > > >On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps 
> wrote:
> > > >
> > > >> This is admittedly late in the release cycle to make a change. To
> add
> > to
> > > >> Jun's description the motivation was that we felt it would be better
> > to
> > > >> change that interface now rather than after the release if it needed
> > to
> > > >> change.
> > > >>
> > > >> The motivation for wanting to make a change was the ability to
> really
> > be
> > > >> able to develop support for Avro and other serialization formats.
> The
> > > >> current status is pretty scattered--there is a schema repository on
> an
> > > >>Avro
> > > >> JIRA and another fork of that on github, and a bunch of people we
> have
> > > >> talked to have done similar things for other serialization systems.
> It
> > > >> would be nice if these things could be packaged in such a way that
> it
> > > >>was
> > > >> possible to just change a few configs in the producer and get rich
> > > >>metadata
> > > >> support for messages.
> > > >>
> > > >> As we were thinking this through we realized that the new api we
> were
> > > >>about
> > > >> to introduce was kind of not very compatable with this since it was
> > just
> > > >> byte[] oriented.
> > > >>
> > > >> You can always do this by adding some kind of wrapper api that wraps
> > the
> > > >> producer. But this puts us back in the position of trying to
> document
> > > >>and
> > > >> support multiple interfaces.
> > > >>
> > > >> This also opens up the possibility of adding a MessageValidator or
> > > >> MessageInterceptor plug-in transparently so that you can do other
> > custom
> > > >> validation on the messages you are sending which obviously requires
> > > >>access
> > > >> to the original object not the byte array.
> > > >>
> > > >> This api doesn't prevent using byte[] by configuring the
> > > >> ByteArraySerializer it works as it currently does.
> > > >>
> > > >> -Jay
> > > >>
> > > >> On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao  wrote:
> > > >>
> > > >> > Hi, Everyone,
> > > >> >
> > > >> > I'd like to start a discussion on whether it makes sense to add
> the
> > > >> > serializer api back to the new java producer. Currently, the new
> > java
> > > >> > producer takes a byte array for both the key and the value. While
> > this
> > > >> api
> > > >> > is simple, it pushes the serialization logic into the application.
> > > >>This
> > > >> > makes it hard to reason about what type of data is being sent to
> > Kafka
> > > >> and
> > > >> > also makes it hard to share an implementation of the serializer.
> For
> > > >> > example, to support Avro, the serialization logic could be quite
> > > >>involved
> > > >> > since it might need to register the Avro schema in some remote
> > > >>registry
> > > >> and
> > > >> > maintain a schema cache locally, etc. Without a serialization api,
> > > >>it's
> > > >> > impossible to share such an implementation so that people can
> easily
> > > >> reuse.
> > > >> > We sort of overlooked this implication during the initial
> discussion
> > > >>of
> > > >> the
> > > >> > producer api.
> > > >> >
> > > >> > So, I'd like to propose an api change to the new producer by
> adding
> > > >>back
> > > >> > the serializer api similar to what we had in the old producer.
> > > >>Specially,
> > > >> > the proposed api changes are the following.
> > > >> >
> > > >> > First, we change KafkaProducer to take generic types K and V for
> th

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Manikumar Reddy
+1 for this change.

What about the de-serializer class in 0.8.2?  Say I am using the new producer
with Avro and the old consumer combination; then I need to give a custom
Decoder implementation for Avro, right?

On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein  wrote:

> The serializer is an expected use of the producer/consumer now and think we
> should continue that support in the new client. As far as breaking the API
> it is why we released the 0.8.2-beta to help get through just these type of
> blocking issues in a way that the community at large could be involved in
> easier with a build/binaries to download and use from maven also.
>
> +1 on the change now prior to the 0.8.2 release.
>
> - Joe Stein
>
>
> On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian <
> srsubraman...@linkedin.com.invalid> wrote:
>
> > Looked at the patch. +1 from me.
> >
> > On 11/24/14 8:29 PM, "Gwen Shapira"  wrote:
> >
> > >As one of the people who spent too much time building Avro repositories,
> > >+1
> > >on bringing serializer API back.
> > >
> > >I think it will make the new producer easier to work with.
> > >
> > >Gwen
> > >
> > >On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps  wrote:
> > >
> > >> This is admittedly late in the release cycle to make a change. To add
> to
> > >> Jun's description the motivation was that we felt it would be better
> to
> > >> change that interface now rather than after the release if it needed
> to
> > >> change.
> > >>
> > >> The motivation for wanting to make a change was the ability to really
> be
> > >> able to develop support for Avro and other serialization formats. The
> > >> current status is pretty scattered--there is a schema repository on an
> > >>Avro
> > >> JIRA and another fork of that on github, and a bunch of people we have
> > >> talked to have done similar things for other serialization systems. It
> > >> would be nice if these things could be packaged in such a way that it
> > >>was
> > >> possible to just change a few configs in the producer and get rich
> > >>metadata
> > >> support for messages.
> > >>
> > >> As we were thinking this through we realized that the new api we were
> > >>about
> > >> to introduce was kind of not very compatable with this since it was
> just
> > >> byte[] oriented.
> > >>
> > >> You can always do this by adding some kind of wrapper api that wraps
> the
> > >> producer. But this puts us back in the position of trying to document
> > >>and
> > >> support multiple interfaces.
> > >>
> > >> This also opens up the possibility of adding a MessageValidator or
> > >> MessageInterceptor plug-in transparently so that you can do other
> custom
> > >> validation on the messages you are sending which obviously requires
> > >>access
> > >> to the original object not the byte array.
> > >>
> > >> This api doesn't prevent using byte[] by configuring the
> > >> ByteArraySerializer it works as it currently does.
> > >>
> > >> -Jay
> > >>
> > >> On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao  wrote:
> > >>
> > >> > Hi, Everyone,
> > >> >
> > >> > I'd like to start a discussion on whether it makes sense to add the
> > >> > serializer api back to the new java producer. Currently, the new
> java
> > >> > producer takes a byte array for both the key and the value. While
> this
> > >> api
> > >> > is simple, it pushes the serialization logic into the application.
> > >>This
> > >> > makes it hard to reason about what type of data is being sent to
> Kafka
> > >> and
> > >> > also makes it hard to share an implementation of the serializer. For
> > >> > example, to support Avro, the serialization logic could be quite
> > >>involved
> > >> > since it might need to register the Avro schema in some remote
> > >>registry
> > >> and
> > >> > maintain a schema cache locally, etc. Without a serialization api,
> > >>it's
> > >> > impossible to share such an implementation so that people can easily
> > >> reuse.
> > >> > We sort of overlooked this implication during the initial discussion
> > >>of
> > >> the
> > >> > producer api.
> > >> >
> > >> > So, I'd like to propose an api change to the new producer by adding
> > >>back
> > >> > the serializer api similar to what we had in the old producer.
> > >>Specially,
> > >> > the proposed api changes are the following.
> > >> >
> > >> > First, we change KafkaProducer to take generic types K and V for the
> > >>key
> > >> > and the value, respectively.
> > >> >
> > >> > public class KafkaProducer implements Producer {
> > >> >
> > >> > public Future send(ProducerRecord record,
> > >> Callback
> > >> > callback);
> > >> >
> > >> > public Future send(ProducerRecord record);
> > >> > }
> > >> >
> > >> > Second, we add two new configs, one for the key serializer and
> another
> > >> for
> > >> > the value serializer. Both serializers will default to the byte
> array
> > >> > implementation.
> > >> >
> > >> > public class ProducerConfig extends AbstractConfig {
> > >> >
> > >> > .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > >> > "org.apach

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Joe Stein
The serializer is an expected use of the producer/consumer now, and I think we
should continue that support in the new client. As far as breaking the API,
that is why we released the 0.8.2-beta: to help get through just these types of
blocking issues in a way that the community at large could be involved in more
easily, with a build/binaries to download and use from maven as well.

+1 on the change now prior to the 0.8.2 release.

- Joe Stein


On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian <
srsubraman...@linkedin.com.invalid> wrote:

> Looked at the patch. +1 from me.
>
> On 11/24/14 8:29 PM, "Gwen Shapira"  wrote:
>
> >As one of the people who spent too much time building Avro repositories,
> >+1
> >on bringing serializer API back.
> >
> >I think it will make the new producer easier to work with.
> >
> >Gwen
> >
> >On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps  wrote:
> >
> >> This is admittedly late in the release cycle to make a change. To add to
> >> Jun's description the motivation was that we felt it would be better to
> >> change that interface now rather than after the release if it needed to
> >> change.
> >>
> >> The motivation for wanting to make a change was the ability to really be
> >> able to develop support for Avro and other serialization formats. The
> >> current status is pretty scattered--there is a schema repository on an
> >>Avro
> >> JIRA and another fork of that on github, and a bunch of people we have
> >> talked to have done similar things for other serialization systems. It
> >> would be nice if these things could be packaged in such a way that it
> >>was
> >> possible to just change a few configs in the producer and get rich
> >>metadata
> >> support for messages.
> >>
> >> As we were thinking this through we realized that the new api we were
> >>about
> >> to introduce was kind of not very compatable with this since it was just
> >> byte[] oriented.
> >>
> >> You can always do this by adding some kind of wrapper api that wraps the
> >> producer. But this puts us back in the position of trying to document
> >>and
> >> support multiple interfaces.
> >>
> >> This also opens up the possibility of adding a MessageValidator or
> >> MessageInterceptor plug-in transparently so that you can do other custom
> >> validation on the messages you are sending which obviously requires
> >>access
> >> to the original object not the byte array.
> >>
> >> This api doesn't prevent using byte[] by configuring the
> >> ByteArraySerializer it works as it currently does.
> >>
> >> -Jay
> >>
> >> On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao  wrote:
> >>
> >> > Hi, Everyone,
> >> >
> >> > I'd like to start a discussion on whether it makes sense to add the
> >> > serializer api back to the new java producer. Currently, the new java
> >> > producer takes a byte array for both the key and the value. While this
> >> api
> >> > is simple, it pushes the serialization logic into the application.
> >>This
> >> > makes it hard to reason about what type of data is being sent to Kafka
> >> and
> >> > also makes it hard to share an implementation of the serializer. For
> >> > example, to support Avro, the serialization logic could be quite
> >>involved
> >> > since it might need to register the Avro schema in some remote
> >>registry
> >> and
> >> > maintain a schema cache locally, etc. Without a serialization api,
> >>it's
> >> > impossible to share such an implementation so that people can easily
> >> reuse.
> >> > We sort of overlooked this implication during the initial discussion
> >>of
> >> the
> >> > producer api.
> >> >
> >> > So, I'd like to propose an api change to the new producer by adding
> >>back
> >> > the serializer api similar to what we had in the old producer.
> >>Specially,
> >> > the proposed api changes are the following.
> >> >
> >> > First, we change KafkaProducer to take generic types K and V for the
> >>key
> >> > and the value, respectively.
> >> >
> >> > public class KafkaProducer implements Producer {
> >> >
> >> > public Future send(ProducerRecord record,
> >> Callback
> >> > callback);
> >> >
> >> > public Future send(ProducerRecord record);
> >> > }
> >> >
> >> > Second, we add two new configs, one for the key serializer and another
> >> for
> >> > the value serializer. Both serializers will default to the byte array
> >> > implementation.
> >> >
> >> > public class ProducerConfig extends AbstractConfig {
> >> >
> >> > .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> >> > "org.apache.kafka.clients.producer.ByteArraySerializer",
> >>Importance.HIGH,
> >> > KEY_SERIALIZER_CLASS_DOC)
> >> > .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> >> > "org.apache.kafka.clients.producer.ByteArraySerializer",
> >>Importance.HIGH,
> >> > VALUE_SERIALIZER_CLASS_DOC);
> >> > }
> >> >
> >> > Both serializers will implement the following interface.
> >> >
> >> > public interface Serializer extends Configurable {
> >> > public byte[] serialize(String topic, T data, boolean isKey);
> >> >
> >>

How many messages does each broker have?

2014-11-25 Thread Palur Sandeep
Dear Developers,

I am using the default partitioning logic (random partitioning) to produce
messages into brokers; that is, I don't use a partitioner.class.

My requirement is: if I produce 10 messages using the code below, for a
topic that has 8 partitions across 8 nodes, how many messages will each
partition have? Is there any API that can help me find the broker id of
each message I consume from the consumer side?

PS: I don't want to use partitioner.class. I want to use Kafka's default
partitioning logic.

  KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, new_mes);

producer.send(data);

-- 
Regards,
Sandeep Palur
Data-Intensive Distributed Systems Laboratory, CS/IIT
Department of Computer Science, Illinois Institute of Technology (IIT)
Phone : 312-647-9833
Email : psand...@hawk.iit.edu 


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jonathan Weeks
+1 on this change — APIs are forever. As much as we’d love to see 0.8.2 release 
ASAP, it is important to get this right.

-JW

> On Nov 24, 2014, at 5:58 PM, Jun Rao  wrote:
> 
> Hi, Everyone,
> 
> I'd like to start a discussion on whether it makes sense to add the
> serializer api back to the new java producer. Currently, the new java
> producer takes a byte array for both the key and the value. While this api
> is simple, it pushes the serialization logic into the application. This
> makes it hard to reason about what type of data is being sent to Kafka and
> also makes it hard to share an implementation of the serializer. For
> example, to support Avro, the serialization logic could be quite involved
> since it might need to register the Avro schema in some remote registry and
> maintain a schema cache locally, etc. Without a serialization api, it's
> impossible to share such an implementation so that people can easily reuse.
> We sort of overlooked this implication during the initial discussion of the
> producer api.
> 
> So, I'd like to propose an api change to the new producer by adding back
> the serializer api similar to what we had in the old producer. Specially,
> the proposed api changes are the following.
> 
> First, we change KafkaProducer to take generic types K and V for the key
> and the value, respectively.
> 
> public class KafkaProducer<K,V> implements Producer<K,V> {
> 
>    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
> callback);
> 
>    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> }
> 
> Second, we add two new configs, one for the key serializer and another for
> the value serializer. Both serializers will default to the byte array
> implementation.
> 
> public class ProducerConfig extends AbstractConfig {
> 
>.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> KEY_SERIALIZER_CLASS_DOC)
>.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> VALUE_SERIALIZER_CLASS_DOC);
> }
> 
> Both serializers will implement the following interface.
> 
> public interface Serializer<T> extends Configurable {
>public byte[] serialize(String topic, T data, boolean isKey);
> 
>public void close();
> }
> 
> This is more or less the same as what's in the old producer. The slight
> differences are (1) the serializer now only requires a parameter-less
> constructor; (2) the serializer has a configure() and a close() method for
> initialization and cleanup, respectively; (3) the serialize() method
> additionally takes the topic and an isKey indicator, both of which are
> useful for things like schema registration.
> 
> The detailed changes are included in KAFKA-1797. For completeness, I also
> made the corresponding changes for the new java consumer api as well.
> 
> Note that the proposed api changes are incompatible with what's in the
> 0.8.2 branch. However, if those api changes are beneficial, it's probably
> better to include them now in the 0.8.2 release, rather than later.
> 
> I'd like to discuss mainly two things in this thread.
> 1. Do people feel that the proposed api changes are reasonable?
> 2. Are there any concerns of including the api changes in the 0.8.2 final
> release?
> 
> Thanks,
> 
> Jun
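
Assuming the proposal above lands as described, application setup might look
roughly like the sketch below. The literal config key strings are an
assumption (the proposal refers to them as KEY_SERIALIZER_CLASS_CONFIG and
VALUE_SERIALIZER_CLASS_CONFIG), and com.example.AvroSerializer is a
hypothetical custom implementation.

import java.util.Properties;

// Sketch assuming the proposed producer API: the application sets the two new
// serializer configs and keeps the byte-array default where it does not care.
public class ProposedProducerSetup {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        // Config key names are assumptions for the constants defined in the proposal:
        props.put("key.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
        props.put("value.serializer", "com.example.AvroSerializer");  // hypothetical custom class
        return props;
    }
}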



Re: benchmark kafka on 10GbE network

2014-11-25 Thread Manu Zhang
Thanks for the explanation.

Here are some stats for I/O thread.

*io-ratio 0.155*
*io-time-ns-avg 16418.5*
*io-wait-ratio 0.59*
*io-wait-time-ns-avg  62881*

It seems to confirm that the I/O thread spends much more time waiting than
doing real work.

Given the above stats, how could I track down and pinpoint the bottleneck?
I guess computing crc32s cannot be avoided.

On Fri, Nov 21, 2014 at 12:34 PM, Jay Kreps  wrote:

> So I suspect that the bottleneck is actually in the writer thread (the one
> calling send()), not the I/O thread. You could verify this by checking the
> JMX stats which will give the amount of time the I/O thread spends waiting.
> But since epollWait shows up first that is the I/O thread waiting for work.
>
> It looks like the big bottleneck is computing the crc32s for the messages.
> The next big hit after that is signaling the I/O thread to wake-up and do
> work.
>
> Here is an annotated version of those traces:
>
> These two are bogus and are just background JMX things I think:
>  1 39.30% 39.30%   79585 300923 java.net.SocketInputStream.socketRead0
>2 20.62% 59.92%   41750 300450 java.net.PlainSocketImpl.socketAccept
>
> This is the I/O thread waiting for work to do
>3  9.52% 69.45%   19287 300660 sun.nio.ch.EPollArrayWrapper.epollWait
>
> These are the real problems:
>4  9.50% 78.94%   19234 300728 org.apache.kafka.common.
> record.Record.computeChecksum
>5  4.14% 83.08%8377 300777 sun.nio.ch.EPollArrayWrapper.interrupt
>
> I/O thread doing a write
>6  2.30% 85.38%4662 300708 sun.nio.ch.FileDispatcherImpl.writev0
>
> This is a one time thing when fetching metadata on startup
>7  1.61% 86.99%3260 300752
> org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>
> These are all in the I/O thread so not relevant:
>8  1.24% 88.23%2501 300804 sun.nio.ch.EPollArrayWrapper.epollWait
>9  1.08% 89.31%2187 300734
> org.apache.kafka.clients.producer.internals.RecordBatch.done
>   10  0.98% 90.29%1991 300870
> org.apache.kafka.common.protocol.types.Type$6.write
>   11  0.97% 91.26%1961 300789
> org.apache.kafka.clients.producer.internals.RecordAccumulator.ready
>   12  0.96% 92.22%1951 300726
>
> -Jay
>
> On Thu, Nov 20, 2014 at 5:42 PM, Manu Zhang 
> wrote:
>
> > Ok, here is the hrpof output
> >
> > CPU SAMPLES BEGIN (total = 202493) Fri Nov 21 08:07:51 2014
> > rank   self  accum   count trace method
> >1 39.30% 39.30%   79585 300923 java.net.SocketInputStream.socketRead0
> >2 20.62% 59.92%   41750 300450 java.net.PlainSocketImpl.socketAccept
> >3  9.52% 69.45%   19287 300660 sun.nio.ch.EPollArrayWrapper.epollWait
> >4  9.50% 78.94%   19234 300728
> > org.apache.kafka.common.record.Record.computeChecksum
> >5  4.14% 83.08%8377 300777 sun.nio.ch.EPollArrayWrapper.interrupt
> >6  2.30% 85.38%4662 300708 sun.nio.ch.FileDispatcherImpl.writev0
> >7  1.61% 86.99%3260 300752
> > org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
> >8  1.24% 88.23%2501 300804 sun.nio.ch.EPollArrayWrapper.epollWait
> >9  1.08% 89.31%2187 300734
> > org.apache.kafka.clients.producer.internals.RecordBatch.done
> >   10  0.98% 90.29%1991 300870
> > org.apache.kafka.common.protocol.types.Type$6.write
> >   11  0.97% 91.26%1961 300789
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.ready
> >   12  0.96% 92.22%1951 300726
> > org.apache.kafka.common.record.MemoryRecords.append
> >   13  0.89% 93.12%1809 300829 java.nio.Bits.copyFromArray
> >   14  0.75% 93.86%1510 300722 java.nio.HeapByteBuffer.
> >   15  0.54% 94.41%1100 300730
> > org.apache.kafka.common.record.Compressor.put
> >   16  0.54% 94.95%1094 300749
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> >   17  0.38% 95.33% 771 300755
> > org.apache.kafka.clients.producer.KafkaProducer.send
> >   18  0.36% 95.69% 736 300830
> > org.apache.kafka.common.metrics.Sensor.record
> >   19  0.35% 96.04% 709 300848 sun.nio.ch.IOUtil.drain
> >   20  0.33% 96.37% 665 300814 sun.nio.ch.IOUtil.drain
> >   21  0.32% 96.69% 644 300812
> > org.apache.kafka.common.metrics.Sensor.record
> >   22  0.31% 97.00% 626 300725
> > org.apache.kafka.clients.producer.internals.Partitioner.partition
> >   23  0.28% 97.28% 571 300729
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> >   24  0.26% 97.54% 535 300764
> > org.apache.log4j.Category.getEffectiveLevel
> >   25  0.25% 97.79% 501 300924
> > org.apache.kafka.common.protocol.types.Schema.write
> >   26  0.19% 97.98% 392 300802
> > org.apache.kafka.common.metrics.Sensor.record
> >   27  0.19% 98.17% 386 300797
> > org.apache.kafka.common.metrics.Sensor.record
> >   28  0.17% 98.34% 342 300739
> > org.apache.kafka.common.record.Record.write
> >   29  0.16% 98.50% 315 300792
> > org.apache.kafka.common.record.Record.write
> >   30  0.15% 98.64% 294 300757
> > org.apa

dynamically changing log level on a running broker?

2014-11-25 Thread ben fleis
Hello hello,

From what I see at KAFKA-16 
and KAFKA-429 , it should
be possible to change log levels on a running broker process.  The patches
appears (to my naive eyes: I speak no Java/MBeans, and very little Scala)
to provide the capability, but it's not clear to me -- how can I actually
do this from the command line on a running system?  Anybody have a simple
example of this?  Have I simply missed it?

Thanks!

b