Kafka: writing custom Encoder/Serializer
I am trying to build a POC with Kafka 0.8.1. I am using my own Java class as a Kafka message; it has a bunch of String fields. For the serializer.class property in my producer, I cannot use the default serializer class or the String serializer class that comes with the Kafka library, so I guess I need to write my own serializer and feed it to the producer properties. If you have an example of a custom serializer for Kafka (in Java), please do share. Appreciate it a lot, thanks much.

I tried the class below, but I get this exception:

    Exception in thread "main" java.lang.NoSuchMethodException:
        test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
            at java.lang.Class.getConstructor0(Class.java:2971)

    package test;

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import kafka.message.Message;
    import kafka.serializer.Decoder;
    import kafka.serializer.Encoder;

    public class EventsDataSerializer implements Encoder<SimulateEvent>, Decoder<SimulateEvent> {

        public Message toMessage(SimulateEvent eventDetails) {
            try {
                ObjectMapper mapper = new ObjectMapper(new JsonFactory());
                byte[] serialized = mapper.writeValueAsBytes(eventDetails);
                return new Message(serialized);
            } catch (IOException e) {
                e.printStackTrace();
                return null; // TODO
            }
        }

        public SimulateEvent toEvent(Message message) {
            ObjectMapper mapper = new ObjectMapper(new JsonFactory());
            try {
                return mapper.readValue(message.payload().array(), SimulateEvent.class);
            } catch (IOException e) {
                e.printStackTrace(); // TODO handle error
                return null;
            }
        }

        public byte[] toBytes(SimulateEvent arg0) {
            // TODO Auto-generated method stub
            return null;
        }

        public SimulateEvent fromBytes(byte[] arg0) {
            // TODO Auto-generated method stub
            return null;
        }
    }
Re: Kafka: writing custom Encoder/Serializer
You can send a byte[] that you get by using your own serializer through Kafka. On the receiving side you can deserialize from the byte[] and read back your object. To use this, you will have to supply serializer.class=kafka.serializer.DefaultEncoder in the properties.

On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep kprad...@novell.com wrote:
> [quoted text elided]
Re: Make kafka storage engine pluggable and provide a HDFS plugin?
Take a look at Camus: https://github.com/linkedin/camus/

François Langelier
Étudiant en génie Logiciel - École de Technologie Supérieure http://www.etsmtl.ca/
Capitaine Club Capra http://capra.etsmtl.ca/
VP-Communication - CS Games http://csgames.org 2014
Jeux de Génie http://www.jdgets.com/ 2011 à 2014
Argentier Fraternité du Piranha http://fraternitedupiranha.com/ 2012-2014
Comité Organisateur Olympiades ÉTS 2012
Compétition Québécoise d'Ingénierie 2012 - Compétition Senior

On 19 May 2014 05:28, Hangjun Ye yehang...@gmail.com wrote:

Hi there,

I recently started to use Kafka for our data analysis pipeline and it works very well. One problem for us so far is expanding our cluster when we need more storage space. Kafka provides some scripts to help with this, but the process wasn't smooth. To make it work perfectly, it seems Kafka needs to do some jobs that a distributed file system has already done.

So I'm just wondering if there are any thoughts on making Kafka work on top of HDFS? Maybe make the Kafka storage engine pluggable, with HDFS as one option? The pros might be that HDFS has already handled storage management (replication, corrupted disks/machines, migration, load balance, etc.) very well, and it would free Kafka and its users from that burden; the cons might be performance degradation. As Kafka does very well on performance, possibly even with some degree of degradation it would still be competitive for most situations.

Best,
--
Hangjun Ye
Re: Kafka: writing custom Encoder/Serializer
The customized encoder/decoder has to have a constructor that takes a VerifiableProperties argument. Alternatively, you could do the encoding/decoding outside of the Kafka client and just send byte[] to Kafka. The pluggable encoder/decoder will be gradually phased out in the future.

Thanks,
Jun

On Tue, May 20, 2014 at 3:53 AM, Kumar Pradeep kprad...@novell.com wrote:
> [quoted text elided]
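The NoSuchMethodException in the original post is thrown by exactly this reflective lookup: Kafka 0.8 instantiates the class named in serializer.class via a one-argument VerifiableProperties constructor. A minimal, self-contained sketch of that mechanism follows; note that VerifiableProperties here is a hypothetical stub standing in for kafka.utils.VerifiableProperties so the snippet runs without Kafka on the classpath, and EventsDataEncoder is an illustrative name.

```java
import java.lang.reflect.Constructor;

// Hypothetical stub standing in for kafka.utils.VerifiableProperties,
// so this sketch compiles and runs without Kafka on the classpath.
class VerifiableProperties {}

// An encoder shaped the way Kafka 0.8's reflective lookup expects:
// a public constructor taking a single VerifiableProperties argument.
class EventsDataEncoder {
    public EventsDataEncoder(VerifiableProperties props) {}

    public byte[] toBytes(Object event) {
        return new byte[0]; // real encoding (e.g. Jackson) would go here
    }
}

public class ConstructorCheck {
    public static void main(String[] args) throws Exception {
        // Kafka instantiates serializer.class roughly like this; without the
        // one-arg constructor, this is the call that throws NoSuchMethodException.
        Constructor<EventsDataEncoder> ctor =
                EventsDataEncoder.class.getConstructor(VerifiableProperties.class);
        EventsDataEncoder encoder = ctor.newInstance(new VerifiableProperties());
        System.out.println("constructed: " + encoder.getClass().getSimpleName());
    }
}
```

Adding `public EventsDataSerializer(VerifiableProperties props) {}` to the class in the original post should make the lookup succeed.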
Re: Kafka: writing custom Encoder/Serializer
Thanks Pushkar for your response. I tried to send my own byte array; however, the Kafka Producer class does not take byte[] as the input type the way I declared it. Do you have an example of this? Please share if you do; really appreciate it. Here is my code:

    public class TestEventProducer {
        public static void main(String[] args) {
            String topic = "test-topic";
            long eventsNum = 10;

            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");
            props.put("request.required.acks", "0");
            ProducerConfig config = new ProducerConfig(props);

            byte[] rawData;
            Producer<String, rawData> producer =
                new Producer<String, rawData>(config); // compilation error: rawData cannot be resolved to a type

            long start = System.currentTimeMillis();
            for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
                SimulateEvent event = new SimulateEvent();
                try {
                    rawData = Serializer.serialize(event);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                KeyedMessage<String, rawData> data = new KeyedMessage<String, rawData>(topic, event);
                producer.send(data);
                System.out.println("produced event#: " + nEvents + " " + data);
            }
            System.out.println("Took " + (System.currentTimeMillis() - start)
                + " to produce " + eventsNum + " messages");
            producer.close();
        }
    }

    public class Serializer {
        public static byte[] serialize(Object obj) throws IOException {
            ByteArrayOutputStream b = new ByteArrayOutputStream();
            ObjectOutputStream o = new ObjectOutputStream(b);
            o.writeObject(obj);
            return b.toByteArray();
        }

        public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
            ByteArrayInputStream b = new ByteArrayInputStream(bytes);
            ObjectInputStream o = new ObjectInputStream(b);
            return o.readObject();
        }
    }

pushkar priyadarshi priyadarshi.push...@gmail.com 5/20/2014 5:11 PM wrote:
> [quoted text elided]
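A side note on the compilation error above: the type parameters should name the value's type, not the variable, i.e. Producer<String, byte[]> and KeyedMessage<String, byte[]>, and the message should wrap rawData rather than event. The JDK-serialization helper itself is plain standard library and can be exercised on its own; here is a minimal round-trip sketch (using a String payload, since SimulateEvent isn't shown, and a hypothetical class name RoundTrip):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class RoundTrip {
    // Serialize any Serializable object to the byte[] that DefaultEncoder passes through.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(obj);
        }
        return buf.toByteArray();
    }

    // Restore the object on the consuming side.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] raw = serialize("hello");
        System.out.println(deserialize(raw)); // prints the restored object
    }
}
```

With this shape, `producer.send(new KeyedMessage<String, byte[]>(topic, RoundTrip.serialize(event)))` should type-check against the DefaultEncoder configuration.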
Re: Consistent replication of an event stream into Kafka
We plan to work on the feature this summer and make it available in the 0.9 release. Please try it out then and give us any feedback you have.

Guozhang

On Tue, May 20, 2014 at 9:23 AM, Bob Potter bobby.pot...@gmail.com wrote:

Hi Guozhang, That looks great! I think it would solve our case. Thanks, Bob

On 20 May 2014 00:18, Guozhang Wang wangg...@gmail.com wrote:

Hello Bob, What you described is similar to the idempotent producer design that we are now discussing: https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer Do you think this new feature will solve your case? Guozhang

On Mon, May 19, 2014 at 2:40 PM, Bob Potter bobby.pot...@gmail.com wrote:

Hello,

We have a use case where we want to replicate an event stream that exists outside of Kafka into a Kafka topic (single partition). The event stream has sequence ids which always increase by 1, and we want to preserve this ordering.

The difficulty is that we want the process that writes these events to fail over automatically if it dies. While ZooKeeper can guarantee a single writer at a given point in time, we are worried about delayed network packets, bugs, and long GC pauses.

One solution we've thought of is to set the sequence id as the key for the Kafka messages and have a proxy running on each Kafka broker which refuses to write new messages if they don't have the next expected key. This seems to solve any issue we would have with badly behaving networks or processes. Is there a better solution? Should we just handle these inconsistencies in our consumers? Are we being too paranoid?

As a side note, it seems like this functionality (guaranteeing that all keys in a partition are in sequence on a particular topic) may be a nice option to have in Kafka proper.

Thanks, Bob
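The sequence-checking proxy described in this thread can be sketched as a small gate that admits a write only if it carries the next expected sequence id, so a delayed or duplicate write from a zombie producer is rejected. The class and method names below are illustrative, not a Kafka API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the "refuse out-of-order writes" gate; not a Kafka API.
class SequenceGate {
    private final AtomicLong nextExpected;

    SequenceGate(long firstExpected) {
        this.nextExpected = new AtomicLong(firstExpected);
    }

    // Admit a message only if its sequence id is exactly the next expected one.
    // compareAndSet makes the check-and-advance atomic across writer threads.
    boolean tryAccept(long sequenceId) {
        return nextExpected.compareAndSet(sequenceId, sequenceId + 1);
    }
}

public class SequenceGateDemo {
    public static void main(String[] args) {
        SequenceGate gate = new SequenceGate(1);
        System.out.println(gate.tryAccept(1)); // true: in order
        System.out.println(gate.tryAccept(3)); // false: gap, rejected
        System.out.println(gate.tryAccept(2)); // true: the real successor
    }
}
```

A stale writer replaying an old sequence id fails the compare-and-set and its write is dropped, which is the property the proposal relies on.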
RE: SocketServerStats not reporting bytes written or read
I checked the stats with jconsole and it confirms the reading I've been getting with jmxtrans so the problem is with the jmx beans themselves I think. The stats just came back to normal again and I don't know why. I haven't made any changes to the kafka brokers. -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Monday, May 19, 2014 9:45 PM To: users@kafka.apache.org Subject: Re: SocketServerStats not reporting bytes written or read Is the problem with the jmx beans themselves or jmxstats? Thanks, Jun On Mon, May 19, 2014 at 2:48 PM, Xuyen On x...@ancestry.com wrote: Hi all, I have an intermittent problem with the JMX SocketServer stats on my 0.7.2 Kafka cluster. I'm collecting the SocketServerStats with jmxstats and everything seems to be working fine except kafka.SocketServerStats:BytesWrittenPerSecond and kafka.SocketServerStats:BytesReadPerSecond are not working all the time. It sometimes will cut out and not report any traffic and then it will randomly report back normal stats. I've noticed that when I started a new topic and starting sending data with a new producer, the stats for bytes written and read will suddenly zero out. Funny thing is that the other stats seem to still be working fine including cumulative bytes read and written. Does anyone know what might be causing this and how I can fix it? Thanks, Xuyen
RE: SocketServerStats not reporting bytes written or read
To be more clear:

1. I am using jmxtrans to get the data, not jmstats. Sorry about the misspelling.
2. When I say the stats zero out, I mean that I am not able to get new values when I refresh with a new query from jmxtrans or jconsole. This only happens for the kafka.SocketServerStats:BytesWrittenPerSecond and kafka.SocketServerStats:BytesReadPerSecond stats. The other stats seem to update fine.

Any ideas why this might be?

-----Original Message-----
From: Xuyen On
Sent: Tuesday, May 20, 2014 11:24 AM
To: users@kafka.apache.org
Subject: RE: SocketServerStats not reporting bytes written or read

> [quoted text elided]
RE: SocketServerStats not reporting bytes written or read
Sorry, I lied. The following do not update:

    ProduceRequestsPerSecond
    FetchRequestsPerSecond
    AvgProduceRequestMs
    MaxProduceRequestMs
    AvgFetchRequestMs
    MaxFetchRequestMs
    BytesReadPerSecond
    BytesWrittenPerSecond

These stats do update with new values:

    NumFetchRequests
    NumProduceRequests
    TotalBytesRead
    TotalFetchRequestMs
    TotalProduceRequestMs

-----Original Message-----
From: Xuyen On
Sent: Tuesday, May 20, 2014 11:30 AM
To: 'users@kafka.apache.org'
Subject: RE: SocketServerStats not reporting bytes written or read

> [quoted text elided]
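As a sanity check alongside jmxtrans and jconsole, MBean attributes can also be read programmatically. The sketch below queries the local platform MBeanServer; against a broker you would instead connect a remote JMXConnector and use the kafka.SocketServerStats object name (the java.lang:type=Memory name is used here only so the snippet runs anywhere):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxProbe {
    public static void main(String[] args) throws Exception {
        // In-process MBean server; a broker would be reached remotely via JMXConnectorFactory.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Placeholder bean so the sketch is self-contained; a broker probe would
        // target the kafka.SocketServerStats object name instead.
        ObjectName name = new ObjectName("java.lang:type=Memory");
        Object heap = server.getAttribute(name, "HeapMemoryUsage");
        System.out.println("HeapMemoryUsage = " + heap);
    }
}
```

Polling an attribute this way at a fixed interval makes it easy to tell whether the bean itself stops changing or only a particular collector's view of it does.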
Re: starting of at a small scale, single ec2 instance with 7.5 GB RAM with kafka
It is not recommended to install both kafka and zookeeper on the same box as both would fight for the available memory and performance will degrade. Thanks Neha On Mon, May 19, 2014 at 7:29 AM, S Ahmed sahmed1...@gmail.com wrote: Hi, I like how kafka operates, but I'm wondering if it is possible to run everything on a single ec2 instance with 7.5 GB RAM. So that would be zookeeper and a single kafka broker. I would have a separate server to consume from the broker. Producers would be from my web servers. I don't want to complicate things as i don't really need failover or redundancy etc. I just want to keep things simple. I'll have a single topic, and a few partitions because I want the guarantee that the messages are in order. Is this something that would be really out of the norm and not recommended? i.e. nobody really uses it this way and who knows what is going to happen? :)
Re: starting of at a small scale, single ec2 instance with 7.5 GB RAM with kafka
Yes agreed, but I have done some load testing before and Kafka was doing tens of thousands of messages per second. If I am doing only hundreds, I think it could handle it for now. Like I said, this is small scale.

On Tue, May 20, 2014 at 2:51 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
> [quoted text elided]
Re: starting of at a small scale, single ec2 instance with 7.5 GB RAM with kafka
If you really only care about small scale (no HA, no horizontal scaling), you could also consider using Redis instead of Kafka for queueing.

- Niek

On Tue, May 20, 2014 at 2:23 PM, S Ahmed sahmed1...@gmail.com wrote:
> [quoted text elided]
Java API to list topics and partitions
Hi, Is there a Java API in Kafka to list the topics and partitions in the Kafka broker? Thanks, Saurabh.
Re: Java API to list topics and partitions
There is a Scala API. You can take a look at TopicCommand.scala as kafka-topics.sh simply calls that class. Tim On Tue, May 20, 2014 at 3:41 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEX -) sagarwal...@bloomberg.net wrote: Hi, Is there java API in kafka to list topics and partitions in the kafka broker? Thanks, Saurabh.
Async producer callback?
Hi guys, Is there currently a way to track the async producer callback? My requirement is basically: if all nodes for the topic go down, can I pause the producer and, after the broker comes back online, continue producing from the failure point?

Best, Siyuan
Re: Java API to list topics and partitions
Thanks. I will look into it. - Original Message - From: Timothy Chen tnac...@gmail.com At: Tuesday, May 20, 2014 18:56 There is a Scala API. You can take a look at TopicCommand.scala as kafka-topics.sh simply calls that class. Tim On Tue, May 20, 2014 at 3:41 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEX -) sagarwal...@bloomberg.net wrote: Hi, Is there java API in kafka to list topics and partitions in the kafka broker? Thanks, Saurabh.
Re: Async producer callback?
We introduced callbacks in the new producer. It's only available in trunk though. Thanks, Jun On Tue, May 20, 2014 at 4:42 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, So far, is there a way to track the asyn producer callback. My requirement is basically if all nodes of the topic goes down, can I pause the producer and after the broker comes back online, continue to produce from the failure point? Best, Siyuan
Re: Java API to list topics and partitions
You can issue a TopicMetadataRequest. See https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Thanks, Jun On Tue, May 20, 2014 at 3:41 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEX -) sagarwal...@bloomberg.net wrote: Hi, Is there java API in kafka to list topics and partitions in the kafka broker? Thanks, Saurabh.
Re: Make kafka storage engine pluggable and provide a HDFS plugin?
Thanks Jun and Francois.

We used Kafka 0.8.0 previously. We got some weird errors when expanding the cluster and it couldn't be finished. Now we use 0.8.1.1; I will try cluster expansion again sometime. I read the discussion on that JIRA issue and I agree with the points raised there. HDFS has also improved a lot since then, and many issues have been resolved (e.g. the SPOF).

We have a team for building and providing a storage/computing platform for our company, and we have already provided a Hadoop cluster. If Kafka had an option to store data on HDFS, we would just need to allocate some space quota for it on our cluster (and increase it on demand), and it might reduce our operational cost a lot.

Another (and maybe more aggressive) thought is about deployment. Jun has a good point: HDFS only provides data redundancy, not computational redundancy. If Kafka could be deployed on YARN, it could offload some computational resource management to YARN and we wouldn't have to allocate machines physically. Kafka would still need to take care of load balance and partition assignment among brokers by itself. Many computational frameworks like Spark/Samza have such an option and it's a big attraction for us.

Best,
Hangjun

2014-05-20 21:00 GMT+08:00 François Langelier f.langel...@gmail.com:
> [quoted text elided]

--
Hangjun Ye
Re: Make kafka storage engine pluggable and provide a HDFS plugin?
Hangjun,

Would having Kafka on YARN be a big architectural change from where it is now? From what I have seen, in most typical setups you want machines optimized for Kafka, not just Kafka on top of HDFS.

-Steve

On Tue, May 20, 2014 at 8:37 PM, Hangjun Ye yehang...@gmail.com wrote:
> [quoted text elided]
Kafka replication throttling
Hi, We have several Kafka clusters in production, and we've had to reassign replication a few times now in production. Some of our topic/partitions are pretty large, up to 32 partitions per topic, and 16GB per partition, so adding a new broker and/or repairing a broker that had been down for some time turns out to be a major undertaking. Today, when we attempt to replicate a single partition, it pegs the disk IO, and uses a significant chunk of the 10Gbps interface for a good ~5 minutes. This is causing problems for our downstream consumers, which rely on having a consistent stream of realtime data being sent to them. Is there a way to throttle Kafka replication between nodes, so that instead of it going full blast, it will replicate at a fixed rate in megabytes or activities/batches per second? Or maybe is this planned for a future release, maybe 0.9? Thanks, Marcos Juarez
Re: Make kafka storage engine pluggable and provide a HDFS plugin?
Hi Steve,

Yes, what I want is that Kafka doesn't have to care about machines physically (as an option).

Best,
Hangjun

2014-05-21 11:46 GMT+08:00 Steve Morin st...@stevemorin.com:

Hangjun,

Would having Kafka in YARN be a big architectural change from where it is now? From what I have seen, on a typical setup you want machines optimized for Kafka, not just Kafka on top of HDFS.

-Steve

On Tue, May 20, 2014 at 8:37 PM, Hangjun Ye yehang...@gmail.com wrote:

Thanks Jun and Francois.

We used Kafka 0.8.0 previously. We got some weird error when expanding the cluster, and the expansion couldn't be finished. Now we use 0.8.1.1; I will try cluster expansion again sometime.

I read the discussion on that JIRA issue and I agree with the points raised there. HDFS has also improved a lot since then, and many issues have been resolved (e.g. the SPOF). We have a team for building and providing a storage/computing platform for our company, and we already provide a Hadoop cluster. If Kafka had an option to store data on HDFS, we would just need to allocate some space quota for it on our cluster (and increase it on demand), and it might reduce our operational cost a lot.

Another (and maybe more aggressive) thought is about deployment. Jun has a good point: HDFS only provides data redundancy, not computational redundancy. If Kafka could be deployed on YARN, it could offload some computational resource management to YARN, and we wouldn't have to allocate machines physically. Kafka would still need to take care of load balancing and partition assignment among brokers by itself. Many computational frameworks like Spark/Samza have such an option, and it's a big attractive point for us.
Best,
Hangjun

2014-05-20 21:00 GMT+08:00 François Langelier f.langel...@gmail.com:

Take a look at Camus https://github.com/linkedin/camus/

François Langelier
Software Engineering Student - École de Technologie Supérieure http://www.etsmtl.ca/
Captain, Club Capra http://capra.etsmtl.ca/
VP-Communication - CS Games http://csgames.org 2014
Jeux de Génie http://www.jdgets.com/ 2011 à 2014
Treasurer, Fraternité du Piranha http://fraternitedupiranha.com/ 2012-2014
Organizing Committee, Olympiades ÉTS 2012
Compétition Québécoise d'Ingénierie 2012 - Compétition Senior

On 19 May 2014 05:28, Hangjun Ye yehang...@gmail.com wrote:

Hi there,

I recently started to use Kafka for our data analysis pipeline, and it works very well. One problem for us so far is expanding the cluster when we need more storage space. Kafka provides some scripts to help with this, but the process wasn't smooth.

To make it work perfectly, it seems Kafka needs to do some jobs that a distributed file system has already done. So I'm wondering if there are any thoughts on making Kafka work on top of HDFS? Maybe make the Kafka storage engine pluggable, with HDFS as one option? The pros would be that HDFS already handles storage management (replication, corrupted disks/machines, migration, load balancing, etc.) very well, freeing Kafka and its users from that burden; the cons would be performance degradation. As Kafka does very well on performance, even with some degree of degradation it would likely still be competitive for most situations.

Best,
-- Hangjun Ye
Re: Kafka: writing custom Encoder/Serializer
Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

Try this.

On Wed, May 21, 2014 at 12:26 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

Pradeep,

If you are writing a POC, I'd suggest you do that using the new producer APIs http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/Producer.html. These are much easier to use and expose more functionality, and the new producer is faster than the old one. It is currently in beta, slated for release in 0.8.2 or 0.9, and we are working on stabilizing it, but it should work great for your POC. We'd love to hear feedback on the APIs.

Thanks,
Neha

On Tue, May 20, 2014 at 10:51 AM, Kumar Pradeep kprad...@novell.com wrote:

Thanks Pushkar for your response. I tried to send my own byte array; however, the Kafka Producer class does not take byte[] as an input type. Do you have an example of this? Please share if you do; really appreciate it. Here is my code:

public class TestEventProducer {
    public static void main(String[] args) {
        String topic = "test-topic";
        long eventsNum = 10;

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "0");
        ProducerConfig config = new ProducerConfig(props);

        byte[] rawData;
        Producer<String, rawData> producer =
            new Producer<String, rawData>(config); // compilation error: rawData cannot be resolved to a type

        long start = System.currentTimeMillis();
        for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
            SimulateEvent event = new SimulateEvent();
            try {
                rawData = Serializer.serialize(event);
            } catch (IOException e) {
                e.printStackTrace();
            }
            KeyedMessage<String, rawData> data = new KeyedMessage<String, rawData>(topic, event);
            producer.send(data);
            System.out.println("produced event#: " + nEvents + " " + data);
        }
        System.out.println("Took " + (System.currentTimeMillis() - start)
            + " to produce " + eventsNum + " messages");
        producer.close();
    }
}

public class Serializer {
    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        ObjectOutputStream o = new ObjectOutputStream(b);
        o.writeObject(obj);
        return b.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        ByteArrayInputStream b = new ByteArrayInputStream(bytes);
        ObjectInputStream o = new ObjectInputStream(b);
        return o.readObject();
    }
}

pushkar priyadarshi priyadarshi.push...@gmail.com 5/20/2014 5:11 PM:

You can send the byte[] that you get from your own serializer through Kafka. On the receiving side you can deserialize from the byte[] and read back your object. For this you will have to supply serializer.class=kafka.serializer.DefaultEncoder in the properties.

On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep kprad...@novell.com wrote:

I am trying to build a POC with Kafka 0.8.1. I am using my own Java class as a Kafka message, which has a bunch of String data types. For the serializer.class property in my producer, I cannot use the default serializer class or the String serializer class that comes with the Kafka library. I guess I need to write my own serializer and feed it to the producer properties. If you are aware of an example custom serializer for Kafka (in Java), please do share. Appreciate a lot, thanks much.
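[Editor's note: Pushkar's byte[] suggestion can be exercised without any Kafka dependency, since the quoted Serializer class just round-trips an object through a byte[] (the same array you would hand to a Producer<String, byte[]> configured with kafka.serializer.DefaultEncoder). A minimal, self-contained sketch; SimulateEvent here is a hypothetical stand-in for the poster's class, and note that the quoted serialize method never closes the ObjectOutputStream, which try-with-resources handles below:]

```java
import java.io.*;

public class RoundTripDemo {
    // Hypothetical stand-in for the poster's event class; it must be Serializable.
    static class SimulateEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        String name;
        SimulateEvent(String name) { this.name = name; }
    }

    // Object -> byte[]; this is what you would pass to producer.send(...)
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        try (ObjectOutputStream o = new ObjectOutputStream(b)) {
            o.writeObject(obj); // closed before toByteArray, so nothing stays buffered
        }
        return b.toByteArray();
    }

    // byte[] -> Object; this is what the consumer side would do
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream o = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return o.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SimulateEvent event = new SimulateEvent("login");
        byte[] raw = serialize(event);
        SimulateEvent back = (SimulateEvent) deserialize(raw);
        System.out.println(back.name); // prints: login
    }
}
```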
I tried to use something like below, but I get the exception:

Exception in thread "main" java.lang.NoSuchMethodException: test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
    at java.lang.Class.getConstructor0(Class.java:2971)

package test;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.message.Message;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;

public class EventsDataSerializer implements Encoder<SimulateEvent>, Decoder<SimulateEvent> {

    public Message toMessage(SimulateEvent eventDetails) {
        try {
            ObjectMapper mapper = new ObjectMapper(new JsonFactory());
            byte[] serialized = mapper.writeValueAsBytes(eventDetails);
            return new Message(serialized);
        } catch (IOException e) {
            e.printStackTrace();
            return null; // TODO
        }
    }
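[Editor's note: the NoSuchMethodException above comes from how Kafka 0.8 instantiates serializer.class: it reflectively looks up a public one-argument constructor taking kafka.utils.VerifiableProperties, which EventsDataSerializer does not declare. The lookup behaviour can be reproduced with plain reflection; both encoder classes and the VerifiableProperties stand-in below are hypothetical, stdlib-only sketches:]

```java
import java.util.Properties;

public class ReflectionDemo {
    // Stand-in for kafka.utils.VerifiableProperties.
    static class VerifiableProperties {
        VerifiableProperties(Properties p) {}
    }

    // Like the poster's class: no (VerifiableProperties) constructor.
    static class EncoderWithoutCtor {}

    // Declares the public one-arg constructor Kafka's reflective lookup expects.
    static class EncoderWithCtor {
        public EncoderWithCtor(VerifiableProperties props) {}
    }

    static String lookup(Class<?> cls) {
        try {
            cls.getConstructor(VerifiableProperties.class);
            return "found";
        } catch (NoSuchMethodException e) {
            return "NoSuchMethodException"; // what the poster saw
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup(EncoderWithoutCtor.class)); // prints: NoSuchMethodException
        System.out.println(lookup(EncoderWithCtor.class));    // prints: found
    }
}
```

In the poster's case, the usual fix is to add a constructor of the form `public EventsDataSerializer(VerifiableProperties props) { }` to the encoder class.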