Kafka: writing custom Encoder/Serializer

2014-05-20 Thread Kumar Pradeep
I am trying to build a POC with Kafka 0.8.1. I am using my own Java class as a 
Kafka message, which has a bunch of String data types. For the serializer.class 
property in my producer, I cannot use the default serializer class or the 
String serializer class that comes with the Kafka library, so I guess I need to 
write my own serializer and feed it to the producer properties. If you are aware 
of an example custom serializer for Kafka (in Java), please do share. 
Appreciate a lot, thanks much.

I tried to use something like below, but I get the exception: Exception in 
thread "main" java.lang.NoSuchMethodException: 
test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
 at java.lang.Class.getConstructor0(Class.java:2971)


package test;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.message.Message;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;

public class EventsDataSerializer implements Encoder<SimulateEvent>, Decoder<SimulateEvent> {

    public Message toMessage(SimulateEvent eventDetails) {
        try {
            ObjectMapper mapper = new ObjectMapper(new JsonFactory());
            byte[] serialized = mapper.writeValueAsBytes(eventDetails);
            return new Message(serialized);
        } catch (IOException e) {
            e.printStackTrace();
            return null;   // TODO
        }
    }

    public SimulateEvent toEvent(Message message) {
        SimulateEvent event = new SimulateEvent();

        ObjectMapper mapper = new ObjectMapper(new JsonFactory());
        try {
            // TODO handle error
            return mapper.readValue(message.payload().array(), SimulateEvent.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    public byte[] toBytes(SimulateEvent arg0) {
        // TODO Auto-generated method stub
        return null;
    }

    public SimulateEvent fromBytes(byte[] arg0) {
        // TODO Auto-generated method stub
        return null;
    }
}



Re: Kafka: writing custom Encoder/Serializer

2014-05-20 Thread pushkar priyadarshi
You can send the byte[] that you get from your own serializer through
Kafka. On the receiving side you can deserialize from the byte[] and read
back your object. For this to work you will have to
supply serializer.class=kafka.serializer.DefaultEncoder in the properties.
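
For illustration, here is a minimal sketch of this approach, assuming the 0.8
javaapi producer and a plain byte[] payload (the key.serializer.class setting
is an assumption so that the String key is not run through the DefaultEncoder):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RawBytesProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        // DefaultEncoder passes the byte[] value through untouched.
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        // Keys are Strings, so they need their own encoder.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, byte[]> producer = new Producer<String, byte[]>(new ProducerConfig(props));

        // Serialize your object however you like; here just raw UTF-8 bytes.
        byte[] payload = "example event".getBytes("UTF-8");
        producer.send(new KeyedMessage<String, byte[]>("test-topic", "key1", payload));
        producer.close();
    }
}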


On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep kprad...@novell.com wrote:

 [quoted message trimmed]





Re: Make kafka storage engine pluggable and provide a HDFS plugin?

2014-05-20 Thread François Langelier
Take a look at Camus https://github.com/linkedin/camus/



François Langelier
Étudiant en génie Logiciel - École de Technologie Supérieure http://www.etsmtl.ca/
Capitaine Club Capra http://capra.etsmtl.ca/
VP-Communication - CS Games http://csgames.org 2014
Jeux de Génie http://www.jdgets.com/ 2011 à 2014
Argentier Fraternité du Piranha http://fraternitedupiranha.com/ 2012-2014
Comité Organisateur Olympiades ÉTS 2012
Compétition Québécoise d'Ingénierie 2012 - Compétition Senior


On 19 May 2014 05:28, Hangjun Ye yehang...@gmail.com wrote:

 Hi there,

 I recently started to use Kafka for our data analysis pipeline and it works
 very well.

 One problem for us so far is expanding our cluster when we need more storage
 space. Kafka provides some scripts to help with this, but the process wasn't
 smooth.

 To make it work perfectly, it seems Kafka needs to redo some jobs that a
 distributed file system has already done. So I'm wondering if there are any
 thoughts on making Kafka work on top of HDFS? Maybe make the Kafka storage
 engine pluggable, with HDFS as one option?

 The pros might be that HDFS has already handled storage management
 (replication, corrupted disks/machines, migration, load balancing, etc.) very
 well and it would free Kafka and its users from that burden; the cons might
 be performance degradation. As Kafka does very well on performance, it would
 likely still be competitive for most situations even with some degradation.

 Best,
 --
 Hangjun Ye



Re: Kafka: writing custom Encoder/Serializer

2014-05-20 Thread Jun Rao
The custom encoder/decoder has to have a constructor that takes a
(VerifiableProperties props) argument. Alternatively, you could do the
encoding/decoding outside of the Kafka client and just send byte[] to Kafka.
Note that the pluggable encoder/decoder will be gradually phased out in the future.

Thanks,

Jun
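
For reference, a minimal sketch of the fix Jun describes, applied to the
EventsDataSerializer from the original post. Note that the 0.8 Encoder/Decoder
interfaces only require toBytes/fromBytes (the 0.7-style toMessage/toEvent
methods can be dropped); SimulateEvent and the Jackson mapping are carried
over from the poster's code:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class EventsDataSerializer implements Encoder<SimulateEvent>, Decoder<SimulateEvent> {

    private final ObjectMapper mapper = new ObjectMapper();

    // Kafka instantiates the encoder/decoder reflectively and looks for exactly
    // this constructor signature; omitting it causes the NoSuchMethodException.
    public EventsDataSerializer(VerifiableProperties props) {
    }

    public byte[] toBytes(SimulateEvent event) {
        try {
            return mapper.writeValueAsBytes(event);
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }

    public SimulateEvent fromBytes(byte[] bytes) {
        try {
            return mapper.readValue(bytes, SimulateEvent.class);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize event", e);
        }
    }
}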


On Tue, May 20, 2014 at 3:53 AM, Kumar Pradeep kprad...@novell.com wrote:

 [quoted message trimmed]





Re: Kafka: writing custom Encoder/Serializer

2014-05-20 Thread Kumar Pradeep
Thanks Pushkar for your response.

I tried to send my own byte array; however, the Kafka Producer class does not 
take byte[] as an input type. Do you have an example of this? Please share if 
you do; I'd really appreciate it.

Here is my code:

 
public class TestEventProducer {
    public static void main(String[] args) {

        String topic = "test-topic";
        long eventsNum = 10;

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "0");
        ProducerConfig config = new ProducerConfig(props);

        byte[] rawData;
        Producer<String, rawData> producer = new Producer<String, rawData>(config); // compilation error: rawData cannot be resolved to a type

        long start = System.currentTimeMillis();

        for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
            SimulateEvent event = new SimulateEvent();
            try {
                rawData = Serializer.serialize(event);
            } catch (IOException e) {
                e.printStackTrace();
            }
            KeyedMessage<String, rawData> data = new KeyedMessage<String, rawData>(topic, event);
            producer.send(data);
            System.out.println("produced event#: " + nEvents + " " + data);
        }
        System.out.println("Took " + (System.currentTimeMillis() - start) + " to produce " + eventsNum + " messages");
        producer.close();
    }
}

public class Serializer {
    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        ObjectOutputStream o = new ObjectOutputStream(b);
        o.writeObject(obj);
        return b.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        ByteArrayInputStream b = new ByteArrayInputStream(bytes);
        ObjectInputStream o = new ObjectInputStream(b);
        return o.readObject();
    }
}
  pushkar priyadarshi priyadarshi.push...@gmail.com 5/20/2014 5:11 PM 
[quoted messages trimmed]

Re: Consistent replication of an event stream into Kafka

2014-05-20 Thread Guozhang Wang
We plan to work on the feature this summer and make it available in the
0.9 release. Please try it out then and give us any feedback you have.

Guozhang


On Tue, May 20, 2014 at 9:23 AM, Bob Potter bobby.pot...@gmail.com wrote:

 Hi Guozhang,

 That looks great! I think it would solve our case.

 Thanks,
 Bob


 On 20 May 2014 00:18, Guozhang Wang wangg...@gmail.com wrote:

  Hello Bob,
 
  What you described is similar to the idempotent producer design that we
  are now discussing:
 
  https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
 
  Do you think this new feature will solve your case?
 
  Guozhang
 
 
  On Mon, May 19, 2014 at 2:40 PM, Bob Potter bobby.pot...@gmail.com
  wrote:
 
   Hello,
  
   We have a use case where we want to replicate an event stream which
  exists
   outside of kafka into a kafka topic (single partition). The event
 stream
   has sequence ids which always increase by 1. We want to preserve this
   ordering.
  
   The difficulty is that we want to be able to have the process that
 writes
   these events automatically fail-over if it dies. While ZooKeeper can
   guarantee a single writer at a given point in time we are worried about
   delayed network packets, bugs and long GC pauses.
  
   One solution we've thought of is to set the sequence_id as the key for
  the
   Kafka messages and have a proxy running on each Kafka broker which
  refuses
   to write new messages if they don't have the next expected key. This
  seems
   to solve any issue we would have with badly behaving networks or
  processes.
  
   Is there a better solution? Should we just handle these inconsistencies
  in
   our consumers? Are we being too paranoid?
  
   As a side-note, it seems like this functionality (guaranteeing that all
   keys in a partition are in sequence on a particular topic) may be a
 nice
   option to have in Kafka proper.
  
   Thanks,
   Bob
  
 
 
 
  --
  -- Guozhang
 



 --
 Bob Potter




-- 
-- Guozhang


RE: SocketServerStats not reporting bytes written or read

2014-05-20 Thread Xuyen On
I checked the stats with jconsole and it confirms the readings I've been getting 
with jmxtrans, so I think the problem is with the JMX beans themselves. The 
stats just came back to normal again and I don't know why. I haven't made any 
changes to the Kafka brokers.

-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Monday, May 19, 2014 9:45 PM
To: users@kafka.apache.org
Subject: Re: SocketServerStats not reporting bytes written or read

Is the problem with the jmx beans themselves or jmxstats?

Thanks,

Jun


On Mon, May 19, 2014 at 2:48 PM, Xuyen On x...@ancestry.com wrote:

 Hi all,

 I have an intermittent problem with the JMX SocketServer stats on my 
 0.7.2 Kafka cluster.
 I'm collecting the SocketServerStats with jmxstats and everything 
 seems to be working fine except 
 kafka.SocketServerStats:BytesWrittenPerSecond and 
 kafka.SocketServerStats:BytesReadPerSecond are not working all the 
 time. They sometimes cut out and don't report any traffic, and then 
 randomly report back normal stats. I've noticed that when I 
 started a new topic and started sending data with a new producer, the 
 stats for bytes written and read suddenly zeroed out. Funny thing is that 
 the other stats seem to still be working fine, including cumulative bytes 
 read and written.

 Does anyone know what might be causing this and how I can fix it?

 Thanks,

 Xuyen








RE: SocketServerStats not reporting bytes written or read

2014-05-20 Thread Xuyen On
To be more clear, 

1. I am using jmxtrans to get the data, not jmxstats. Sorry about the misspelling.
2. When I say the stats zero out, I mean that I am not able to get new values 
when I refresh with a new query from jmxtrans or jconsole. This only happens 
for the kafka.SocketServerStats:BytesWrittenPerSecond and 
kafka.SocketServerStats:BytesReadPerSecond stats. The other stats seem to 
update fine.

Any ideas why this might be?
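
For anyone trying to reproduce this, below is a minimal sketch of polling
those attributes directly over JMX, independent of jmxtrans. The MBean name
kafka:type=kafka.SocketServerStats and the JMX port are assumptions; verify
both in jconsole first:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SocketServerStatsPoller {
    public static void main(String[] args) throws Exception {
        // Assumes the broker exposes JMX on port 9999; adjust to your setup.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection mbs = connector.getMBeanServerConnection();
        // MBean name as it typically appears for 0.7 brokers (assumption).
        ObjectName stats = new ObjectName("kafka:type=kafka.SocketServerStats");

        // Poll the two problematic attributes every 5 seconds.
        while (true) {
            System.out.println("bytesWritten/s = "
                    + mbs.getAttribute(stats, "BytesWrittenPerSecond")
                    + ", bytesRead/s = "
                    + mbs.getAttribute(stats, "BytesReadPerSecond"));
            Thread.sleep(5000);
        }
    }
}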

[quoted messages trimmed]

RE: SocketServerStats not reporting bytes written or read

2014-05-20 Thread Xuyen On
Sorry I lied,

The following do not update:
ProduceRequestsPerSecond 
FetchRequestsPerSecond 
AvgProduceRequestMs 
MaxProduceRequestMs
AvgFetchRequestMs
MaxFetchRequestMs 
BytesReadPerSecond 
BytesWrittenPerSecond 

These stats do update with new values:
NumFetchRequests 
NumProduceRequests 
TotalBytesRead 
TotalFetchRequestMs 
TotalProduceRequestMs 

[quoted messages trimmed]

Re: starting off at a small scale, single ec2 instance with 7.5 GB RAM with kafka

2014-05-20 Thread Neha Narkhede
It is not recommended to install both Kafka and ZooKeeper on the same box,
as both would fight for the available memory and performance would degrade.

Thanks
Neha


On Mon, May 19, 2014 at 7:29 AM, S Ahmed sahmed1...@gmail.com wrote:

 Hi,

 I like how kafka operates, but I'm wondering if it is possible to run
 everything on a single ec2 instance with 7.5 GB RAM.

 So that would be zookeeper and a single kafka broker.

 I would have a separate server to consume from the broker.

 Producers would be from my web servers.


 I don't want to complicate things as I don't really need failover or
 redundancy etc. I just want to keep things simple.

 I'll have a single topic, and a few partitions because I want the guarantee
 that the messages are in order.


 Is this something that would be really out of the norm and not recommended?
 i.e. nobody really uses it this way and who knows what is going to happen?
 :)



Re: starting off at a small scale, single ec2 instance with 7.5 GB RAM with kafka

2014-05-20 Thread S Ahmed
Yes, agreed, but I have done some load testing before and Kafka was doing
tens of thousands of messages per second.

If I am doing only hundreds, I think it could handle it for now. Like I
said, this is small scale.


On Tue, May 20, 2014 at 2:51 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 [quoted message trimmed]



Re: starting off at a small scale, single ec2 instance with 7.5 GB RAM with kafka

2014-05-20 Thread Niek Sanders
If you really only care about small scale (no HA, no horizontal
scaling), you could also consider using Redis instead of Kafka for
queueing.

- Niek


On Tue, May 20, 2014 at 2:23 PM, S Ahmed sahmed1...@gmail.com wrote:
 [quoted messages trimmed]



Java API to list topics and partitions

2014-05-20 Thread Saurabh Agarwal (BLOOMBERG/ 731 LEX -)
Hi,

Is there a Java API in Kafka to list the topics and partitions in a Kafka broker?
Thanks, 
Saurabh.

Re: Java API to list topics and partitions

2014-05-20 Thread Timothy Chen
There is a Scala API. You can take a look at TopicCommand.scala as
kafka-topics.sh simply calls that class.

Tim
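
For example, a minimal sketch of invoking that class directly from Java; the
flags are the ones kafka-topics.sh accepts in 0.8.1, and the ZooKeeper address
is an assumption:

public class ListTopics {
    public static void main(String[] args) {
        // Equivalent to: bin/kafka-topics.sh --describe --zookeeper localhost:2181
        kafka.admin.TopicCommand.main(new String[] {
                "--describe", "--zookeeper", "localhost:2181" });
    }
}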

On Tue, May 20, 2014 at 3:41 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEX
-) sagarwal...@bloomberg.net wrote:
 [quoted message trimmed]


Async producer callback?

2014-05-20 Thread hsy...@gmail.com
Hi guys,

So far, is there a way to track the async producer callback?
My requirement is basically: if all nodes hosting the topic go down, can I
pause the producer and then, after the brokers come back online, continue to
produce from the failure point?


Best,
Siyuan


Re: Java API to list topics and partitions

2014-05-20 Thread Saurabh Agarwal (BLOOMBERG/ 731 LEX -)
Thanks. I will look into it. 

- Original Message -
From: Timothy Chen tnac...@gmail.com
At: Tuesday, May 20, 2014 18:56

[quoted message trimmed]


Re: Async producer callback?

2014-05-20 Thread Jun Rao
We introduced callbacks in the new producer. It's only available in trunk
though.

Thanks,

Jun
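
For illustration, a minimal sketch of the callback style in the new producer;
since this is unreleased trunk code, the org.apache.kafka.clients.producer
names and configs below may still change:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        KafkaProducer producer = new KafkaProducer(props);

        ProducerRecord record = new ProducerRecord("test-topic", "hello".getBytes());
        // The callback fires when the send is acknowledged or fails, which is
        // one way to detect that brokers are down and pause further sends.
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // e.g. stop producing and retry later
                } else {
                    System.out.println("acked offset " + metadata.offset());
                }
            }
        });
        producer.close();
    }
}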


On Tue, May 20, 2014 at 4:42 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 [quoted message trimmed]



Re: Java API to list topics and partitions

2014-05-20 Thread Jun Rao
You can issue a TopicMetadataRequest. See
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks,

Jun
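
Roughly, that wiki example boils down to something like the sketch below,
assuming a broker at localhost:9092 and the 0.8 javaapi classes (an empty
topic list requests metadata for all topics):

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class ListTopicMetadata {
    public static void main(String[] args) {
        // host, port, socket timeout, buffer size, client id
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "metadata-lookup");
        try {
            // An empty topic list asks the broker for metadata on all topics.
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.<String>emptyList());
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata partition : topic.partitionsMetadata()) {
                    System.out.println(topic.topic() + " partition " + partition.partitionId());
                }
            }
        } finally {
            consumer.close();
        }
    }
}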


On Tue, May 20, 2014 at 3:41 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEX -) 
sagarwal...@bloomberg.net wrote:

 [quoted message trimmed]


Re: Make kafka storage engine pluggable and provide a HDFS plugin?

2014-05-20 Thread Hangjun Ye
Thanks Jun and Francois.

We used Kafka 0.8.0 previously. We got some weird errors when expanding the
cluster and the reassignment couldn't be finished.
Now we use 0.8.1.1; I will give cluster expansion another try sometime.

I read the discussion on that jira issue and I agree with points raised
there.
HDFS was also improved a lot since then and many issues have been resolved
(e.g. SPOF).

We have a team for building and providing storage/computing platform for
our company and we have already provided a Hadoop cluster.
If Kafka has an option to store data on HDFS, we just need to allocate some
space quota for it on our cluster (and increase it on demand) and it might
reduce our operational cost a lot.

Another (and maybe more aggressive) thought is about the deployment. Jun
has a good point: HDFS only provides data redundancy, but not
computational redundancy. If Kafka could be deployed on YARN, it could
offload some computational resource management to YARN and we don't have to
allocate machines physically. Kafka still needs to take care of load
balance and partition assignment among brokers by itself.
Many computational frameworks like spark/samza have such an option and it's
a big attractive point for us.

Best,
Hangjun


2014-05-20 21:00 GMT+08:00 François Langelier f.langel...@gmail.com:

 [quoted messages trimmed]




-- 
Hangjun Ye


Re: Make kafka storage engine pluggable and provide a HDFS plugin?

2014-05-20 Thread Steve Morin
Hangjun,
  Would having Kafka in YARN be a big architectural change from where
it is now? From what I have seen, in most typical setups you want machines
optimized for Kafka, not just Kafka on top of HDFS.
-Steve


On Tue, May 20, 2014 at 8:37 PM, Hangjun Ye yehang...@gmail.com wrote:

 [quoted messages trimmed]



Kafka replication throttling

2014-05-20 Thread Marcos Juarez Lopez
Hi,

We have several Kafka clusters in production, and we've had to reassign
replication a few times now in production.  Some of our topic/partitions
are pretty large, up to 32 partitions per topic, and 16GB per partition, so
adding a new broker and/or repairing a broker that had been down for some
time turns out to be a major undertaking.

Today, when we attempt to replicate a single partition, it pegs the disk
IO, and uses a significant chunk of the 10Gbps interface for a good ~5
minutes.  This is causing problems for our downstream consumers, which rely
on having a consistent stream of realtime data being sent to them.

Is there a way to throttle Kafka replication between nodes, so that instead
of it going full blast, it will replicate at a fixed rate in megabytes or
activities/batches per second?  Or maybe is this planned for a future
release, maybe 0.9?

Thanks,

Marcos Juarez


Re: Make kafka storage engine pluggable and provide a HDFS plugin?

2014-05-20 Thread Hangjun Ye
Hi Steve,

Yes, what I want is that Kafka doesn't have to care about machines
physically (as an option).

Best,
Hangjun

2014-05-21 11:46 GMT+08:00 Steve Morin st...@stevemorin.com:

 [quoted messages trimmed]




-- 
Hangjun Ye


Re: Kafka: writing custom Encoder/Serializer

2014-05-20 Thread pushkar priyadarshi
Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

Try this.
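
Expanding on that, a corrected sketch of Kumar's loop: the generic type must
be byte[] rather than the variable name rawData, and the serialized bytes
(not the event object) go into the KeyedMessage. SimulateEvent and the
Serializer helper are the ones from his earlier message:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestEventProducer {
    public static void main(String[] args) throws Exception {
        String topic = "test-topic";
        long eventsNum = 10;

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // values are raw byte[]
        props.put("request.required.acks", "0");

        Producer<String, byte[]> producer = new Producer<String, byte[]>(new ProducerConfig(props));

        for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
            byte[] rawData = Serializer.serialize(new SimulateEvent());
            // Send the serialized bytes, not the event object itself.
            producer.send(new KeyedMessage<String, byte[]>(topic, rawData));
        }
        producer.close();
    }
}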



On Wed, May 21, 2014 at 12:26 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Pradeep,

 If you are writing a POC, I'd suggest you do that using the new producer
 APIs:
 http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/Producer.html
 These are much easier to use, expose more functionality, and the new
 producer is faster than the older one. It is currently in beta, slated for
 release in 0.8.2 or 0.9, and we are working on stabilizing it, but it should
 work great for your POC. We'd love to hear feedback on the APIs.

 Thanks,
 Neha


 On Tue, May 20, 2014 at 10:51 AM, Kumar Pradeep kprad...@novell.com wrote:

  [quoted messages trimmed]