Re: Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread David Montgomery
Hi,

Thanks for the help.  I found the issue: I was appending to the bottom of the
file when I should have placed the line below at the top.


echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false"' | tee -a
/var/kafka/bin/kafka-run-class.sh
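
As a quick sanity check, a minimal Java sketch along these lines can confirm that
the broker's JMX endpoint is actually reachable and exposing Kafka MBeans; the
host and the 9999 port below are placeholders for your broker address and
whatever JMX_PORT you export, not values from this thread:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port: use the broker host and the JMX_PORT you exported.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // List every MBean whose domain starts with "kafka"; if this prints
            // nothing (or the connect fails), jmxtrans will not see anything either.
            for (ObjectName name : conn.queryNames(new ObjectName("kafka*:*"), null)) {
                System.out.println(name);
            }
        } finally {
            connector.close();
        }
    }
}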


On Wed, Dec 3, 2014 at 1:27 PM, Jason Rosenberg  wrote:

> fwiw, we wrap the kafka server in our java service container framework.
> This allows us to use the default GraphiteReporter class that is part of
> the yammer metrics library (which is used by kafka directly).  So it works
> seamlessly.  (We've since changed our use of GraphiteReporter to instead
> send all our metrics via kafka :))
>
> Jason
>
> On Tue, Dec 2, 2014 at 11:00 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi,
> >
> > You can make use of this documentation aimed at JMX and monitoring:
> >
> https://sematext.atlassian.net/wiki/display/PUBSPM/SPM+Monitor+-+Standalone
> >
> > There is a section about Kafka and the information is not SPM-specific.
> >
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Tue, Dec 2, 2014 at 9:34 PM, YuanJia Li  wrote:
> >
> > > Hi David,
> > > Just edit "kafka-server-start.sh", and add "export
> JMX_PORT=",it
> > > will work.
> > >
> > >
> > >
> > >
> > > Yuanjia
> > >
> > > From: David Montgomery
> > > Date: 2014-12-03 04:47
> > > To:users
> > > Subject: Re: How to push metrics to graphite - jmxtrans does not work
> > > Hi,
> > >
> > > I am seeing this in the logs and wondering what "jmx_port":-1 means?
> > >
> > > INFO conflict in /brokers/ids/29136 data: { "host":"104.111.111.111.",
> > > "jmx_port":-1, "port":9092, "timestamp":"1417552817875", "version":1 }
> > > stored data: { "host":"104.111.111", "jmx_port":-1, "port":9092,
> > > "timestamp":"1417552738253", "version":1
> > >
> > > despite having these added
> > >
> > > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=
> > > -Dcom.sun.management.jmxremote=true
> > > -Dcom.sun.management.jmxremote.authenticate=false
> > > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > > /var/kafka/bin/kafka-run-class.sh
> > > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > > /var/kafka/bin/kafka-server-start.sh
> > >
> > > Thanks
> > >
> > > On Tue, Dec 2, 2014 at 9:58 PM, Andrew Otto 
> wrote:
> > >
> > > > Maybe also set:
> > > >
> > > >  -Dcom.sun.management.jmxremote.port=
> > > >
> > > > ?
> > > >
> > > >
> > > > > On Dec 2, 2014, at 02:59, David Montgomery <
> > davidmontgom...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I am having a very difficult time trying to report Kafka 0.8 metrics to
> > > > > Graphite.  Nothing is listening on  and no data in graphite.  If
> > > > > this method of graphite reporting is known not to work, is there an
> > > > > alternative to jmxtrans to get data to graphite?
> > > > >
> > > > > I am using the deb file to install jmxtrans on ubuntu 12.04
> > > > >
> > > > > And I use the below to modify kafka scripts
> > > > >
> > > > > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
> > > > > -Dcom.sun.management.jmxremote.authenticate=false
> > > > > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > > > > /var/kafka/bin/kafka-run-class.sh
> > > > > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > > > > /var/kafka/bin/kafka-server-start.sh
> > > > >
> > > > > {
> > > > >  "servers" : [ {
> > > > >"host" : "127.0.0.1",
> > > > >"port" : "",
> > > > >"alias" : "<%=node.name%>",
> > > > >"queries" : [
> > > > > {
> > > > > "obj" : "kafka:type=kafka.SocketServerStats",
> > > > >  "resultAlias": "kafka.socketServerStats",
> > > > >  "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
> > > > > "BytesReadPerSecond", "BytesWrittenPerSecond",
> > > "FetchRequestsPerSecond",
> > > > > "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" ,
> > > > > "NumProduceRequests" , "ProduceRequestsPerSecond",
> "TotalBytesRead",
> > > > > "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs"
> > ],
> > > > > "outputWriters" : [ {
> > > > >  "@class" :
> > > > "com.googlecode.jmxtrans.model.output.GraphiteWriter",
> > > > >  "settings" : {
> > > > >"host" : "<%=@monitor_host%>",
> > > > >"port" : "2003"
> > > > >  }
> > > > >} ]
> > > > >  }
> > > > >],
> > > > >"numQueryThreads": "2"
> > > > >  } ]
> > > > > }
> > > >
> > > >
> > >
> >
>


Re: Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread Jason Rosenberg
fwiw, we wrap the kafka server in our java service container framework.
This allows us to use the default GraphiteReporter class that is part of
the yammer metrics library (which is used by kafka directly).  So it works
seamlessly.  (We've since changed our use of GraphiteReporter to instead
send all our metrics via kafka :))

Jason

On Tue, Dec 2, 2014 at 11:00 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> You can make use of this documentation aimed at JMX and monitoring:
> https://sematext.atlassian.net/wiki/display/PUBSPM/SPM+Monitor+-+Standalone
>
> There is a section about Kafka and the information is not SPM-specific.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Tue, Dec 2, 2014 at 9:34 PM, YuanJia Li  wrote:
>
> > Hi David,
> > Just edit "kafka-server-start.sh", and add "export JMX_PORT=", it
> > will work.
> >
> >
> >
> >
> > Yuanjia
> >
> > From: David Montgomery
> > Date: 2014-12-03 04:47
> > To:users
> > Subject: Re: How to push metrics to graphite - jmxtrans does not work
> > Hi,
> >
> > I am seeing this in the logs and wondering what "jmx_port":-1 means?
> >
> > INFO conflict in /brokers/ids/29136 data: { "host":"104.111.111.111.",
> > "jmx_port":-1, "port":9092, "timestamp":"1417552817875", "version":1 }
> > stored data: { "host":"104.111.111", "jmx_port":-1, "port":9092,
> > "timestamp":"1417552738253", "version":1
> >
> > despite having these added
> >
> > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=
> > -Dcom.sun.management.jmxremote=true
> > -Dcom.sun.management.jmxremote.authenticate=false
> > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > /var/kafka/bin/kafka-run-class.sh
> > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > /var/kafka/bin/kafka-server-start.sh
> >
> > Thanks
> >
> > On Tue, Dec 2, 2014 at 9:58 PM, Andrew Otto  wrote:
> >
> > > Maybe also set:
> > >
> > >  -Dcom.sun.management.jmxremote.port=
> > >
> > > ?
> > >
> > >
> > > > On Dec 2, 2014, at 02:59, David Montgomery <
> davidmontgom...@gmail.com>
> > > wrote:
> > > >
> > > > Hi,
> > > >
> > > > I am having a very difficult time trying to report Kafka 0.8 metrics to
> > > > Graphite.  Nothing is listening on  and no data in graphite.  If
> > > > this method of graphite reporting is known not to work, is there an
> > > > alternative to jmxtrans to get data to graphite?
> > > >
> > > > I am using the deb file to install jmxtrans on ubuntu 12.04
> > > >
> > > > And I use the below to modify kafka scripts
> > > >
> > > > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
> > > > -Dcom.sun.management.jmxremote.authenticate=false
> > > > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > > > /var/kafka/bin/kafka-run-class.sh
> > > > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > > > /var/kafka/bin/kafka-server-start.sh
> > > >
> > > > {
> > > >  "servers" : [ {
> > > >"host" : "127.0.0.1",
> > > >"port" : "",
> > > >"alias" : "<%=node.name%>",
> > > >"queries" : [
> > > > {
> > > > "obj" : "kafka:type=kafka.SocketServerStats",
> > > >  "resultAlias": "kafka.socketServerStats",
> > > >  "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
> > > > "BytesReadPerSecond", "BytesWrittenPerSecond",
> > "FetchRequestsPerSecond",
> > > > "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" ,
> > > > "NumProduceRequests" , "ProduceRequestsPerSecond", "TotalBytesRead",
> > > > "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs"
> ],
> > > > "outputWriters" : [ {
> > > >  "@class" :
> > > "com.googlecode.jmxtrans.model.output.GraphiteWriter",
> > > >  "settings" : {
> > > >"host" : "<%=@monitor_host%>",
> > > >"port" : "2003"
> > > >  }
> > > >} ]
> > > >  }
> > > >],
> > > >"numQueryThreads": "2"
> > > >  } ]
> > > > }
> > >
> > >
> >
>


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jason Rosenberg
In our case, we use protocol buffers for all messages, and these have
simple serialization/deserialization builtin to the protobuf libraries
(e.g. MyProtobufMessage.toByteArray()).  Also, we often produce/consume
messages without conversion to/from protobuf Objects (e.g. in cases where
we are just forwarding messages on to other topics, or if we are consuming
directly to a binary blob store like HDFS).  There's a huge efficiency win in
not needlessly synthesizing new Objects.

Thus, it's nice to deal only with bytes directly in all messages, and keep
things simple.  Having to dummy in a default, generically parameterized,
no-op serializer (and incur the overhead of that extra no-op method call)
seems unnecessary.

I'd suggest that maybe it could work seamlessly either way (which it
probably does now, for the case where no serializer is provided, but not
sure if it efficiently will elide the call to the no-op serializer after
JIT?).  Alternatively, I do think it's important to preserve the
efficiency of sending raw bytes directly, so if necessary, maybe expose
both apis (one which explicitly bypasses any serialization).

Finally, I've wondered in the past about enabling some sort of streaming
serialization, whereby you hook up a producer to a long living stream
class, which could integrate compression in line, and allow more control of
the pipeline.  The stream would implement an iterator to get the next
serialized message, etc.  For me, something like this might be a reason to
have a serialization/deserialization abstraction built into the
producer/consumer api's.

But if I have a vote, I'd be in favor of keeping the api simple and have it
take bytes directly.
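
For reference, sending raw bytes with the byte-array form of the new producer
discussed in this thread looks roughly like the sketch below; the config value
is a placeholder and constructor details may differ slightly between 0.8.2
builds, so treat it as illustrative only:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RawBytesProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder

        // No serializer involved: the key and value are handed over as bytes.
        KafkaProducer producer = new KafkaProducer(props);
        byte[] key = "some-key".getBytes("UTF-8");
        byte[] value = alreadySerializedPayload();  // e.g. MyProtobufMessage.toByteArray()

        Future<RecordMetadata> result =
                producer.send(new ProducerRecord("my-topic", key, value));
        result.get();  // block only to keep the example simple
        producer.close();
    }

    // Stand-in for bytes that were already serialized elsewhere (protobuf, etc.).
    private static byte[] alreadySerializedPayload() {
        return new byte[] {1, 2, 3};
    }
}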

Jason

On Tue, Dec 2, 2014 at 9:50 PM, Jan Filipiak 
wrote:

> Hello Everyone,
>
> I would very much appreciate it if someone could provide me a real world
> example where it is more convenient to implement the serializers instead of
> just making sure to provide bytearrays.
>
> The code we came up with explicitly avoids the serializer api. I think it
> is common understanding that if you want to transport data you need to have
> it as a bytearray.
>
> If at all I personally would like to have a serializer interface that
> takes the same types as the producer
>
> public interface Serializer<K, V> extends Configurable {
> public byte[] serializeKey(K data);
> public byte[] serializeValue(V data);
> public void close();
> }
>
> this would avoid long serialize implementations with branches like
> "switch(topic)" or "if(isKey)". Further serializer per topic makes more
> sense in my opinion. It feels natural to have a one to one relationship
> from types to topics or at least only a few partition per type. But as we
> inherit the type from the producer we would have to create many producers.
> This would create additional unnecessary connections to the brokers. With
> the serializers we create a one type to all topics relationship and the
> only type that satisfies that is the bytearray or Object. Am I missing
> something here? As said in the beginning, I would like to see the use case that
> really benefits from using the serializers. I think in theory they sound
> great but they cause real practical issues that may lead users to wrong
> decisions.
>
> -1 for putting the serializers back in.
>
> Looking forward to replies that can show me the benefit of serializers and
> especially how the
> Type => topic relationship can be handled nicely.
>
> Best
> Jan
>
>
>
>
> On 25.11.2014 02:58, Jun Rao wrote:
>
>> Hi, Everyone,
>>
>> I'd like to start a discussion on whether it makes sense to add the
>> serializer api back to the new java producer. Currently, the new java
>> producer takes a byte array for both the key and the value. While this api
>> is simple, it pushes the serialization logic into the application. This
>> makes it hard to reason about what type of data is being sent to Kafka and
>> also makes it hard to share an implementation of the serializer. For
>> example, to support Avro, the serialization logic could be quite involved
>> since it might need to register the Avro schema in some remote registry
>> and
>> maintain a schema cache locally, etc. Without a serialization api, it's
>> impossible to share such an implementation so that people can easily
>> reuse.
>> We sort of overlooked this implication during the initial discussion of
>> the
>> producer api.
>>
>> So, I'd like to propose an api change to the new producer by adding back
>> the serializer api similar to what we had in the old producer. Specially,
>> the proposed api changes are the following.
>>
>> First, we change KafkaProducer to take generic types K and V for the key
>> and the value, respectively.
>>
>> public class KafkaProducer<K,V> implements Producer<K,V> {
>>
>>  public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
>> callback);
>>
>>  public Future<RecordMetadata> send(ProducerRecord<K,V> record);
>> }
>>
>> Second, we add two new configs, one for the key serializer and another f

Re: Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread Otis Gospodnetic
Hi,

You can make use of this documentation aimed at JMX and monitoring:
https://sematext.atlassian.net/wiki/display/PUBSPM/SPM+Monitor+-+Standalone

There is a section about Kafka and the information is not SPM-specific.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Tue, Dec 2, 2014 at 9:34 PM, YuanJia Li  wrote:

> Hi David,
> Just edit "kafka-server-start.sh", and add "export JMX_PORT=", it
> will work.
>
>
>
>
> Yuanjia
>
> From: David Montgomery
> Date: 2014-12-03 04:47
> To:users
> Subject: Re: How to push metrics to graphite - jmxtrans does not work
> Hi,
>
> I am seeing this in the logs and wondering what "jmx_port":-1 means?
>
> INFO conflict in /brokers/ids/29136 data: { "host":"104.111.111.111.",
> "jmx_port":-1, "port":9092, "timestamp":"1417552817875", "version":1 }
> stored data: { "host":"104.111.111", "jmx_port":-1, "port":9092,
> "timestamp":"1417552738253", "version":1
>
> despite having these added
>
> echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=
> -Dcom.sun.management.jmxremote=true
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> /var/kafka/bin/kafka-run-class.sh
> echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> /var/kafka/bin/kafka-server-start.sh
>
> Thanks
>
> On Tue, Dec 2, 2014 at 9:58 PM, Andrew Otto  wrote:
>
> > Maybe also set:
> >
> >  -Dcom.sun.management.jmxremote.port=
> >
> > ?
> >
> >
> > > On Dec 2, 2014, at 02:59, David Montgomery 
> > wrote:
> > >
> > > Hi,
> > >
> > > I am having a very difficult time trying to report Kafka 0.8 metrics to
> > > Graphite.  Nothing is listening on  and no data in graphite.  If
> > > this method of graphite reporting is known not to work, is there an
> > > alternative to jmxtrans to get data to graphite?
> > >
> > > I am using the deb file to install jmxtrans on ubuntu 12.04
> > >
> > > And I use the below to modify kafka scripts
> > >
> > > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
> > > -Dcom.sun.management.jmxremote.authenticate=false
> > > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > > /var/kafka/bin/kafka-run-class.sh
> > > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > > /var/kafka/bin/kafka-server-start.sh
> > >
> > > {
> > >  "servers" : [ {
> > >"host" : "127.0.0.1",
> > >"port" : "",
> > >"alias" : "<%=node.name%>",
> > >"queries" : [
> > > {
> > > "obj" : "kafka:type=kafka.SocketServerStats",
> > >  "resultAlias": "kafka.socketServerStats",
> > >  "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
> > > "BytesReadPerSecond", "BytesWrittenPerSecond",
> "FetchRequestsPerSecond",
> > > "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" ,
> > > "NumProduceRequests" , "ProduceRequestsPerSecond", "TotalBytesRead",
> > > "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs" ],
> > > "outputWriters" : [ {
> > >  "@class" :
> > "com.googlecode.jmxtrans.model.output.GraphiteWriter",
> > >  "settings" : {
> > >"host" : "<%=@monitor_host%>",
> > >"port" : "2003"
> > >  }
> > >} ]
> > >  }
> > >],
> > >"numQueryThreads": "2"
> > >  } ]
> > > }
> >
> >
>


Re: Re: Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread Guozhang Wang
OffsetOutOfRangeException will be returned when the requested partition's
offset range is [a, b] and the requested offset is either < a or > b; the
offset range will change whenever:

1. new messages are appended to the log, which increments b;
2. old messages get cleaned based on the log retention (controlled by
log.retention.{minutes,hours} and log.retention.bytes), which increments a.

The pagecache should not play a role here, as the [a, b] values are kept in
the broker's memory and the exception is thrown based only on these two
values, not the data on disk.
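
To see the current [a, b] bounds for a partition, a rough sketch using the 0.8
SimpleConsumer javaapi is shown below; the broker host, topic, partition, and
client id are placeholders, so adapt it to your client version:

import java.util.Collections;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetBounds {
    static long fetchOffset(SimpleConsumer consumer, String topic, int partition,
                            long whichTime, String clientName) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                Collections.singletonMap(tp, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("offset lookup failed, error code "
                    + response.errorCode(topic, partition));
        }
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        SimpleConsumer consumer =
                new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-check");
        long earliest = fetchOffset(consumer, "my-topic", 0,
                kafka.api.OffsetRequest.EarliestTime(), "offset-check");
        long latest = fetchOffset(consumer, "my-topic", 0,
                kafka.api.OffsetRequest.LatestTime(), "offset-check");
        // Requested offsets outside these bounds trigger OffsetOutOfRangeException.
        System.out.println("offset bounds: [" + earliest + ", " + latest + ")");
        consumer.close();
    }
}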

Guozhang

On Tue, Dec 2, 2014 at 6:30 PM, YuanJia Li  wrote:

> Hi Guozhang,
> My Kafka runs in a production environment, and a large number of messages
> are produced and consumed, so it is not easy to get the accurate offset through
> the GetOffset tool when an OffsetOutOfRangeException happens. But in my
> application, I have code comparing the consuming offset with the latest
> offset obtained via the SimpleConsumer.getOffsetsBefore API.
> I don't quite understand the kernel pagecache, so I wonder about the delay
> between writing and reading.
>
> Thanks,
> Yuanjia
>
>
>
>
>
> From: Guozhang Wang
> Date: 2014-12-03 03:23
> To: users@kafka.apache.org
> Subject: Re: Pagecache cause OffsetOutOfRangeException
> Yuanjia,
>
> I am not sure that pagecache can be the cause of this, could you attach
> your full stack trace and use the GetOffset tool Manikumar mentioned to
> make sure the offset does exist in the broker?
>
> Guozhang
>
> On Tue, Dec 2, 2014 at 7:50 AM, Manikumar Reddy 
> wrote:
>
> > You can check the latest/earliest offsets of a given topic by running
> > GetOffsetShell.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-GetOffsetShell
> >
> > On Tue, Dec 2, 2014 at 2:05 PM, yuanjia8947  wrote:
> >
> > > Hi all,
> > > I'm using kafka 0.8.0 release now. And I often encounter the problem
> > > OffsetOutOfRangeException when consuming messages via the simple consumer API.
> > > But I'm sure that the consuming offset is smaller than the latest
> offset
> > > got from OffsetRequest.
> > > Can it be caused by new messages being written to the kernel's pagecache and
> > > not flushed to the file yet,
> > > while I'm consuming new messages from the file?
> > > How can I fix it?
> > >
> > > Thanks,
> > > liyuanjia
> > >
> > >
> > >
> > >
> > >
> > > liyuanjia
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jan Filipiak

Hello Everyone,

I would very much appreciate it if someone could provide me a real world 
example where it is more convenient to implement the serializers instead 
of just making sure to provide bytearrays.


The code we came up with explicitly avoids the serializer api. I think 
it is common understanding that if you want to transport data you need 
to have it as a bytearray.


If at all I personally would like to have a serializer interface that 
takes the same types as the producer


public interface Serializer<K, V> extends Configurable {
public byte[] serializeKey(K data);
public byte[] serializeValue(V data);
public void close();
}

This would avoid long serialize implementations with branches like
"switch(topic)" or "if(isKey)". Furthermore, a serializer per topic makes more
sense in my opinion. It feels natural to have a one-to-one relationship
from types to topics, or at least only a few partitions per type. But as
we inherit the type from the producer, we would have to create many
producers. This would create additional unnecessary connections to the
brokers. With the serializers we create a one-type-to-all-topics
relationship, and the only type that satisfies that is the bytearray or
Object. Am I missing something here? As said in the beginning, I would
like to see the use case that really benefits from using the serializers. I
think in theory they sound great, but they cause real practical issues
that may lead users to wrong decisions.
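
To make the no-branching point concrete, here is a sketch of what an
implementation of the interface sketched above might look like for a single
topic; the UserEvent type is hypothetical (standing in for e.g. a generated
protobuf class), and the interface is repeated inline since it only exists as
a proposal:

import java.nio.charset.StandardCharsets;
import java.util.Map;

// The proposed interface, repeated here so the sketch is self-contained.
interface Serializer<K, V> {
    void configure(Map<String, ?> configs); // from Configurable
    byte[] serializeKey(K key);
    byte[] serializeValue(V value);
    void close();
}

// Hypothetical message type standing in for a generated protobuf class.
class UserEvent {
    private final String payload;
    UserEvent(String payload) { this.payload = payload; }
    byte[] toByteArray() { return payload.getBytes(StandardCharsets.UTF_8); }
}

// One topic, one (key, value) type pair: no "switch(topic)" or "if(isKey)" branching.
class UserEventSerializer implements Serializer<String, UserEvent> {
    public void configure(Map<String, ?> configs) { }
    public byte[] serializeKey(String key) { return key.getBytes(StandardCharsets.UTF_8); }
    public byte[] serializeValue(UserEvent event) { return event.toByteArray(); }
    public void close() { }
}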


-1 for putting the serializers back in.

Looking forward to replies that can show me the benefit of serializers
and especially how the Type => topic relationship can be handled nicely.

Best
Jan



On 25.11.2014 02:58, Jun Rao wrote:

Hi, Everyone,

I'd like to start a discussion on whether it makes sense to add the
serializer api back to the new java producer. Currently, the new java
producer takes a byte array for both the key and the value. While this api
is simple, it pushes the serialization logic into the application. This
makes it hard to reason about what type of data is being sent to Kafka and
also makes it hard to share an implementation of the serializer. For
example, to support Avro, the serialization logic could be quite involved
since it might need to register the Avro schema in some remote registry and
maintain a schema cache locally, etc. Without a serialization api, it's
impossible to share such an implementation so that people can easily reuse.
We sort of overlooked this implication during the initial discussion of the
producer api.

So, I'd like to propose an api change to the new producer by adding back
the serializer api similar to what we had in the old producer. Specially,
the proposed api changes are the following.

First, we change KafkaProducer to take generic types K and V for the key
and the value, respectively.

public class KafkaProducer<K,V> implements Producer<K,V> {

 public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
callback);

 public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}

Second, we add two new configs, one for the key serializer and another for
the value serializer. Both serializers will default to the byte array
implementation.

public class ProducerConfig extends AbstractConfig {

 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC);
}

Both serializers will implement the following interface.

public interface Serializer<T> extends Configurable {
 public byte[] serialize(String topic, T data, boolean isKey);

 public void close();
}

This is more or less the same as what's in the old producer. The slight
differences are (1) the serializer now only requires a parameter-less
constructor; (2) the serializer has a configure() and a close() method for
initialization and cleanup, respectively; (3) the serialize() method
additionally takes the topic and an isKey indicator, both of which are
useful for things like schema registration.
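
As an illustration, a trivial string serializer under the proposed interface
might look like the sketch below; the "serializer.encoding" property name is
made up, shown only to exercise the configure() hook:

import java.io.UnsupportedEncodingException;
import java.util.Map;

public class StringSerializer implements Serializer<String> {

    private String encoding = "UTF-8";

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical property name, used here only to show the configure() hook.
        Object enc = configs.get("serializer.encoding");
        if (enc instanceof String) {
            encoding = (String) enc;
        }
    }

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        // topic and isKey are available for things like per-topic schema registration.
        if (data == null) {
            return null;
        }
        try {
            return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalArgumentException("Unsupported encoding " + encoding, e);
        }
    }

    @Override
    public void close() { }
}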

The detailed changes are included in KAFKA-1797. For completeness, I also
made the corresponding changes for the new java consumer api as well.

Note that the proposed api changes are incompatible with what's in the
0.8.2 branch. However, if those api changes are beneficial, it's probably
better to include them now in the 0.8.2 release, rather than later.

I'd like to discuss mainly two things in this thread.
1. Do people feel that the proposed api changes are reasonable?
2. Are there any concerns of including the api changes in the 0.8.2 final
release?

Thanks,

Jun





Re: Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread YuanJia Li
Hi David,
Just edit "kafka-server-start.sh", and add "export JMX_PORT="; it will
work.




Yuanjia

From: David Montgomery
Date: 2014-12-03 04:47
To:users
Subject: Re: How to push metrics to graphite - jmxtrans does not work
Hi,

I am seeing this in the logs and wondering what "jmx_port":-1 means?

INFO conflict in /brokers/ids/29136 data: { "host":"104.111.111.111.",
"jmx_port":-1, "port":9092, "timestamp":"1417552817875", "version":1 }
stored data: { "host":"104.111.111", "jmx_port":-1, "port":9092,
"timestamp":"1417552738253", "version":1

despite having these added

echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false"' | tee -a
/var/kafka/bin/kafka-run-class.sh
echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
/var/kafka/bin/kafka-server-start.sh

Thanks

On Tue, Dec 2, 2014 at 9:58 PM, Andrew Otto  wrote:

> Maybe also set:
>
>  -Dcom.sun.management.jmxremote.port=
>
> ?
>
>
> > On Dec 2, 2014, at 02:59, David Montgomery 
> wrote:
> >
> > Hi,
> >
> > I am having a very difficult time trying to report Kafka 0.8 metrics to
> > Graphite.  Nothing is listening on  and no data in graphite.  If
> > this method of graphite reporting is known not to work, is there an
> > alternative to jmxtrans to get data to graphite?
> >
> > I am using the deb file to install jmxtrans on ubuntu 12.04
> >
> > And I use the below to modify kafka scripts
> >
> > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
> > -Dcom.sun.management.jmxremote.authenticate=false
> > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > /var/kafka/bin/kafka-run-class.sh
> > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > /var/kafka/bin/kafka-server-start.sh
> >
> > {
> >  "servers" : [ {
> >"host" : "127.0.0.1",
> >"port" : "",
> >"alias" : "<%=node.name%>",
> >"queries" : [
> > {
> > "obj" : "kafka:type=kafka.SocketServerStats",
> >  "resultAlias": "kafka.socketServerStats",
> >  "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
> > "BytesReadPerSecond", "BytesWrittenPerSecond", "FetchRequestsPerSecond",
> > "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" ,
> > "NumProduceRequests" , "ProduceRequestsPerSecond", "TotalBytesRead",
> > "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs" ],
> > "outputWriters" : [ {
> >  "@class" :
> "com.googlecode.jmxtrans.model.output.GraphiteWriter",
> >  "settings" : {
> >"host" : "<%=@monitor_host%>",
> >"port" : "2003"
> >  }
> >} ]
> >  }
> >],
> >"numQueryThreads": "2"
> >  } ]
> > }
>
>

Re: Re: Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread YuanJia Li
Hi Guozhang,
My Kafka runs in a production environment, and a large number of messages are
produced and consumed, so it is not easy to get the accurate offset through the
GetOffset tool when an OffsetOutOfRangeException happens. But in my application,
I have code comparing the consuming offset with the latest offset obtained via
the SimpleConsumer.getOffsetsBefore API.
I don't quite understand the kernel pagecache, so I wonder about the delay
between writing and reading.

Thanks,
Yuanjia





From: Guozhang Wang
Date: 2014-12-03 03:23
To: users@kafka.apache.org
Subject: Re: Pagecache cause OffsetOutOfRangeException
Yuanjia,

I am not sure that pagecache can be the cause of this, could you attach
your full stack trace and use the GetOffset tool Manikumar mentioned to
make sure the offset does exist in the broker?

Guozhang

On Tue, Dec 2, 2014 at 7:50 AM, Manikumar Reddy 
wrote:

> You can check the latest/earliest offsets of a given topic by running
> GetOffsetShell.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-GetOffsetShell
>
> On Tue, Dec 2, 2014 at 2:05 PM, yuanjia8947  wrote:
>
> > Hi all,
> > I'm using kafka 0.8.0 release now. And I often encounter the problem
> > OffsetOutOfRangeException when consuming messages via the simple consumer API.
> > But I'm sure that the consuming offset is smaller than the latest offset
> > got from OffsetRequest.
> > Can it be caused by new messages being written to the kernel's pagecache and
> > not flushed to the file yet,
> > while I'm consuming new messages from the file?
> > How can I fix it?
> >
> > Thanks,
> > liyuanjia
> >
> >
> >
> >
> >
> > liyuanjia
>



-- 
-- Guozhang

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Rajiv Kurian
Yeah I am kind of sad about that :(. I just mentioned it to show that there
are material use cases for applications where you expose the underlying
ByteBuffer (I know we were talking about byte arrays) instead of
serializing/deserializing objects -  performance is a big one.


On Tue, Dec 2, 2014 at 5:42 PM, Jun Rao  wrote:

> Rajiv,
>
> That's probably a very special use case. Note that even in the new consumer
> api w/o the generics, the client is only going to get the byte array back.
> So, you won't be able to take advantage of reusing the ByteBuffer in the
> underlying responses.
>
> Thanks,
>
> Jun
>
> On Tue, Dec 2, 2014 at 5:26 PM, Rajiv Kurian  wrote:
>
> > I for one use the consumer (Simple Consumer) without any
> deserialization. I
> > just take the ByteBuffer, wrap it in a preallocated flyweight, and use it
> > without creating any objects. I'd ideally not have to wrap this logic in
> a
> > deserializer interface. For every one who does do this, it seems like a
> > very small step.
> >
> > On Tue, Dec 2, 2014 at 5:12 PM, Joel Koshy  wrote:
> >
> > > > For (1), yes, but it's easier to make a config change than a code
> > change.
> > > > If you are using a third party library, one may not be able to make
> any
> > > > code change.
> > >
> > > Doesn't that assume that all organizations have to already share the
> > > same underlying specific data type definition (e.g.,
> > > UniversalAvroRecord). If not, then wouldn't they have to anyway make a
> > > code change anyway to use the shared definition (since that is
> > > required in the parameterized type of the producerrecord and
> > > producer)?  And if they have already made the change to use the said
> > > shared definition then you could just as well have the serializer of
> > > UniversalAvroRecord configured in your application config and have
> > > that replaced if you wish by some other implementation of a serializer
> > > of UniversalAvroRecord (again via config).
> > >
> > > > For (2), it's just that if most consumers always do deserialization
> > after
> > > > getting the raw bytes, perhaps it would be better to have these two
> > steps
> > > > integrated.
> > >
> > > True, but it is just a marginal and very obvious step that shouldn't
> > > surprise any user.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Dec 2, 2014 at 2:05 PM, Joel Koshy 
> > wrote:
> > > >
> > > > > > The issue with a separate ser/deser library is that if it's not
> > part
> > > of
> > > > > the
> > > > > > client API, (1) users may not use it or (2) different users may
> use
> > > it in
> > > > > > different ways. For example, you can imagine that two Avro
> > > > > implementations
> > > > > > have different ways of instantiation (since it's not enforced by
> > the
> > > > > client
> > > > > > API). This makes sharing such kind of libraries harder.
> > > > >
> > > > > That is true - but that is also the point I think and it seems
> > > > > irrelevant to whether it is built-in to the producer's config or
> > > > > plugged in outside at the application-level. i.e., users will not
> use
> > > > > a common implementation if it does not fit their requirements. If a
> > > > > well-designed, full-featured and correctly implemented
> avro-or-other
> > > > > serializer/deserializer is made available there is no reason why
> that
> > > > > cannot be shared by different applications.
> > > > >
> > > > > > As for reason about the data types, take an example of the
> consumer
> > > > > > application. It needs to deal with objects at some point. So the
> > > earlier
> > > > > > that type information is revealed, the clearer it is to the
> > > application.
> > > > >
> > > > > Again for this, the only additional step is a call to deserialize.
> At
> > > > > some level the application _has_ to deal with the specific data
> type
> > > > > and it is thus reasonable to require that a consumed byte array
> needs
> > > > > to be deserialized to that type before being used.
> > > > >
> > > > > I suppose I don't see much benefit in pushing this into the core
> API
> > > > > of the producer at the expense of making these changes to the API.
> > At
> > > > > the same time, I should be clear that I don't think the proposal is
> > in
> > > > > any way unreasonable which is why I'm definitely not opposed to it,
> > > > > but I'm also not convinced that it is necessary.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > >
> > > > > > On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  >
> > > wrote:
> > > > > >
> > > > > > > Re: pushing complexity of dealing with objects: we're talking
> > about
> > > > > > > just a call to a serialize method to convert the object to a
> byte
> > > > > > > array right? Or is there more to it? (To me) that seems less
> > > > > > > cumbersome than having to interact with parameterized types.
> > > Actually,
> > > > > > > can you explain more clearly what you mean by reason about
> > what
> > >

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jun Rao
Rajiv,

That's probably a very special use case. Note that even in the new consumer
api w/o the generics, the client is only going to get the byte array back.
So, you won't be able to take advantage of reusing the ByteBuffer in the
underlying responses.

Thanks,

Jun

On Tue, Dec 2, 2014 at 5:26 PM, Rajiv Kurian  wrote:

> I for one use the consumer (Simple Consumer) without any deserialization. I
> just take the ByteBuffer, wrap it in a preallocated flyweight, and use it
> without creating any objects. I'd ideally not have to wrap this logic in a
> deserializer interface. For every one who does do this, it seems like a
> very small step.
>
> On Tue, Dec 2, 2014 at 5:12 PM, Joel Koshy  wrote:
>
> > > For (1), yes, but it's easier to make a config change than a code
> change.
> > > If you are using a third party library, one may not be able to make any
> > > code change.
> >
> > Doesn't that assume that all organizations have to already share the
> > same underlying specific data type definition (e.g.,
> > UniversalAvroRecord). If not, then wouldn't they have to anyway make a
> > code change anyway to use the shared definition (since that is
> > required in the parameterized type of the producerrecord and
> > producer)?  And if they have already made the change to use the said
> > shared definition then you could just as well have the serializer of
> > UniversalAvroRecord configured in your application config and have
> > that replaced if you wish by some other implementation of a serializer
> > of UniversalAvroRecord (again via config).
> >
> > > For (2), it's just that if most consumers always do deserialization
> after
> > > getting the raw bytes, perhaps it would be better to have these two
> steps
> > > integrated.
> >
> > True, but it is just a marginal and very obvious step that shouldn't
> > surprise any user.
> >
> > Thanks,
> >
> > Joel
> >
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Dec 2, 2014 at 2:05 PM, Joel Koshy 
> wrote:
> > >
> > > > > The issue with a separate ser/deser library is that if it's not
> part
> > of
> > > > the
> > > > > client API, (1) users may not use it or (2) different users may use
> > it in
> > > > > different ways. For example, you can imagine that two Avro
> > > > implementations
> > > > > have different ways of instantiation (since it's not enforced by
> the
> > > > client
> > > > > API). This makes sharing such kind of libraries harder.
> > > >
> > > > That is true - but that is also the point I think and it seems
> > > > irrelevant to whether it is built-in to the producer's config or
> > > > plugged in outside at the application-level. i.e., users will not use
> > > > a common implementation if it does not fit their requirements. If a
> > > > well-designed, full-featured and correctly implemented avro-or-other
> > > > serializer/deserializer is made available there is no reason why that
> > > > cannot be shared by different applications.
> > > >
> > > > > As for reason about the data types, take an example of the consumer
> > > > > application. It needs to deal with objects at some point. So the
> > earlier
> > > > > that type information is revealed, the clearer it is to the
> > application.
> > > >
> > > > Again for this, the only additional step is a call to deserialize. At
> > > > some level the application _has_ to deal with the specific data type
> > > > and it is thus reasonable to require that a consumed byte array needs
> > > > to be deserialized to that type before being used.
> > > >
> > > > I suppose I don't see much benefit in pushing this into the core API
> > > > of the producer at the expense of making these changes to the API.
> At
> > > > the same time, I should be clear that I don't think the proposal is
> in
> > > > any way unreasonable which is why I'm definitely not opposed to it,
> > > > but I'm also not convinced that it is necessary.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > >
> > > > > On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy 
> > wrote:
> > > > >
> > > > > > Re: pushing complexity of dealing with objects: we're talking
> about
> > > > > > just a call to a serialize method to convert the object to a byte
> > > > > > array right? Or is there more to it? (To me) that seems less
> > > > > > cumbersome than having to interact with parameterized types.
> > Actually,
> > > > > > can you explain more clearly what you mean by reason about
> what
> > > > > > type of data is being sent in your original email? I have
> some
> > > > > > notion of what that means but it is a bit vague and you might
> have
> > > > > > meant something else.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > > > > > Joel,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > Yes, the raw bytes interface is simpler than the Generic api.
> > > > However, it
> > > > > > > just pushes the complexity of dealing with the objec

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Rajiv Kurian
I for one use the consumer (SimpleConsumer) without any deserialization. I
just take the ByteBuffer, wrap it in a preallocated flyweight, and use it
without creating any objects. I'd ideally not have to wrap this logic in a
deserializer interface. For everyone who does do this, it seems like a
very small step.
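
A rough sketch of that flyweight pattern, with a hypothetical field layout: one
preallocated wrapper is re-pointed at each message's ByteBuffer and fields are
read at fixed offsets, so no per-message objects are created.

import java.nio.ByteBuffer;

final class EventFlyweight {
    private ByteBuffer buffer;
    private int offset;

    // Re-point the flyweight at another message; nothing is allocated here.
    EventFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    // Hypothetical fixed-offset layout: 8-byte timestamp, 4-byte user id, 2-byte type.
    long timestamp()  { return buffer.getLong(offset); }
    int userId()      { return buffer.getInt(offset + 8); }
    short eventType() { return buffer.getShort(offset + 12); }
}

// Usage: allocate one flyweight up front and reuse it for every fetched message, e.g.
//   EventFlyweight fw = new EventFlyweight();
//   for each message payload: process(fw.wrap(payloadBuffer, 0).timestamp());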

On Tue, Dec 2, 2014 at 5:12 PM, Joel Koshy  wrote:

> > For (1), yes, but it's easier to make a config change than a code change.
> > If you are using a third party library, one may not be able to make any
> > code change.
>
> Doesn't that assume that all organizations have to already share the
> same underlying specific data type definition (e.g.,
> UniversalAvroRecord). If not, then wouldn't they have to anyway make a
> code change anyway to use the shared definition (since that is
> required in the parameterized type of the producerrecord and
> producer)?  And if they have already made the change to use the said
> shared definition then you could just as well have the serializer of
> UniversalAvroRecord configured in your application config and have
> that replaced if you wish by some other implementation of a serializer
> of UniversalAvroRecord (again via config).
>
> > For (2), it's just that if most consumers always do deserialization after
> > getting the raw bytes, perhaps it would be better to have these two steps
> > integrated.
>
> True, but it is just a marginal and very obvious step that shouldn't
> surprise any user.
>
> Thanks,
>
> Joel
>
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 2, 2014 at 2:05 PM, Joel Koshy  wrote:
> >
> > > > The issue with a separate ser/deser library is that if it's not part
> of
> > > the
> > > > client API, (1) users may not use it or (2) different users may use
> it in
> > > > different ways. For example, you can imagine that two Avro
> > > implementations
> > > > have different ways of instantiation (since it's not enforced by the
> > > client
> > > > API). This makes sharing such kind of libraries harder.
> > >
> > > That is true - but that is also the point I think and it seems
> > > irrelevant to whether it is built-in to the producer's config or
> > > plugged in outside at the application-level. i.e., users will not use
> > > a common implementation if it does not fit their requirements. If a
> > > well-designed, full-featured and correctly implemented avro-or-other
> > > serializer/deserializer is made available there is no reason why that
> > > cannot be shared by different applications.
> > >
> > > > As for reason about the data types, take an example of the consumer
> > > > application. It needs to deal with objects at some point. So the
> earlier
> > > > that type information is revealed, the clearer it is to the
> application.
> > >
> > > Again for this, the only additional step is a call to deserialize. At
> > > some level the application _has_ to deal with the specific data type
> > > and it is thus reasonable to require that a consumed byte array needs
> > > to be deserialized to that type before being used.
> > >
> > > I suppose I don't see much benefit in pushing this into the core API
> > > of the producer at the expense of making these changes to the API.  At
> > > the same time, I should be clear that I don't think the proposal is in
> > > any way unreasonable which is why I'm definitely not opposed to it,
> > > but I'm also not convinced that it is necessary.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > >
> > > > On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy 
> wrote:
> > > >
> > > > > Re: pushing complexity of dealing with objects: we're talking about
> > > > > just a call to a serialize method to convert the object to a byte
> > > > > array right? Or is there more to it? (To me) that seems less
> > > > > cumbersome than having to interact with parameterized types.
> Actually,
> > > > > can you explain more clearly what you mean by reason about what
> > > > > type of data is being sent in your original email? I have some
> > > > > notion of what that means but it is a bit vague and you might have
> > > > > meant something else.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > > > > Joel,
> > > > > >
> > > > > > Thanks for the feedback.
> > > > > >
> > > > > > Yes, the raw bytes interface is simpler than the Generic api.
> > > However, it
> > > > > > just pushes the complexity of dealing with the objects to the
> > > > > application.
> > > > > > We also thought about the layered approach. However, this may
> > > confuse the
> > > > > > users since there is no single entry point and it's not clear
> which
> > > > > layer a
> > > > > > user should be using.
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  >
> > > wrote:
> > > > > >
> > > > > > > > makes it hard to reason about what type of data is being
> sent to
> > > > > Kafka
> > > > > > > and
> > > > > > > > also makes it hard to share an

Re: How many messages does each broker have?

2014-12-02 Thread Guozhang Wang
Palur,

First you need to make sure the message is received at Kafka:

message.max.bytes

controls the maximum size of a message that can be accepted, and

fetch.message.max.bytes

controls the maximum number of bytes a consumer issues in one fetch.
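
Concretely, on the consumer side that would look roughly like the sketch below
with the 0.8 high-level consumer; the values and connection settings are
illustrative, and message.max.bytes (plus, for replicated topics,
replica.fetch.max.bytes) must also be raised on the broker side before a 10MB
message is accepted at all:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class LargeMessageConsumerConfig {
    public static ConsumerConnector create() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "large-message-group");       // placeholder
        // Must be at least as large as the biggest message the broker accepts
        // (message.max.bytes); ~12MB here to leave headroom above a 10MB payload.
        props.put("fetch.message.max.bytes", String.valueOf(12 * 1024 * 1024));
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}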


Guozhang


On Mon, Dec 1, 2014 at 7:25 PM, Palur Sandeep  wrote:

> Thank you so much Jiangjie. I got it working.
>
> I have another problem: the consumer doesn't receive messages if they are big:
> When the producer sends 256kb messages to broker, consumer is able to
> retrieve it, but when producer sends 10MB messages to the broker, the
> consumer doesn’t receive any message.
>
> Please tell me how to make the consumer receive 10MB messages.
>
> On Mon, Dec 1, 2014 at 10:24 AM, Jiangjie Qin 
> wrote:
>
> > I think you are printing the class Message instead of MessageAndMetadata.
> > The output you got was from Message.toString.
> >
> > Can you just try something like below?
> >
> > ...
> > ConsumerIterator iter = consumerStream.iterator(); // assuming you have
> > got a consumer stream
> > MessageAndMetadata messageAndMetadata = iter.next();
> > System.out.println("topic: " + messageAndMetadata.topic() + " partition: "
> > + messageAndMetadata.partition());
> >
> >
> > Jiangjie (Becket) Qin
> >
> > On 11/26/14, 12:56 PM, "Palur Sandeep"  wrote:
> >
> > >Hi Jiangjie,
> > >
> > >
> > >Thanks for the information. This is what I get when I print
> > >MessageandMetadata
> > >
> > >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
> > >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
> > >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> > >
> > >Can you please tell me where can I find partition number in this?
> > >
> > >
> > >
> > >On Wed, Nov 26, 2014 at 1:29 PM, Jiangjie Qin  >
> > >wrote:
> > >
> > >> Hi Sandeep,
> > >>
> > >> For old producer, I don’t think you can achieve strict even
> distribution
> > >> of messages across partitions within the same topic. But You can
> > >> potentially reduce the sticking time by setting
> > >> topic.metadata.refresh.interval.ms to be lower, e.g. 1 second.
> > >>
> > >> Kafka-544 added the partition information to MessageAndMetadata. And
> > >>that
> > >> is back to 11/15/12 so it should have been included in 0.8.1.1. Do you
> > >> mean the MessageAndMetadata you got does not partition member or
> > >> MessageAndMetadata.partition give you nothing?
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 11/26/14, 10:31 AM, "Palur Sandeep"  wrote:
> > >>
> > >> >Hi Jiangjie,
> > >> >
> > >> >I am using the high level consumer (ZookeeperConsumerConnector),
> after
> > >> >getting the message from stream, but I don't see this
> > >>"message.Partition".
> > >> >Please help me how to get the partition id form message.
> > >> >
> > >> >What is that I can to do get messages evenly distributed among
> > >>partitions?
> > >> >do you mean that  it is not possible in 0.8.1.1 version?
> > >> >
> > >> >On Wed, Nov 26, 2014 at 12:03 PM, Jiangjie Qin
> > >> > >> >
> > >> >wrote:
> > >> >
> > >> >> Hi Sandeep,
> > >> >>
> > >> >> If you are sending messages to different topics, each topic will
> > >>stick
> > >> >>to
> > >> >> a random partition for 10 min. Since they are likely sticking to
> > >> >>different
> > >> >> brokers, you will still see messages roughly evenly distributed.
> > >> >> If you are using high level consumer (ZookeeperConsumerConnector),
> > >>after
> > >> >> getting the message from stream, you can simply call
> > >>message.Partition
> > >> >>to
> > >> >> get the partition id.
> > >> >>
> > >> >> Jiangjie (Becket) Qin
> > >> >>
> > >> >> On 11/25/14, 5:30 PM, "Palur Sandeep" 
> wrote:
> > >> >>
> > >> >> >Hi Jiangjie,
> > >> >> >
> > >> >> >This is what I have understood. Please correct me if I am wrong
> > >> >> >
> > >> >> >I don't use the partition class at all (KeyedMessage
> > >> >>data =
> > >> >> >new KeyedMessage(topic_name,new_mes). It
> partitions
> > >> >> >messages randomly to different partitions. I don't see it sticking
> > >>to
> > >> >>any
> > >> >> >broker for 10 mins. I guess it follows some random partitioning
> > >>logic.
> > >> >>I
> > >> >> >am
> > >> >> >using the following 0.8.1.1 version.
> > >> >> >
> > >> >> >MessageAndMetadata on consumer side prints the following message:
> > >>Can
> > >> >>you
> > >> >> >help me find out metadata regarding the partition number?
> > >> >> >
> > >> >> >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357,
> key =
> > >> >> >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
> > >> >> >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> > >> >> >
> > >> >> >Thanks
> > >> >> >Sandeep
> > >> >> >
> > >> >> >On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin
> > >> >>
> > >> >> >wrote:
> > >> >> >
> > >> >> >> Palur,
> > >> >> >>
> > >> >> >> Just adding to what Guozhang said, the answer to your question
> > >>might
> > >> >> >> depend on which producer you are using.
> > >> >> >> Assuming you are producing messages without keys to the same
> 

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Joel Koshy
> For (1), yes, but it's easier to make a config change than a code change.
> If you are using a third party library, one may not be able to make any
> code change.

Doesn't that assume that all organizations have to already share the
same underlying specific data type definition (e.g.,
UniversalAvroRecord)? If not, then wouldn't they have to make a
code change anyway to use the shared definition (since that is
required in the parameterized type of the producerrecord and
producer)?  And if they have already made the change to use the said
shared definition then you could just as well have the serializer of
UniversalAvroRecord configured in your application config and have
that replaced if you wish by some other implementation of a serializer
of UniversalAvroRecord (again via config).

> For (2), it's just that if most consumers always do deserialization after
> getting the raw bytes, perhaps it would be better to have these two steps
> integrated.

True, but it is just a marginal and very obvious step that shouldn't
surprise any user.

Thanks,

Joel

> 
> Thanks,
> 
> Jun
> 
> On Tue, Dec 2, 2014 at 2:05 PM, Joel Koshy  wrote:
> 
> > > The issue with a separate ser/deser library is that if it's not part of
> > the
> > > client API, (1) users may not use it or (2) different users may use it in
> > > different ways. For example, you can imagine that two Avro
> > implementations
> > > have different ways of instantiation (since it's not enforced by the
> > client
> > > API). This makes sharing such kind of libraries harder.
> >
> > That is true - but that is also the point I think and it seems
> > irrelevant to whether it is built-in to the producer's config or
> > plugged in outside at the application-level. i.e., users will not use
> > a common implementation if it does not fit their requirements. If a
> > well-designed, full-featured and correctly implemented avro-or-other
> > serializer/deserializer is made available there is no reason why that
> > cannot be shared by different applications.
> >
> > > As for reason about the data types, take an example of the consumer
> > > application. It needs to deal with objects at some point. So the earlier
> > > that type information is revealed, the clearer it is to the application.
> >
> > Again for this, the only additional step is a call to deserialize. At
> > some level the application _has_ to deal with the specific data type
> > and it is thus reasonable to require that a consumed byte array needs
> > to be deserialized to that type before being used.
> >
> > I suppose I don't see much benefit in pushing this into the core API
> > of the producer at the expense of making these changes to the API.  At
> > the same time, I should be clear that I don't think the proposal is in
> > any way unreasonable which is why I'm definitely not opposed to it,
> > but I'm also not convinced that it is necessary.
> >
> > Thanks,
> >
> > Joel
> >
> > >
> > > On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:
> > >
> > > > Re: pushing complexity of dealing with objects: we're talking about
> > > > just a call to a serialize method to convert the object to a byte
> > > > array right? Or is there more to it? (To me) that seems less
> > > > cumbersome than having to interact with parameterized types. Actually,
> > > > can you explain more clearly what you mean by reason about what
> > > > type of data is being sent in your original email? I have some
> > > > notion of what that means but it is a bit vague and you might have
> > > > meant something else.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > > > Joel,
> > > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > Yes, the raw bytes interface is simpler than the Generic api.
> > However, it
> > > > > just pushes the complexity of dealing with the objects to the
> > > > application.
> > > > > We also thought about the layered approach. However, this may
> > confuse the
> > > > > users since there is no single entry point and it's not clear which
> > > > layer a
> > > > > user should be using.
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy 
> > wrote:
> > > > >
> > > > > > > makes it hard to reason about what type of data is being sent to
> > > > Kafka
> > > > > > and
> > > > > > > also makes it hard to share an implementation of the serializer.
> > For
> > > > > > > example, to support Avro, the serialization logic could be quite
> > > > involved
> > > > > > > since it might need to register the Avro schema in some remote
> > > > registry
> > > > > > and
> > > > > > > maintain a schema cache locally, etc. Without a serialization
> > api,
> > > > it's
> > > > > > > impossible to share such an implementation so that people can
> > easily
> > > > > > reuse.
> > > > > > > We sort of overlooked this implication during the initial
> > discussion
> > > > of
> > > > > > the
> > > > > > > producer 

Re: Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-02 Thread Guozhang Wang
Yu,

Are you enabling message compression in 0.8.1 now? If you have already, then
upgrading to 0.8.2 will not change its behavior.

Guozhang

On Tue, Dec 2, 2014 at 4:21 PM, Yu Yang  wrote:

> Hi Neha,
>
> Thanks for the reply!  We know that Kafka 0.8.2 will be released soon. If
> we want to upgrade to Kafka 0.8.2 and enable message compression, will we
> still be able do this in the same way, or we need to handle it differently?
>
> Thanks!
>
> Regards,
> -Yu
>
> On Tue, Dec 2, 2014 at 3:11 PM, Neha Narkhede 
> wrote:
>
> > Will doing one broker at
> > a time by brining the broker down, updating the code, and restarting it
> be
> > sufficient?
> >
> > Yes this should work for the upgrade.
> >
> > On Mon, Dec 1, 2014 at 10:23 PM, Yu Yang  wrote:
> >
> > > Hi,
> > >
> > > We have a kafka cluster that runs Kafka 0.8.1 that we are considering
> > > upgrade to 0.8.1.1. The Kafka documentation
> > >  mentions
> upgrading
> > > from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1.  Will doing one
> broker
> > at
> > > a time by bringing the broker down, updating the code, and restarting it
> > be
> > > sufficient? Any best practice suggestions?
> > >
> > > Thanks!
> > >
> > > Regards,
> > > Yu
> > >
> >
>



-- 
-- Guozhang


Re: Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-02 Thread Yu Yang
Hi Neha,

Thanks for the reply!  We know that Kafka 0.8.2 will be released soon. If
we want to upgrade to Kafka 0.8.2 and enable message compression, will we
still be able do this in the same way, or we need to handle it differently?

Thanks!

Regards,
-Yu

On Tue, Dec 2, 2014 at 3:11 PM, Neha Narkhede 
wrote:

> Will doing one broker at
> a time by bringing the broker down, updating the code, and restarting it be
> sufficient?
>
> Yes this should work for the upgrade.
>
> On Mon, Dec 1, 2014 at 10:23 PM, Yu Yang  wrote:
>
> > Hi,
> >
> > We have a kafka cluster that runs Kafka 0.8.1 that we are considering
> > upgrade to 0.8.1.1. The Kafka documentation
> >  mentions upgrading
> > from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1.  Will doing one broker
> at
> > a time by bringing the broker down, updating the code, and restarting it
> be
> > sufficient? Any best practice suggestions?
> >
> > Thanks!
> >
> > Regards,
> > Yu
> >
>


Re: Failed partition reassignment

2014-12-02 Thread Karol Nowak
I don't have it reproduced in a sandbox environment, but it's already
happened twice on that cluster, so it's a safe bet to say it's reproducible
in that setup. Are there special metrics / events that I should capture to
make debugging this easier?


Thanks,
Karol

On Tue, Dec 2, 2014 at 11:20 PM, Jun Rao  wrote:

> Is there an easy way to reproduce the issues that you saw?
>
> Thanks,
>
> Jun
>
> On Mon, Dec 1, 2014 at 6:31 AM, Karol Nowak  wrote:
>
> > Hi,
> >
> > I observed some error messages / exceptions while running partition
> > reassignment on kafka 0.8.1.1 cluster. Being fairly new to this system
> I'm
> > not sure if these indicate serious failures or transient problems, or if
> > manual intervention is needed.
> >
> > I used kafka-reassign-partitions.sh to reassign partitions from brokers
> > {143,155,155,93} to {143,155,115,68} on a healthy (?) cluster. Right now
> > one partition has just two replicas in the ISR and a number of partitions
> > are left with 4 replicas in the ISR even though the replication factor is 3.
> Logs
> > show a few zookeeper timeouts, but there were no GC pauses anywhere near
> > the session timeout. Zookeeper itself seems healthy and not overloaded,
> > with exception of regular CPU spikes, probably related to snapshots.
> >
> > I cleaned the log lines a little bit for brevity.
> >
> > First example: https://gist.github.com/knowak/a682afc1545fdeb836a1
> > Second one with two similar stack traces:
> > https://gist.github.com/knowak/6398be433d869d8141e5
> > Third one, many many of these:
> > https://gist.github.com/knowak/e78301259b74841702ae
> > Fourth: https://gist.github.com/knowak/1fbde5ca90d8f1924141
> > Fifth: https://gist.github.com/knowak/57fdcb75b3dc7c626893
> >
> > Hints?
> >
> >
> > Thanks,
> > Karol
> >
>



-- 
pozdrawiam
Karol Nowak
http://knowak.wordpress.com


Re: Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-02 Thread Neha Narkhede
Will doing one broker at
a time by bringing the broker down, updating the code, and restarting it be
sufficient?

Yes this should work for the upgrade.

On Mon, Dec 1, 2014 at 10:23 PM, Yu Yang  wrote:

> Hi,
>
> We have a kafka cluster that runs Kafka 0.8.1 that we are considering
> upgrading to 0.8.1.1. The Kafka documentation
>  mentions upgrading
> from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1.  Will doing one broker at
> a time by bringing the broker down, updating the code, and restarting it be
> sufficient? Any best practice suggestions?
>
> Thanks!
>
> Regards,
> Yu
>


Re: Questions about new consumer API

2014-12-02 Thread Neha Narkhede
The offsets are keyed on (group, topic, partition), so if you have more than
one owner per partition, they will rewrite each other's offsets and lead to
incorrect state.

On Tue, Dec 2, 2014 at 2:32 PM, hsy...@gmail.com  wrote:

> Thanks Neha, another question, so if offsets are stored under group.id,
> does it mean in one group, there should be at most one subscriber for each
> topic partition?
>
> Best,
> Siyuan
>
> On Tue, Dec 2, 2014 at 12:55 PM, Neha Narkhede 
> wrote:
>
> > 1. In this doc it says kafka consumer will automatically do load balance.
> > Is it based on throughput or same as what we have now balance the
> > cardinality among all consumers in same ConsumerGroup? In a real case
> > different partitions could have different peak time.
> >
> > Load balancing is still based on # of partitions for the subscribed
> topics
> > and
> > ensuring that each partition has exactly one consumer as the owner.
> >
> > 2. In the API, there is subscribe(partition...) method saying not using
> > group management, does it mean the group.id property will be discarded
> and
> > developer has full control of distributing partitions to consumers?
> >
> > group.id is also required for offset management, if the user chooses to
> > use
> > Kafka based offset management. The user will have full control over
> > distribution
> > of partitions to consumers.
> >
> > 3. Is new API compatible with old broker?
> >
> > Yes, it will.
> >
> > 4. Will simple consumer api and high-level consumer api still be
> supported?
> >
> > Over time, we will phase out the current high-level and simple consumer
> > since the
> > 0.9 API supports both.
> >
> > Thanks,
> > Neha
> >
> > On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com 
> > wrote:
> >
> > > Hi guys,
> > >
> > > I'm interested in the new Consumer API.
> > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
> > >
> > > I have a couple of questions.
> > > 1. In this doc it says kafka consumer will automatically do load
> balance.
> > > Is it based on throughput or same as what we have now balance the
> > > cardinality among all consumers in same ConsumerGroup? In a real case
> > > different partitions could have different peak time.
> > > 2. In the API, there is subscribe(partition...) method saying not
> using
> > > group management, does it mean the group.id property will be discarded
> > and
> > > developer has full control of distributing partitions to consumers?
> > > 3. Is new API compatible with old broker?
> > > 4. Will simple consumer api and high-level consumer api still be
> > supported?
> > >
> > > Thanks!
> > >
> > > Best,
> > > Siyuan
> > >
> >
>
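
A purely illustrative sketch of the keying point above (none of the class or variable names below are Kafka code): with offsets stored under a (group, topic, partition) key, two consumers in the same group that both commit for the same partition simply overwrite each other.

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: committed offsets keyed on (group, topic, partition).
public class OffsetKeyIllustration {
    public static void main(String[] args) {
        Map<String, Long> committedOffsets = new HashMap<String, Long>();
        String key = "group-1/my-topic/0";   // group, topic, partition

        committedOffsets.put(key, 100L);     // consumer A commits offset 100
        committedOffsets.put(key, 42L);      // consumer B (same group, same partition) commits 42

        // A's progress is gone; on restart both consumers would resume from 42.
        System.out.println(committedOffsets.get(key));
    }
}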


Re: High level Consumer API doesn't receive 10MB messages?

2014-12-02 Thread Jiangjie Qin
Has the message been successfully produced to the broker? You might need to change
producer settings as well. Otherwise the message could have been dropped.

-Jiangjie (Becket) Qin

On 12/1/14, 8:09 PM, "Palur Sandeep"  wrote:

>Yeah I did. I made the following changes to server.config:
>
>message.max.bytes=10485800
>replica.fetch.max.bytes=104858000
>replica.fetch.wait.max.ms=2000
>
>
>On Mon, Dec 1, 2014 at 10:03 PM, Harsha  wrote:
>
>> have you set fetch.message.max.bytes to 10mb or more in your consumer
>> config.
>> -Harsha
>>
>> On Mon, Dec 1, 2014, at 07:27 PM, Palur Sandeep wrote:
>> > Hi all,
>> >
>> > Consumer doesn't receive a message if it is big:  When the producer sends
>> > 256kb messages to the broker, the consumer is able to retrieve it, but when
>> > the producer sends 10MB messages to the broker, the consumer doesn't
>>receive
>> > any message.
>> >
>> > Please tell me how to make the consumer receive 10MB messages.
>> > --
>> > Regards,
>> > Sandeep Palur
>> > Data-Intensive Distributed Systems Laboratory, CS/IIT
>> > Department of Computer Science, Illinois Institute of Technology (IIT)
>> > Phone : 312-647-9833
>> > Email : psand...@hawk.iit.edu 
>>
>
>
>
>-- 
>Regards,
>Sandeep Palur
>Data-Intensive Distributed Systems Laboratory, CS/IIT
>Department of Computer Science, Illinois Institute of Technology (IIT)
>Phone : 312-647-9833
>Email : psand...@hawk.iit.edu 
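
To make Harsha's suggestion concrete, here is a minimal high-level consumer sketch with fetch.message.max.bytes raised above the 10MB message size. The ZooKeeper address, group id and topic name are placeholders, and the broker must of course also accept messages this large (message.max.bytes / replica.fetch.max.bytes, as in the server.config above):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class LargeMessageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");      // placeholder
        props.put("group.id", "large-message-group");    // placeholder
        // Must be at least as large as the biggest message the broker will deliver
        // (10MB here, plus some headroom).
        props.put("fetch.message.max.bytes", String.valueOf(11 * 1024 * 1024));

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap("big-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("big-topic").get(0).iterator();
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            System.out.println("received " + payload.length + " bytes");
        }
    }
}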



Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jun Rao
Rajiv,

Yes, that's possible within an organization. However, if you want to share
that implementation with other organizations, they will have to make code
changes, instead of just a config change.

Thanks,

Jun

On Tue, Dec 2, 2014 at 1:06 PM, Rajiv Kurian  wrote:

> Why can't the organization package the Avro implementation with a kafka
> client and distribute that library though? The risk of different users
> supplying the kafka client with different serializer/deserializer
> implementations still exists.
>
> On Tue, Dec 2, 2014 at 12:11 PM, Jun Rao  wrote:
>
> > Joel, Rajiv, Thunder,
> >
> > The issue with a separate ser/deser library is that if it's not part of
> the
> > client API, (1) users may not use it or (2) different users may use it in
> > different ways. For example, you can imagine that two Avro
> implementations
> > have different ways of instantiation (since it's not enforced by the
> client
> > API). This makes sharing such kind of libraries harder.
> >
> > Joel,
> >
> > As for reason about the data types, take an example of the consumer
> > application. It needs to deal with objects at some point. So the earlier
> > that type information is revealed, the clearer it is to the application.
> > Since the consumer client is the entry point where an application gets
> the
> > data,  if the type is enforced there, it makes it clear to all down
> stream
> > consumers.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:
> >
> > > Re: pushing complexity of dealing with objects: we're talking about
> > > just a call to a serialize method to convert the object to a byte
> > > array right? Or is there more to it? (To me) that seems less
> > > cumbersome than having to interact with parameterized types. Actually,
> > > can you explain more clearly what you mean by reason about what
> > > type of data is being sent in your original email? I have some
> > > notion of what that means but it is a bit vague and you might have
> > > meant something else.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > > Joel,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > Yes, the raw bytes interface is simpler than the Generic api.
> However,
> > it
> > > > just pushes the complexity of dealing with the objects to the
> > > application.
> > > > We also thought about the layered approach. However, this may confuse
> > the
> > > > users since there is no single entry point and it's not clear which
> > > layer a
> > > > user should be using.
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy 
> > wrote:
> > > >
> > > > > > makes it hard to reason about what type of data is being sent to
> > > Kafka
> > > > > and
> > > > > > also makes it hard to share an implementation of the serializer.
> > For
> > > > > > example, to support Avro, the serialization logic could be quite
> > > involved
> > > > > > since it might need to register the Avro schema in some remote
> > > registry
> > > > > and
> > > > > > maintain a schema cache locally, etc. Without a serialization
> api,
> > > it's
> > > > > > impossible to share such an implementation so that people can
> > easily
> > > > > reuse.
> > > > > > We sort of overlooked this implication during the initial
> > discussion
> > > of
> > > > > the
> > > > > > producer api.
> > > > >
> > > > > Thanks for bringing this up and the patch.  My take on this is that
> > > > > any reasoning about the data itself is more appropriately handled
> > > > > outside of the core producer API. FWIW, I don't think this was
> > > > > _overlooked_ during the initial discussion of the producer API
> > > > > (especially since it was a significant change from the old
> producer).
> > > > > IIRC we believed at the time that there is elegance and flexibility
> > in
> > > > > a simple API that deals with raw bytes. I think it is more accurate
> > to
> > > > > say that this is a reversal of opinion for some (which is fine) but
> > > > > personally I'm still in the old camp :) i.e., I really like the
> > > > > simplicity of the current 0.8.2 producer API and find parameterized
> > > > > types/generics to be distracting and annoying; and IMO any
> > > > > data-specific handling is better absorbed at a higher-level than
> the
> > > > > core Kafka APIs - possibly by a (very thin) wrapper producer
> library.
> > > > > I don't quite see why it is difficult to share different wrapper
> > > > > implementations; or even ser-de libraries for that matter that
> people
> > > > > can invoke before sending to/reading from Kafka.
> > > > >
> > > > > That said I'm not opposed to the change - it's just that I prefer
> > > > > what's currently there. So I'm +0 on the proposal.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > > > Hi, Everyone,
> > > > > >
> > > > > > I'd like to start a discussion on 
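
To make the "thin wrapper producer library" alternative mentioned in this thread concrete, a rough sketch is below. The Serializer interface is hypothetical (not a Kafka class), and the KafkaProducer/ProducerRecord usage assumes the byte[]-only API of the 0.8.2 new producer being discussed:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical serializer contract an organization might standardize on (not a Kafka class).
interface Serializer<T> {
    byte[] serialize(T value);
}

// Thin wrapper that applies the organization's serializers before handing bytes to the producer.
class WrappedProducer<K, V> {
    private final KafkaProducer producer;        // byte[]-only new producer
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    WrappedProducer(Properties config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.producer = new KafkaProducer(config);
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    Future<RecordMetadata> send(String topic, K key, V value) {
        byte[] keyBytes = (key == null) ? null : keySerializer.serialize(key);
        byte[] valueBytes = valueSerializer.serialize(value);
        return producer.send(new ProducerRecord(topic, keyBytes, valueBytes));
    }

    void close() {
        producer.close();
    }
}

An organization could ship a wrapper like this together with its serializers as one library, which is roughly the packaging Rajiv describes later in the thread.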

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jun Rao
For (1), yes, but it's easier to make a config change than a code change.
If you are using a third party library, one may not be able to make any
code change.

For (2), it's just that if most consumers always do deserialization after
getting the raw bytes, perhaps it would be better to have these two steps
integrated.

Thanks,

Jun

On Tue, Dec 2, 2014 at 2:05 PM, Joel Koshy  wrote:

> > The issue with a separate ser/deser library is that if it's not part of
> the
> > client API, (1) users may not use it or (2) different users may use it in
> > different ways. For example, you can imagine that two Avro
> implementations
> > have different ways of instantiation (since it's not enforced by the
> client
> > API). This makes sharing such kind of libraries harder.
>
> That is true - but that is also the point I think and it seems
> irrelevant to whether it is built-in to the producer's config or
> plugged in outside at the application-level. i.e., users will not use
> a common implementation if it does not fit their requirements. If a
> well-designed, full-featured and correctly implemented avro-or-other
> serializer/deserializer is made available there is no reason why that
> cannot be shared by different applications.
>
> > As for reason about the data types, take an example of the consumer
> > application. It needs to deal with objects at some point. So the earlier
> > that type information is revealed, the clearer it is to the application.
>
> Again for this, the only additional step is a call to deserialize. At
> some level the application _has_ to deal with the specific data type
> and it is thus reasonable to require that a consumed byte array needs
> to be deserialized to that type before being used.
>
> I suppose I don't see much benefit in pushing this into the core API
> of the producer at the expense of making these changes to the API.  At
> the same time, I should be clear that I don't think the proposal is in
> any way unreasonable which is why I'm definitely not opposed to it,
> but I'm also not convinced that it is necessary.
>
> Thanks,
>
> Joel
>
> >
> > On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:
> >
> > > Re: pushing complexity of dealing with objects: we're talking about
> > > just a call to a serialize method to convert the object to a byte
> > > array right? Or is there more to it? (To me) that seems less
> > > cumbersome than having to interact with parameterized types. Actually,
> > > can you explain more clearly what you mean by reason about what
> > > type of data is being sent in your original email? I have some
> > > notion of what that means but it is a bit vague and you might have
> > > meant something else.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > > Joel,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > Yes, the raw bytes interface is simpler than the Generic api.
> However, it
> > > > just pushes the complexity of dealing with the objects to the
> > > application.
> > > > We also thought about the layered approach. However, this may
> confuse the
> > > > users since there is no single entry point and it's not clear which
> > > layer a
> > > > user should be using.
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy 
> wrote:
> > > >
> > > > > > makes it hard to reason about what type of data is being sent to
> > > Kafka
> > > > > and
> > > > > > also makes it hard to share an implementation of the serializer.
> For
> > > > > > example, to support Avro, the serialization logic could be quite
> > > involved
> > > > > > since it might need to register the Avro schema in some remote
> > > registry
> > > > > and
> > > > > > maintain a schema cache locally, etc. Without a serialization
> api,
> > > it's
> > > > > > impossible to share such an implementation so that people can
> easily
> > > > > reuse.
> > > > > > We sort of overlooked this implication during the initial
> discussion
> > > of
> > > > > the
> > > > > > producer api.
> > > > >
> > > > > Thanks for bringing this up and the patch.  My take on this is that
> > > > > any reasoning about the data itself is more appropriately handled
> > > > > outside of the core producer API. FWIW, I don't think this was
> > > > > _overlooked_ during the initial discussion of the producer API
> > > > > (especially since it was a significant change from the old
> producer).
> > > > > IIRC we believed at the time that there is elegance and
> flexibility in
> > > > > a simple API that deals with raw bytes. I think it is more
> accurate to
> > > > > say that this is a reversal of opinion for some (which is fine) but
> > > > > personally I'm still in the old camp :) i.e., I really like the
> > > > > simplicity of the current 0.8.2 producer API and find parameterized
> > > > > types/generics to be distracting and annoying; and IMO any
> > > > > data-specific handling is better absorbed at a higher-level than
> the
> > > > > cor

Re: Partition reassignment reversed

2014-12-02 Thread Andrew Jorgensen
I am using kafka 0.8.
Yes I did run --verify, but got some weird output from it I had never seen
before that looked something like:

Status of partition reassignment:
ERROR: Assigned replicas (5,2) don't match the list of replicas for 
reassignment (5) for partition [topic-1,248]
ERROR: Assigned replicas (7,3) don't match the list of replicas for 
reassignment (7) for partition [topic-2,228]

There were a large number of these but it seems to just be for topic-1, and 
topic-2. In this case I was migrating around 4 or 5 topics. These two are also 
the ones that got reversed when I bounced all the processes yesterday.

Here are some more logs that I found from that day that may help piece together 
what might have happened

[2014-11-19 16:56:52,938] ERROR [KafkaApi-1] Error when processing fetch 
request for partition [topic-2,317] offset 408324093 from follower with 
correlation id 2458 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 408324093 but we 
only have log segments in the range 409018400 to 425346400.
at kafka.log.Log.read(Log.scala:380)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:17
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.immutable.HashMap.map(HashMap.scala:3
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:724)

-

[2014-11-19 16:24:37,959] ERROR Conditional update of path 
/brokers/topics/topic-2/partitions/248/state with data 
{"controller_epoch":15,"leader":2,"version":1,"leader_epoch":1,"isr":[2,5]} and 
expected version 1 failed due to 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion for /brokers/topics/topic-2/partitions/248/state 
(kafka.utils.ZkUtils$)



-- 
Andrew Jorgensen
@ajorgensen

On December 2, 2014 at 5:28:07 PM, Jun Rao (jun...@gmail.com) wrote:

Did you run the --verify option (  
http://kafka.apache.org/documentation.html#basic_ops_restarting) to check  
if the reassignment process completes? Also, what version of Kafka are you  
using?  

Thanks,  

Jun  

On Mon, Dec 1, 2014 at 7:16 PM, Andrew Jorgensen <  
ajorgen...@twitter.com.invalid> wrote:  

> I unfortunately do not have any specific logs from these events but I will  
> try and describe the events as accurately as possible to give an idea of  
> the problem I saw.  
>  
> The odd behavior manifested itself when I bounced all of the kafka  
> processes on each of the servers in a 12 node cluster. A few weeks prior I  
> did a partition reassignment to add four new kafka brokers to the cluster.  
> This cluster has 4 topics on it each with 350 partitions each, a retention  
> policy of 6 hours, and a replication factor of 1. Originally I attempted to  
> run a migration on all of the topics and partitions adding the 4 new nodes  
> using the partition reassignment tool. This seemed to cause a lot of  
> network congestion and according to the logs some of the nodes were having  
> trouble talking to each other. The network congestion lasted for the  
> duration of the migration and began to get better toward the end. After the  
> migration I confirmed that data was being stored and served from the new  
> brokers. Today I bounced each of the kafka processes on each of the brokers  
> to pick up a change made to the log4j properties. After bouncing one  
> processes I started seeing some strange errors on the four newer broker  
> nodes that looked like:  
>  
> kafka.common.NotAssignedReplicaException: Leader 10 failed to record  
> follower 7's position 0 for partition [topic-1,185] since the replica 7 is  
> not recognized to be one of the assigned replicas 10 for partition  
> [topic-2,185]  
>  
> and on the older kafka brokers the errors looked like:  
>  
> [2014-12-01 17:06:04,268] ERROR [ReplicaFetcherThread-0-12], Error for  
> partition [topic-1,175] to broker 12:class kafka.common.UnknownException  
> (kafka.server.ReplicaFetcherThread)  
>  
> I proceeded to bounce the rest of the kafka processes and after bouncing  
> the rest the errors seemed to stop. It wasn’t until a few hours

Re: Questions about new consumer API

2014-12-02 Thread hsy...@gmail.com
Thanks Neha, another question, so if offsets are stored under group.id,
does it mean in one group, there should be at most one subscriber for each
topic partition?

Best,
Siyuan

On Tue, Dec 2, 2014 at 12:55 PM, Neha Narkhede 
wrote:

> 1. In this doc it says kafka consumer will automatically do load balance.
> Is it based on throughput or same as what we have now balance the
> cardinality among all consumers in same ConsumerGroup? In a real case
> different partitions could have different peak time.
>
> Load balancing is still based on # of partitions for the subscribed topics
> and
> ensuring that each partition has exactly one consumer as the owner.
>
> 2. In the API, there is subscribe(partition...) method saying not using
> group management, does it mean the group.id property will be discarded and
> developer has full control of distributing partitions to consumers?
>
> group.id is also required for offset management, if the user chooses to
> use
> Kafka based offset management. The user will have full control over
> distribution
> of partitions to consumers.
>
> 3. Is new API compatible with old broker?
>
> Yes, it will.
>
> 4. Will simple consumer api and high-level consumer api still be supported?
>
> Over time, we will phase out the current high-level and simple consumer
> since the
> 0.9 API supports both.
>
> Thanks,
> Neha
>
> On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com 
> wrote:
>
> > Hi guys,
> >
> > I'm interested in the new Consumer API.
> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
> >
> > I have a couple of questions.
> > 1. In this doc it says kafka consumer will automatically do load balance.
> > Is it based on throughput or same as what we have now balance the
> > cardinality among all consumers in same ConsumerGroup? In a real case
> > different partitions could have different peak time.
> > 2. In the API, there is subscribe(partition...) method saying not using
> > group management, does it mean the group.id property will be discarded
> and
> > developer has full control of distributing partitions to consumers?
> > 3. Is new API compatible with old broker?
> > 4. Will simple consumer api and high-level consumer api still be
> supported?
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>


Re: Partition reassignment reversed

2014-12-02 Thread Jun Rao
Did you run the --verify option (
http://kafka.apache.org/documentation.html#basic_ops_restarting) to check
if the reassignment process completes? Also, what version of Kafka are you
using?

Thanks,

Jun

On Mon, Dec 1, 2014 at 7:16 PM, Andrew Jorgensen <
ajorgen...@twitter.com.invalid> wrote:

> I unfortunately do not have any specific logs from these events but I will
> try and describe the events as accurately as possible to give an idea of
> the problem I saw.
>
> The odd behavior manifested itself when I bounced all of the kafka
> processes on each of the servers in a 12 node cluster. A few weeks prior I
> did a partition reassignment to add four new kafka brokers to the cluster.
> This cluster has 4 topics on it each with 350 partitions each, a retention
> policy of 6 hours, and a replication factor of 1. Originally I attempted to
> run a migration on all of the topics and partitions adding the 4 new nodes
> using the partition reassignment tool. This seemed to cause a lot of
> network congestion and according to the logs some of the nodes were having
> trouble talking to each other. The network congestion lasted for the
> duration of the migration and began to get better toward the end. After the
> migration I confirmed that data was being stored and served from the new
> brokers. Today I bounced each of the kafka processes on each of the brokers
> to pick up a change made to the log4j properties. After bouncing one
> processes I started seeing some strange errors on the four newer broker
> nodes that looked like:
>
> kafka.common.NotAssignedReplicaException: Leader 10 failed to record
> follower 7's position 0 for partition [topic-1,185] since the replica 7 is
> not recognized to be one of the assigned replicas 10 for partition
> [topic-2,185]
>
> and on the older kafka brokers the errors looked like:
>
> [2014-12-01 17:06:04,268] ERROR [ReplicaFetcherThread-0-12], Error for
> partition [topic-1,175] to broker 12:class kafka.common.UnknownException
> (kafka.server.ReplicaFetcherThread)
>
> I proceeded to bounce the rest of the kafka processes and after bouncing
> the rest the errors seemed to stop. It wasn’t until a few hours later I
> noticed that the amount of data stored on the 4 new kafka brokers had
> dropped off significantly. When I ran a describe for the topics in the
> errors it was clear that the assigned partitions had been reverted to a
> state prior to the original migration to add the 4 new brokers. I am unsure
> of why bouncing the kafka process would cause the state in zookeeper to get
> overwritten given that it had seemed to have been working for the last few
> weeks until the process was restarted. My hunch is that the controller
> keeps some state about the world pre-reassignment and removes that state
> after it detects that the reassignment happened successfully. In this case
> the network congestion on each of the brokers caused the controller not to
> get notified when all the reassignments were completed and thus kept the
> pre-assignement state around. When the process was bounced it read from
> zookeeper to get this state and reverted the existing scheme to the
> pre-assignment state. Has this behavior been observed before? Does this
> sound like a logical understanding of what happened in this case?
>
> --
> Andrew Jorgensen
> @ajorgensen


Re: Failed partition reassignment

2014-12-02 Thread Jun Rao
Is there an easy way to reproduce the issues that you saw?

Thanks,

Jun

On Mon, Dec 1, 2014 at 6:31 AM, Karol Nowak  wrote:

> Hi,
>
> I observed some error messages / exceptions while running partition
> reassignment on kafka 0.8.1.1 cluster. Being fairly new to this system I'm
> not sure if these indicate serious failures or transient problems, or if
> manual intervention is needed.
>
> I used kafka-reassign-partitions.sh to reassign partitions from brokers
> {143,155,155,93} to {143,155,115,68} on a healthy (?) cluster. Right now
> one partition has just two replicas in the ISR and a number of partitions
> are left with 4 replicas in the ISR even though the replication factor is 3. Logs
> show a few zookeeper timeouts, but there were no GC pauses anywhere near
> the session timeout. Zookeeper itself seems healthy and not overloaded,
> with exception of regular CPU spikes, probably related to snapshots.
>
> I cleaned the log lines a little bit for brevity.
>
> First example: https://gist.github.com/knowak/a682afc1545fdeb836a1
> Second one with two similar stack traces:
> https://gist.github.com/knowak/6398be433d869d8141e5
> Third one, many many of these:
> https://gist.github.com/knowak/e78301259b74841702ae
> Fourth: https://gist.github.com/knowak/1fbde5ca90d8f1924141
> Fifth: https://gist.github.com/knowak/57fdcb75b3dc7c626893
>
> Hints?
>
>
> Thanks,
> Karol
>


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Roger Hoover
"It also makes it possible to do validation on the server
side or make other tools that inspect or display messages (e.g. the various
command line tools) and do this in an easily pluggable way across tools."

I agree that it's valuable to have a standard way to plugin serialization
across many tools, especially for producers.  For example, the Kafka
producer might get wrapped by JRuby and exposed as a Logstash plugin.
With a standard method for
plugging in serdes, one can reuse a serde with any tool that wraps the
standard producer API.  This won't be possible if we rely on custom
wrappers.

On Tue, Dec 2, 2014 at 1:49 PM, Jay Kreps  wrote:

> Yeah totally, far from preventing it, making it easy to specify/encourage a
> custom serializer across your org is exactly the kind of thing I was hoping
> to make work well. If there is a config that gives the serializer you can
> just default this to what you want people to use as some kind of
> environment default or just tell people to set the property. A person who
> wants to ignore this can, of course, but the easy thing to do will be to
> use an off-the-shelf serialization method.
>
> If you really want to enforce it, having an interface for serialization
> would also let us optionally check this on the server side (e.g. if you
> specify a serializer on the server side we validate that messages are in
> this format).
>
> If the api is just bytes of course you can make a serializer you want
> people to use, and you can send around an email asking people to use it,
> but the easy thing to do will remain "my string".getBytes() or whatever and
> lots of people will do that instead.
>
> Here the advantage of config is that (assuming your config system allows
> it) you should be able to have some kind of global environment default for
> these settings and easily grep across applications to determine what is in
> use.
>
> I think in all of this there is no hard and fast technical difference
> between these approaches, i.e. there is nothing you can do one way that is
> impossible the other way.
>
> But I do think that having a nice way to plug in serialization makes it
> much more straight-forward and intuitive to package these things up inside
> an organization. It also makes it possible to do validation on the server
> side or make other tools that inspect or display messages (e.g. the various
> command line tools) and do this in an easily pluggable way across tools.
>
> The concern I was expressing was that in the absence of support for
> serialization, what everyone will do is just make a wrapper api that
> handles these things (since no one can actually use the producer without
> serialization, and you will want to encourage use of the proper thing). The
> problem I have with wrapper apis is that they defeat common documentation
> and tend to be made without as much thought as the primary api.
>
> The advantage of having serialization handled internally is that all you
> need to do is know the right config for your organization and any example
> usage remains the same.
>
> Hopefully that helps explain the rationale a little more.
>
> -Jay
>
> On Tue, Dec 2, 2014 at 11:53 AM, Joel Koshy  wrote:
>
> > Thanks for the follow-up Jay.  I still don't quite see the issue here
> > but maybe I just need to process this a bit more. To me "packaging up
> > the best practice and plug it in" seems to be to expose a simple
> > low-level API and give people the option to plug in a (possibly
> > shared) standard serializer in their application configs (or a custom
> > one if they choose) and invoke that from code. The additional
> > serialization call is a minor drawback but a very clear and easily
> > understood step that can be documented.  The serializer can obviously
> > also do other things such as schema registration. I'm actually not (or
> > at least I think I'm not) influenced very much by LinkedIn's wrapper.
> > It's just that I think it is reasonable to expect that in practice
> > most organizations (big and small) tend to have at least some specific
> > organization-specific detail that warrants a custom serializer anyway;
> > and it's going to be easier to override a serializer than an entire
> > producer API.
> >
> > Joel
> >
> > On Tue, Dec 02, 2014 at 11:09:55AM -0800, Jay Kreps wrote:
> > > Hey Joel, you are right, we discussed this, but I think we didn't think
> > > about it as deeply as we should have. I think our take was strongly
> > shaped
> > > by having a wrapper api at LinkedIn that DOES do the serialization
> > > transparently so I think you are thinking of the producer as just an
> > > implementation detail of that wrapper. Imagine a world where every
> > > application at LinkedIn had to figure that part out themselves. That
> is,
> > > imagine that what you guys supported was just the raw producer api and
> > that
> > > that just handled bytes. I think in that world the types of data you
> > would
> > > see wo

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Joel Koshy
> The issue with a separate ser/deser library is that if it's not part of the
> client API, (1) users may not use it or (2) different users may use it in
> different ways. For example, you can imagine that two Avro implementations
> have different ways of instantiation (since it's not enforced by the client
> API). This makes sharing such kind of libraries harder.

That is true - but that is also the point I think and it seems
irrelevant to whether it is built-in to the producer's config or
plugged in outside at the application-level. i.e., users will not use
a common implementation if it does not fit their requirements. If a
well-designed, full-featured and correctly implemented avro-or-other
serializer/deserializer is made available there is no reason why that
cannot be shared by different applications.

> As for reason about the data types, take an example of the consumer
> application. It needs to deal with objects at some point. So the earlier
> that type information is revealed, the clearer it is to the application.

Again for this, the only additional step is a call to deserialize. At
some level the application _has_ to deal with the specific data type
and it is thus reasonable to require that a consumed byte array needs
to be deserialized to that type before being used.

I suppose I don't see much benefit in pushing this into the core API
of the producer at the expense of making these changes to the API.  At
the same time, I should be clear that I don't think the proposal is in
any way unreasonable which is why I'm definitely not opposed to it,
but I'm also not convinced that it is necessary.

Thanks,

Joel

> 
> On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:
> 
> > Re: pushing complexity of dealing with objects: we're talking about
> > just a call to a serialize method to convert the object to a byte
> > array right? Or is there more to it? (To me) that seems less
> > cumbersome than having to interact with parameterized types. Actually,
> > can you explain more clearly what you mean by reason about what
> > type of data is being sent in your original email? I have some
> > notion of what that means but it is a bit vague and you might have
> > meant something else.
> >
> > Thanks,
> >
> > Joel
> >
> > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > Joel,
> > >
> > > Thanks for the feedback.
> > >
> > > Yes, the raw bytes interface is simpler than the Generic api. However, it
> > > just pushes the complexity of dealing with the objects to the
> > application.
> > > We also thought about the layered approach. However, this may confuse the
> > > users since there is no single entry point and it's not clear which
> > layer a
> > > user should be using.
> > >
> > > Jun
> > >
> > >
> > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> > >
> > > > > makes it hard to reason about what type of data is being sent to
> > Kafka
> > > > and
> > > > > also makes it hard to share an implementation of the serializer. For
> > > > > example, to support Avro, the serialization logic could be quite
> > involved
> > > > > since it might need to register the Avro schema in some remote
> > registry
> > > > and
> > > > > maintain a schema cache locally, etc. Without a serialization api,
> > it's
> > > > > impossible to share such an implementation so that people can easily
> > > > reuse.
> > > > > We sort of overlooked this implication during the initial discussion
> > of
> > > > the
> > > > > producer api.
> > > >
> > > > Thanks for bringing this up and the patch.  My take on this is that
> > > > any reasoning about the data itself is more appropriately handled
> > > > outside of the core producer API. FWIW, I don't think this was
> > > > _overlooked_ during the initial discussion of the producer API
> > > > (especially since it was a significant change from the old producer).
> > > > IIRC we believed at the time that there is elegance and flexibility in
> > > > a simple API that deals with raw bytes. I think it is more accurate to
> > > > say that this is a reversal of opinion for some (which is fine) but
> > > > personally I'm still in the old camp :) i.e., I really like the
> > > > simplicity of the current 0.8.2 producer API and find parameterized
> > > > types/generics to be distracting and annoying; and IMO any
> > > > data-specific handling is better absorbed at a higher-level than the
> > > > core Kafka APIs - possibly by a (very thin) wrapper producer library.
> > > > I don't quite see why it is difficult to share different wrapper
> > > > implementations; or even ser-de libraries for that matter that people
> > > > can invoke before sending to/reading from Kafka.
> > > >
> > > > That said I'm not opposed to the change - it's just that I prefer
> > > > what's currently there. So I'm +0 on the proposal.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > > Hi, Everyone,
> > > > >
> > > > > I'd like to start a disc

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jay Kreps
Yeah totally, far from preventing it, making it easy to specify/encourage a
custom serializer across your org is exactly the kind of thing I was hoping
to make work well. If there is a config that gives the serializer you can
just default this to what you want people to use as some kind of
environment default or just tell people to set the property. A person who
wants to ignore this can, of course, but the easy thing to do will be to
use an off-the-shelf serialization method.

If you really want to enforce it, having an interface for serialization
would also let us optionally check this on the server side (e.g. if you
specify a serializer on the server side we validate that messages are in
this format).

If the api is just bytes of course you can make a serializer you want
people to use, and you can send around an email asking people to use it,
but the easy thing to do will remain "my string".getBytes() or whatever and
lots of people will do that instead.

Here the advantage of config is that (assuming your config system allows
it) you should be able to have some kind of global environment default for
these settings and easily grep across applications to determine what is in
use.

I think in all of this there is no hard and fast technical difference
between these approaches, i.e. there is nothing you can do one way that is
impossible the other way.

But I do think that having a nice way to plug in serialization makes it
much more straight-forward and intuitive to package these things up inside
an organization. It also makes it possible to do validation on the server
side or make other tools that inspect or display messages (e.g. the various
command line tools) and do this in an easily pluggable way across tools.

The concern I was expressing was that in the absence of support for
serialization, what everyone will do is just make a wrapper api that
handles these things (since no one can actually use the producer without
serialization, and you will want to encourage use of the proper thing). The
problem I have with wrapper apis is that they defeat common documentation
and tend to be made without as much thought as the primary api.

The advantage of having serialization handled internally is that all you
need to do is know the right config for your organization and any example
usage remains the same.

Hopefully that helps explain the rationale a little more.

-Jay

On Tue, Dec 2, 2014 at 11:53 AM, Joel Koshy  wrote:

> Thanks for the follow-up Jay.  I still don't quite see the issue here
> but maybe I just need to process this a bit more. To me "packaging up
> the best practice and plug it in" seems to be to expose a simple
> low-level API and give people the option to plug in a (possibly
> shared) standard serializer in their application configs (or a custom
> one if they choose) and invoke that from code. The additional
> serialization call is a minor drawback but a very clear and easily
> understood step that can be documented.  The serializer can obviously
> also do other things such as schema registration. I'm actually not (or
> at least I think I'm not) influenced very much by LinkedIn's wrapper.
> It's just that I think it is reasonable to expect that in practice
> most organizations (big and small) tend to have at least some specific
> organization-specific detail that warrants a custom serializer anyway;
> and it's going to be easier to override a serializer than an entire
> producer API.
>
> Joel
>
> On Tue, Dec 02, 2014 at 11:09:55AM -0800, Jay Kreps wrote:
> > Hey Joel, you are right, we discussed this, but I think we didn't think
> > about it as deeply as we should have. I think our take was strongly
> shaped
> > by having a wrapper api at LinkedIn that DOES do the serialization
> > transparently so I think you are thinking of the producer as just an
> > implementation detail of that wrapper. Imagine a world where every
> > application at LinkedIn had to figure that part out themselves. That is,
> > imagine that what you guys supported was just the raw producer api and
> that
> > that just handled bytes. I think in that world the types of data you
> would
> > see would be totally funky and standardizing correct usage would be a
> > massive pain.
> >
> > Conversely, you could imagine advocating the LinkedIn approach where you
> > just say, well, every org should wrap up the clients in a way that does
> > things like serialization and other data checks. The problem with that is
> > that it (1) it is kind of redundant work and it is likely that the
> wrapper
> > will goof some nuances of the apis, and (2) it makes documentation and
> code
> > sharing really hard. That is, rather than being able to go to a central
> > place and read how to use the producer, LinkedIn people need to document
> > the LinkedIn producer wrapper, and users at LinkedIn need to read about
> > LinkedIn's wrapper for the producer to understand how to use it. Now
> > imagine this multiplied over every user.
> >
> > The idea is tha
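
If the proposal lands roughly as described in this thread (a generic producer with serializers supplied through config), usage might look like the sketch below. The key.serializer / value.serializer config names and the StringSerializer class are assumptions about the proposed API, and com.example.AvroSerializer is a hypothetical organization-wide serializer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConfigDrivenSerializerExample {

    // Hypothetical domain object, handled by the configured value serializer.
    static class PageViewEvent {
        final String url;
        PageViewEvent(String url) { this.url = url; }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        // Under the proposal the serializers are named in config, so an organization can
        // set one environment-wide default and grep for it across applications.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.example.AvroSerializer");  // hypothetical shared serializer

        KafkaProducer<String, PageViewEvent> producer =
            new KafkaProducer<String, PageViewEvent>(props);
        producer.send(new ProducerRecord<String, PageViewEvent>(
            "page-views", "user-42", new PageViewEvent("/index.html")));
        producer.close();
    }
}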

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Rajiv Kurian
Why can't the organization package the Avro implementation with a kafka
client and distribute that library though? The risk of different users
supplying the kafka client with different serializer/deserializer
implementations still exists.

On Tue, Dec 2, 2014 at 12:11 PM, Jun Rao  wrote:

> Joel, Rajiv, Thunder,
>
> The issue with a separate ser/deser library is that if it's not part of the
> client API, (1) users may not use it or (2) different users may use it in
> different ways. For example, you can imagine that two Avro implementations
> have different ways of instantiation (since it's not enforced by the client
> API). This makes sharing such kind of libraries harder.
>
> Joel,
>
> As for reason about the data types, take an example of the consumer
> application. It needs to deal with objects at some point. So the earlier
> that type information is revealed, the clearer it is to the application.
> Since the consumer client is the entry point where an application gets the
> data,  if the type is enforced there, it makes it clear to all down stream
> consumers.
>
> Thanks,
>
> Jun
>
> On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:
>
> > Re: pushing complexity of dealing with objects: we're talking about
> > just a call to a serialize method to convert the object to a byte
> > array right? Or is there more to it? (To me) that seems less
> > cumbersome than having to interact with parameterized types. Actually,
> > can you explain more clearly what you mean by reason about what
> > type of data is being sent in your original email? I have some
> > notion of what that means but it is a bit vague and you might have
> > meant something else.
> >
> > Thanks,
> >
> > Joel
> >
> > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > Joel,
> > >
> > > Thanks for the feedback.
> > >
> > > Yes, the raw bytes interface is simpler than the Generic api. However,
> it
> > > just pushes the complexity of dealing with the objects to the
> > application.
> > > We also thought about the layered approach. However, this may confuse
> the
> > > users since there is no single entry point and it's not clear which
> > layer a
> > > user should be using.
> > >
> > > Jun
> > >
> > >
> > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy 
> wrote:
> > >
> > > > > makes it hard to reason about what type of data is being sent to
> > Kafka
> > > > and
> > > > > also makes it hard to share an implementation of the serializer.
> For
> > > > > example, to support Avro, the serialization logic could be quite
> > involved
> > > > > since it might need to register the Avro schema in some remote
> > registry
> > > > and
> > > > > maintain a schema cache locally, etc. Without a serialization api,
> > it's
> > > > > impossible to share such an implementation so that people can
> easily
> > > > reuse.
> > > > > We sort of overlooked this implication during the initial
> discussion
> > of
> > > > the
> > > > > producer api.
> > > >
> > > > Thanks for bringing this up and the patch.  My take on this is that
> > > > any reasoning about the data itself is more appropriately handled
> > > > outside of the core producer API. FWIW, I don't think this was
> > > > _overlooked_ during the initial discussion of the producer API
> > > > (especially since it was a significant change from the old producer).
> > > > IIRC we believed at the time that there is elegance and flexibility
> in
> > > > a simple API that deals with raw bytes. I think it is more accurate
> to
> > > > say that this is a reversal of opinion for some (which is fine) but
> > > > personally I'm still in the old camp :) i.e., I really like the
> > > > simplicity of the current 0.8.2 producer API and find parameterized
> > > > types/generics to be distracting and annoying; and IMO any
> > > > data-specific handling is better absorbed at a higher-level than the
> > > > core Kafka APIs - possibly by a (very thin) wrapper producer library.
> > > > I don't quite see why it is difficult to share different wrapper
> > > > implementations; or even ser-de libraries for that matter that people
> > > > can invoke before sending to/reading from Kafka.
> > > >
> > > > That said I'm not opposed to the change - it's just that I prefer
> > > > what's currently there. So I'm +0 on the proposal.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > > Hi, Everyone,
> > > > >
> > > > > I'd like to start a discussion on whether it makes sense to add the
> > > > > serializer api back to the new java producer. Currently, the new
> java
> > > > > producer takes a byte array for both the key and the value. While
> > this
> > > > api
> > > > > is simple, it pushes the serialization logic into the application.
> > This
> > > > > makes it hard to reason about what type of data is being sent to
> > Kafka
> > > > and
> > > > > also makes it hard to share an implementation of the serializer.
> For
> > > > > example, to support Avro, the s

jvm processes to consume messages

2014-12-02 Thread S Ahmed
Hi,

I have a light load scenario but I am starting off with kafka because I
like how the messages are durable etc.

If I have 4-5 topics, am I required to create the same # of consumers?  I
am assuming each consumer runs in a long-running jvm process, correct?


Are there any consumer examples that use java, and also have the startup
scripts to start/stop the process on an ubuntu server?
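
On the first question: one long-running JVM process with a single high-level consumer connector can consume several topics; you pass a map of topic to thread count. A minimal Java sketch (ZooKeeper address, group id and topic names are placeholders):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");  // placeholder
        props.put("group.id", "my-service");         // placeholder

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One long-running JVM process, one connector, several topics;
        // the map value is how many consumer threads to create per topic.
        Map<String, Integer> topicThreads = new HashMap<String, Integer>();
        topicThreads.put("topic-a", 1);
        topicThreads.put("topic-b", 1);
        topicThreads.put("topic-c", 2);

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topicThreads);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry : streams.entrySet()) {
            final String topic = entry.getKey();
            for (final KafkaStream<byte[], byte[]> stream : entry.getValue()) {
                pool.submit(new Runnable() {
                    public void run() {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            System.out.println(topic + ": " + it.next().message().length + " bytes");
                        }
                    }
                });
            }
        }
        // The worker threads keep the JVM alive until the process is stopped externally.
    }
}

The start/stop part is not Kafka-specific: a plain upstart or init.d script that launches the class with java -cp and stops it with SIGTERM is the usual approach on Ubuntu.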


Re: Questions about new consumer API

2014-12-02 Thread Neha Narkhede
1. In this doc it says kafka consumer will automatically do load balance.
Is it based on throughput or same as what we have now balance the
cardinality among all consumers in same ConsumerGroup? In a real case
different partitions could have different peak time.

Load balancing is still based on # of partitions for the subscribed topics
and
ensuring that each partition has exactly one consumer as the owner.

2. In the API, there is subscribe(partition...) method saying not using
group management, does it mean the group.id property will be discarded and
developer has full control of distributing partitions to consumers?

group.id is also required for offset management, if the user chooses to use
Kafka based offset management. The user will have full control over
distribution
of partitions to consumers.

3. Is new API compatible with old broker?

Yes, it will.

4. Will simple consumer api and high-level consumer api still be supported?

Over time, we will phase out the current high-level and simple consumer
since the
0.9 API supports both.

Thanks,
Neha

On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com  wrote:

> Hi guys,
>
> I'm interested in the new Consumer API.
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
>
> I have a couple of questions.
> 1. In this doc it says kafka consumer will automatically do load balance.
> Is it based on throughput or same as what we have now balance the
> cardinality among all consumers in same ConsumerGroup? In a real case
> different partitions could have different peak time.
> 2. In the API, there is subscribe(partition...) method saying not using
> group management, does it mean the group.id property will be discarded and
> developer has full control of distributing partitions to consumers?
> 3. Is new API compatible with old broker?
> 4. Will simple consumer api and high-level consumer api still be supported?
>
> Thanks!
>
> Best,
> Siyuan
>


Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread David Montgomery
Hi,

I am seeing this in the logs and wondering what "jmx_port":-1 means?

INFO conflict in /brokers/ids/29136 data: { "host":"104.111.111.111.",
"jmx_port":-1, "port":9092, "timestamp":"1417552817875", "version":1 }
stored data: { "host":"104.111.111", "jmx_port":-1, "port":9092,
"timestamp":"1417552738253", "version":1

despite having these added

echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false"' | tee -a
/var/kafka/bin/kafka-run-class.sh
echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
/var/kafka/bin/kafka-server-start.sh

Thanks

On Tue, Dec 2, 2014 at 9:58 PM, Andrew Otto  wrote:

> Maybe also set:
>
>  -Dcom.sun.management.jmxremote.port=
>
> ?
>
>
> > On Dec 2, 2014, at 02:59, David Montgomery 
> wrote:
> >
> > Hi,
> >
> > I am having a very difficult time trying to report kafka 8 metrics to
> > Graphite.  Nothing is listening on  and no data in graphite.  If
> > this method of graphite reporting is known not to work, is there an
> > alternative to jmxtrans to get data to graphite?
> >
> > I am using the deb file to install jmxtrans on ubuntu 12.04
> >
> > And I use the below to modify kafka scripts
> >
> > echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
> > -Dcom.sun.management.jmxremote.authenticate=false
> > -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> > /var/kafka/bin/kafka-run-class.sh
> > echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> > /var/kafka/bin/kafka-server-start.sh
> >
> > {
> >  "servers" : [ {
> >"host" : "127.0.0.1",
> >"port" : "",
> >"alias" : "<%=node.name%>",
> >"queries" : [
> > {
> > "obj" : "kafka:type=kafka.SocketServerStats",
> >  "resultAlias": "kafka.socketServerStats",
> >  "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
> > "BytesReadPerSecond", "BytesWrittenPerSecond", "FetchRequestsPerSecond",
> > "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" ,
> > "NumProduceRequests" , "ProduceRequestsPerSecond", "TotalBytesRead",
> > "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs" ],
> > "outputWriters" : [ {
> >  "@class" :
> "com.googlecode.jmxtrans.model.output.GraphiteWriter",
> >  "settings" : {
> >"host" : "<%=@monitor_host%>",
> >"port" : "2003"
> >  }
> >} ]
> >  }
> >],
> >"numQueryThreads": "2"
> >  } ]
> > }
>
>
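
One way to confirm whether the broker's JMX port is actually up, independent of jmxtrans, is to connect with a plain JMX client and list the Kafka MBeans; if nothing is listening, the connection fails immediately. A small sketch using the standard javax.management API follows (host and port are placeholders for whatever JMX_PORT was configured). Note that the 0.8 brokers expose yammer-metrics MBean names rather than the 0.7-style kafka:type=kafka.SocketServerStats used in the jmxtrans config above, so the listing also shows which obj patterns to query:

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxCheck {
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "127.0.0.1";   // placeholder
        String port = args.length > 1 ? args[1] : "9999";        // placeholder JMX_PORT
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            Set<ObjectName> names = mbs.queryNames(null, null);
            for (ObjectName name : names) {
                // Print only Kafka MBeans; their exact names are what jmxtrans must query.
                if (name.getDomain().toLowerCase().contains("kafka")) {
                    System.out.println(name);
                }
            }
        } finally {
            connector.close();
        }
    }
}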


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jun Rao
Joel, Rajiv, Thunder,

The issue with a separate ser/deser library is that if it's not part of the
client API, (1) users may not use it or (2) different users may use it in
different ways. For example, you can imagine that two Avro implementations
have different ways of instantiation (since it's not enforced by the client
API). This makes sharing such kind of libraries harder.

Joel,

As for reason about the data types, take an example of the consumer
application. It needs to deal with objects at some point. So the earlier
that type information is revealed, the clearer it is to the application.
Since the consumer client is the entry point where an application gets the
data,  if the type is enforced there, it makes it clear to all down stream
consumers.

Thanks,

Jun

On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:

> Re: pushing complexity of dealing with objects: we're talking about
> just a call to a serialize method to convert the object to a byte
> array right? Or is there more to it? (To me) that seems less
> cumbersome than having to interact with parameterized types. Actually,
> can you explain more clearly what you mean by reason about what
> type of data is being sent in your original email? I have some
> notion of what that means but it is a bit vague and you might have
> meant something else.
>
> Thanks,
>
> Joel
>
> On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > Joel,
> >
> > Thanks for the feedback.
> >
> > Yes, the raw bytes interface is simpler than the Generic api. However, it
> > just pushes the complexity of dealing with the objects to the
> application.
> > We also thought about the layered approach. However, this may confuse the
> > users since there is no single entry point and it's not clear which
> layer a
> > user should be using.
> >
> > Jun
> >
> >
> > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> >
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. For
> > > > example, to support Avro, the serialization logic could be quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization api,
> it's
> > > > impossible to share such an implementation so that people can easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial discussion
> of
> > > the
> > > > producer api.
> > >
> > > Thanks for bringing this up and the patch.  My take on this is that
> > > any reasoning about the data itself is more appropriately handled
> > > outside of the core producer API. FWIW, I don't think this was
> > > _overlooked_ during the initial discussion of the producer API
> > > (especially since it was a significant change from the old producer).
> > > IIRC we believed at the time that there is elegance and flexibility in
> > > a simple API that deals with raw bytes. I think it is more accurate to
> > > say that this is a reversal of opinion for some (which is fine) but
> > > personally I'm still in the old camp :) i.e., I really like the
> > > simplicity of the current 0.8.2 producer API and find parameterized
> > > types/generics to be distracting and annoying; and IMO any
> > > data-specific handling is better absorbed at a higher-level than the
> > > core Kafka APIs - possibly by a (very thin) wrapper producer library.
> > > I don't quite see why it is difficult to share different wrapper
> > > implementations; or even ser-de libraries for that matter that people
> > > can invoke before sending to/reading from Kafka.
> > >
> > > That said I'm not opposed to the change - it's just that I prefer
> > > what's currently there. So I'm +0 on the proposal.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > Hi, Everyone,
> > > >
> > > > I'd like to start a discussion on whether it makes sense to add the
> > > > serializer api back to the new java producer. Currently, the new java
> > > > producer takes a byte array for both the key and the value. While
> this
> > > api
> > > > is simple, it pushes the serialization logic into the application.
> This
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. For
> > > > example, to support Avro, the serialization logic could be quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization api,
> it's
> > > > impossible to share such an implementation so that people can easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial discussion
> of
> > > the
> > > > producer api.
> > > >
> > > > So, I'd like to propose an api change to the new producer by adding
> back
> > > > the

Questions about new consumer API

2014-12-02 Thread hsy...@gmail.com
Hi guys,

I'm interested in the new Consumer API.
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

I have a couple of questions.
1. In this doc it says the kafka consumer will automatically do load balancing.
Is it based on throughput, or the same as what we have now, balancing the
partition count among all consumers in the same ConsumerGroup? In a real case,
different partitions could have different peak times.
2. In the API, there is a subscribe(partition...) method that says it does not
use group management. Does that mean the group.id property will be discarded and
the developer has full control over distributing partitions to consumers?
3. Is the new API compatible with old brokers?
4. Will the simple consumer API and the high-level consumer API still be supported?

Thanks!

Best,
Siyuan


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Joel Koshy
Thanks for the follow-up Jay.  I still don't quite see the issue here
but maybe I just need to process this a bit more. To me "packaging up
the best practice and plug it in" seems to be to expose a simple
low-level API and give people the option to plug in a (possibly
shared) standard serializer in their application configs (or a custom
one if they choose) and invoke that from code. The additional
serialization call is a minor drawback but a very clear and easily
understood step that can be documented.  The serializer can obviously
also do other things such as schema registration. I'm actually not (or
at least I think I'm not) influenced very much by LinkedIn's wrapper.
It's just that I think it is reasonable to expect that in practice
most organizations (big and small) tend to have at least some
organization-specific detail that warrants a custom serializer anyway;
and it's going to be easier to override a serializer than an entire
producer API.

Joel

On Tue, Dec 02, 2014 at 11:09:55AM -0800, Jay Kreps wrote:
> Hey Joel, you are right, we discussed this, but I think we didn't think
> about it as deeply as we should have. I think our take was strongly shaped
> by having a wrapper api at LinkedIn that DOES do the serialization
> transparently so I think you are thinking of the producer as just an
> implementation detail of that wrapper. Imagine a world where every
> application at LinkedIn had to figure that part out themselves. That is,
> imagine that what you guys supported was just the raw producer api and that
> that just handled bytes. I think in that world the types of data you would
> see would be totally funky and standardizing correct usage would be a
> massive pain.
> 
> Conversely, you could imagine advocating the LinkedIn approach where you
> just say, well, every org should wrap up the clients in a way that does
> things like serialization and other data checks. The problem with that is
> that it (1) it is kind of redundant work and it is likely that the wrapper
> will goof some nuances of the apis, and (2) it makes documentation and code
> sharing really hard. That is, rather than being able to go to a central
> place and read how to use the producer, LinkedIn people need to document
> the LinkedIn producer wrapper, and users at LinkedIn need to read about
> LinkedIn's wrapper for the producer to understand how to use it. Now
> imagine this multiplied over every user.
> 
> The idea is that since everyone needs to do this we should just make it
> easy to package up the best practice and plug it in. That way the
> "contract" your application programs to is just the normal producer api.
> 
> -Jay
> 
> On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:
> 
> > Re: pushing complexity of dealing with objects: we're talking about
> > just a call to a serialize method to convert the object to a byte
> > array right? Or is there more to it? (To me) that seems less
> > cumbersome than having to interact with parameterized types. Actually,
> > can you explain more clearly what you mean by reason about what
> > type of data is being sent in your original email? I have some
> > notion of what that means but it is a bit vague and you might have
> > meant something else.
> >
> > Thanks,
> >
> > Joel
> >
> > On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > > Joel,
> > >
> > > Thanks for the feedback.
> > >
> > > Yes, the raw bytes interface is simpler than the Generic api. However, it
> > > just pushes the complexity of dealing with the objects to the
> > application.
> > > We also thought about the layered approach. However, this may confuse the
> > > users since there is no single entry point and it's not clear which
> > layer a
> > > user should be using.
> > >
> > > Jun
> > >
> > >
> > > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> > >
> > > > > makes it hard to reason about what type of data is being sent to
> > Kafka
> > > > and
> > > > > also makes it hard to share an implementation of the serializer. For
> > > > > example, to support Avro, the serialization logic could be quite
> > involved
> > > > > since it might need to register the Avro schema in some remote
> > registry
> > > > and
> > > > > maintain a schema cache locally, etc. Without a serialization api,
> > it's
> > > > > impossible to share such an implementation so that people can easily
> > > > reuse.
> > > > > We sort of overlooked this implication during the initial discussion
> > of
> > > > the
> > > > > producer api.
> > > >
> > > > Thanks for bringing this up and the patch.  My take on this is that
> > > > any reasoning about the data itself is more appropriately handled
> > > > outside of the core producer API. FWIW, I don't think this was
> > > > _overlooked_ during the initial discussion of the producer API
> > > > (especially since it was a significant change from the old producer).
> > > > IIRC we believed at the time that there is elegance and flexibility in
> > > > a simple API that deals with raw
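
As a rough sketch of the pattern Joel describes in this message, i.e. keeping the
producer on raw bytes and invoking a serializer from application code (assuming the
byte[]-based 0.8.2 producer discussed in this thread; the broker address and topic
name below are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RawBytesProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker

        // The producer itself only ever sees byte arrays.
        KafkaProducer producer = new KafkaProducer(props);

        // Application-level serialization step: any shared ser/deser library
        // (Avro, JSON, ...) could be invoked here before handing bytes over.
        String event = "hello";
        byte[] payload = event.getBytes(StandardCharsets.UTF_8);

        producer.send(new ProducerRecord("example-topic", payload));
        producer.close();
    }
}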

RE: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Thunder Stumpges
I'm not sure I agree with this. I feel that the need to have a consistent, well 
documented, shared serialization approach at the organization level is 
important no matter what. How you structure the API doesn't change that or make 
it any easier or "automatic" than before. It is still possible for users on 
different projects to "plug in" the wrong serializer or to "be totally funky". 
In order to make this consistent and completely encapsulated from users, a 
company would *still* need to write a shim layer that configures the correct 
serializer in a consistent way, and *that* still needs to be documented and 
understood.

Regards,
Thunder

-Original Message-
From: Jay Kreps [mailto:j...@confluent.io] 
Sent: Tuesday, December 02, 2014 11:10 AM
To: d...@kafka.apache.org
Cc: users@kafka.apache.org
Subject: Re: [DISCUSSION] adding the serializer api back to the new java 
producer

Hey Joel, you are right, we discussed this, but I think we didn't think about 
it as deeply as we should have. I think our take was strongly shaped by having 
a wrapper api at LinkedIn that DOES do the serialization transparently so I 
think you are thinking of the producer as just an implementation detail of that 
wrapper. Imagine a world where every application at LinkedIn had to figure that 
part out themselves. That is, imagine that what you guys supported was just the 
raw producer api and that that just handled bytes. I think in that world the 
types of data you would see would be totally funky and standardizing correct 
usage would be a massive pain.

Conversely, you could imagine advocating the LinkedIn approach where you just 
say, well, every org should wrap up the clients in a way that does things like 
serialization and other data checks. The problem with that is that it (1) it is 
kind of redundant work and it is likely that the wrapper will goof some nuances 
of the apis, and (2) it makes documentation and code sharing really hard. That 
is, rather than being able to go to a central place and read how to use the 
producer, LinkedIn people need to document the LinkedIn producer wrapper, and 
users at LinkedIn need to read about LinkedIn's wrapper for the producer to 
understand how to use it. Now imagine this multiplied over every user.

The idea is that since everyone needs to do this we should just make it easy to 
package up the best practice and plug it in. That way the "contract" your 
application programs to is just the normal producer api.

-Jay

On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:

> Re: pushing complexity of dealing with objects: we're talking about 
> just a call to a serialize method to convert the object to a byte 
> array right? Or is there more to it? (To me) that seems less 
> cumbersome than having to interact with parameterized types. Actually, 
> can you explain more clearly what you mean by reason about what 
> type of data is being sent in your original email? I have some 
> notion of what that means but it is a bit vague and you might have 
> meant something else.
>
> Thanks,
>
> Joel
>
> On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > Joel,
> >
> > Thanks for the feedback.
> >
> > Yes, the raw bytes interface is simpler than the Generic api. 
> > However, it just pushes the complexity of dealing with the objects 
> > to the
> application.
> > We also thought about the layered approach. However, this may 
> > confuse the users since there is no single entry point and it's not 
> > clear which
> layer a
> > user should be using.
> >
> > Jun
> >
> >
> > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> >
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. 
> > > > For example, to support Avro, the serialization logic could be 
> > > > quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization 
> > > > api,
> it's
> > > > impossible to share such an implementation so that people can 
> > > > easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial 
> > > > discussion
> of
> > > the
> > > > producer api.
> > >
> > > Thanks for bringing this up and the patch.  My take on this is 
> > > that any reasoning about the data itself is more appropriately 
> > > handled outside of the core producer API. FWIW, I don't think this 
> > > was _overlooked_ during the initial discussion of the producer API 
> > > (especially since it was a significant change from the old producer).
> > > IIRC we believed at the time that there is elegance and 
> > > flexibility in a simple API that deals with raw bytes. I think it 
> > > is more accurate to say that this is a reversal of opinion for 
> > > some (which is fine) but personally I'm still in the old camp :) 
> > > i.e., I really like the simplicity of the current 0.8.2 pr

Re: How to send serialized object in Kafka Producer

2014-12-02 Thread Guozhang Wang
Ramesh,

Which producer are you using in 0.8.1? kafka.api.producer or
org.apache.kafka.clients.producer?

Guozhang

On Tue, Dec 2, 2014 at 2:12 AM, Ramesh K  wrote:

> Hi,
>
> I have written the basic program to send String or byte[] messages to
> consumer from producer by using java & Kafka 0.8.1 .
> It Works perfectly.But i wanted to send serialized object(Java Bean
> Object).
>
> Is it possible to send the serialized object from producer to consumer?
>
> if possible, please share the ideas/samples.
>
> Thanks in advance..
> Ramesh Kasi
>



-- 
-- Guozhang


Re: Zookeeper load burst during replication

2014-12-02 Thread Guozhang Wang
Kafka brokers use ZK for metadata storage, and Kafka consumer clients use
ZK for offset and membership management.

For metadata storage, when there are replica state changes (for example, the
new replica added after a broker restart in your case), the controller
will write to ZK to record such changes, but this happens only once and will
not significantly increase ZK load.

So you can try to check:

1. Whether there are consumer clients running at the same time that would be
writing to ZK heavily to commit offsets.
2. The controller log on the broker, to see whether it abnormally updates such
metadata in ZK during that period.

Guozhang

On Tue, Dec 2, 2014 at 7:38 AM, Yury Ruchin  wrote:

> Hello,
>
> In a multi-broker Kafka 0.8.1.1 setup, I had one broker crashed. I
> restarted it after some noticeable time, so it started catching up the
> leader very intensively. During the replication, I see that the disk load
> on the ZK leader bursts abnormally, resulting in ZK performance
> degradation. What could cause that? How does Kafka use ZK during
> replication?
>
> Thanks,
> Yury
>



-- 
-- Guozhang
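
As a rough illustration of the first check above: the ZK-based high-level consumer
writes offsets to ZooKeeper on a timer, so the commit settings are worth a look.
A minimal sketch only; the ZooKeeper address, group id and interval value are
placeholders:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class OffsetCommitConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        // With the ZK-based consumer, offsets are written to ZooKeeper every
        // auto.commit.interval.ms; many consumers with a short interval can
        // put noticeable write load on the ZK ensemble.
        props.put("auto.commit.enable", "true");
        props.put("auto.commit.interval.ms", "60000"); // illustrative value

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... create message streams and consume as usual ...
        connector.shutdown();
    }
}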


Re: Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread Guozhang Wang
Yuanjia,

I am not sure that pagecache can be the cause of this. Could you attach
your full stack trace and use the GetOffset tool Manikumar mentioned to
make sure the offset does exist on the broker?

Guozhang

On Tue, Dec 2, 2014 at 7:50 AM, Manikumar Reddy 
wrote:

> You can check the latest/earliest offsets of a given topic by running
> GetOffsetShell.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-GetOffsetShell
>
> On Tue, Dec 2, 2014 at 2:05 PM, yuanjia8947  wrote:
>
> > Hi all,
> > I'm using kafka 0.8.0 release now. And I often encounter the problem
> > OffsetOutOfRangeException when cosuming message by simple consumer API.
> > But I'm sure that the consuming offset is smaller than the latest offset
> > got from OffsetRequest.
> > Can it be caused by that new messages are wrote to kernel's pagecache and
> > not flush to the file yet,
> > while I'm consuming new messages from the file?
> > How fix it?
> >
> > Thanks,
> > liyuanjia
> >
> >
> >
> >
> >
> > liyuanjia
>



-- 
-- Guozhang


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jay Kreps
Hey Joel, you are right, we discussed this, but I think we didn't think
about it as deeply as we should have. I think our take was strongly shaped
by having a wrapper api at LinkedIn that DOES do the serialization
transparently so I think you are thinking of the producer as just an
implementation detail of that wrapper. Imagine a world where every
application at LinkedIn had to figure that part out themselves. That is,
imagine that what you guys supported was just the raw producer api and that
that just handled bytes. I think in that world the types of data you would
see would be totally funky and standardizing correct usage would be a
massive pain.

Conversely, you could imagine advocating the LinkedIn approach where you
just say, well, every org should wrap up the clients in a way that does
things like serialization and other data checks. The problem with that is
that (1) it is kind of redundant work and it is likely that the wrapper
will goof some nuances of the apis, and (2) it makes documentation and code
sharing really hard. That is, rather than being able to go to a central
place and read how to use the producer, LinkedIn people need to document
the LinkedIn producer wrapper, and users at LinkedIn need to read about
LinkedIn's wrapper for the producer to understand how to use it. Now
imagine this multiplied over every user.

The idea is that since everyone needs to do this we should just make it
easy to package up the best practice and plug it in. That way the
"contract" your application programs to is just the normal producer api.

-Jay

On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:

> Re: pushing complexity of dealing with objects: we're talking about
> just a call to a serialize method to convert the object to a byte
> array right? Or is there more to it? (To me) that seems less
> cumbersome than having to interact with parameterized types. Actually,
> can you explain more clearly what you mean by reason about what
> type of data is being sent in your original email? I have some
> notion of what that means but it is a bit vague and you might have
> meant something else.
>
> Thanks,
>
> Joel
>
> On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > Joel,
> >
> > Thanks for the feedback.
> >
> > Yes, the raw bytes interface is simpler than the Generic api. However, it
> > just pushes the complexity of dealing with the objects to the
> application.
> > We also thought about the layered approach. However, this may confuse the
> > users since there is no single entry point and it's not clear which
> layer a
> > user should be using.
> >
> > Jun
> >
> >
> > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> >
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. For
> > > > example, to support Avro, the serialization logic could be quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization api,
> it's
> > > > impossible to share such an implementation so that people can easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial discussion
> of
> > > the
> > > > producer api.
> > >
> > > Thanks for bringing this up and the patch.  My take on this is that
> > > any reasoning about the data itself is more appropriately handled
> > > outside of the core producer API. FWIW, I don't think this was
> > > _overlooked_ during the initial discussion of the producer API
> > > (especially since it was a significant change from the old producer).
> > > IIRC we believed at the time that there is elegance and flexibility in
> > > a simple API that deals with raw bytes. I think it is more accurate to
> > > say that this is a reversal of opinion for some (which is fine) but
> > > personally I'm still in the old camp :) i.e., I really like the
> > > simplicity of the current 0.8.2 producer API and find parameterized
> > > types/generics to be distracting and annoying; and IMO any
> > > data-specific handling is better absorbed at a higher-level than the
> > > core Kafka APIs - possibly by a (very thin) wrapper producer library.
> > > I don't quite see why it is difficult to share different wrapper
> > > implementations; or even ser-de libraries for that matter that people
> > > can invoke before sending to/reading from Kafka.
> > >
> > > That said I'm not opposed to the change - it's just that I prefer
> > > what's currently there. So I'm +0 on the proposal.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > Hi, Everyone,
> > > >
> > > > I'd like to start a discussion on whether it makes sense to add the
> > > > serializer api back to the new java producer. Currently, the new java
> > > > producer takes a byte array for both the key and the value. While
> this
> > > api
> > > > is simple

RE: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Thunder Stumpges
Hello, while we do not currently use the Java API, we are writing a C#/.net 
client (https://github.com/ntent-ad/kafka4net). FWIW, we also chose to keep the 
API simple, accepting just byte arrays. We did not want to impose even a simple 
interface onto users of the library, feeling that users will have their own 
serialization requirements (or not) and, if desired, can write their own shim 
to handle serialization in the way they would like.

Cheers,
Thunder


-Original Message-
From: Rajiv Kurian [mailto:ra...@signalfuse.com] 
Sent: Tuesday, December 02, 2014 10:22 AM
To: users@kafka.apache.org
Subject: Re: [DISCUSSION] adding the serializer api back to the new java 
producer

It's not clear to me from your initial email what exactly can't be done with 
the raw accept bytes API. Serialization libraries should be share able outside 
of kafka. I honestly like the simplicity of the raw bytes API and feel like 
serialization should just remain outside of the base Kafka APIs.
Any one who wants them bundled could then create a higher level API themselves.

On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:

> Re: pushing complexity of dealing with objects: we're talking about 
> just a call to a serialize method to convert the object to a byte 
> array right? Or is there more to it? (To me) that seems less 
> cumbersome than having to interact with parameterized types. Actually, 
> can you explain more clearly what you mean by reason about what 
> type of data is being sent in your original email? I have some 
> notion of what that means but it is a bit vague and you might have 
> meant something else.
>
> Thanks,
>
> Joel
>
> On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > Joel,
> >
> > Thanks for the feedback.
> >
> > Yes, the raw bytes interface is simpler than the Generic api. 
> > However, it just pushes the complexity of dealing with the objects 
> > to the
> application.
> > We also thought about the layered approach. However, this may 
> > confuse the users since there is no single entry point and it's not 
> > clear which
> layer a
> > user should be using.
> >
> > Jun
> >
> >
> > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> >
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. 
> > > > For example, to support Avro, the serialization logic could be 
> > > > quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization 
> > > > api,
> it's
> > > > impossible to share such an implementation so that people can 
> > > > easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial 
> > > > discussion
> of
> > > the
> > > > producer api.
> > >
> > > Thanks for bringing this up and the patch.  My take on this is 
> > > that any reasoning about the data itself is more appropriately 
> > > handled outside of the core producer API. FWIW, I don't think this 
> > > was _overlooked_ during the initial discussion of the producer API 
> > > (especially since it was a significant change from the old producer).
> > > IIRC we believed at the time that there is elegance and 
> > > flexibility in a simple API that deals with raw bytes. I think it 
> > > is more accurate to say that this is a reversal of opinion for 
> > > some (which is fine) but personally I'm still in the old camp :) 
> > > i.e., I really like the simplicity of the current 0.8.2 producer 
> > > API and find parameterized types/generics to be distracting and 
> > > annoying; and IMO any data-specific handling is better absorbed at 
> > > a higher-level than the core Kafka APIs - possibly by a (very thin) 
> > > wrapper producer library.
> > > I don't quite see why it is difficult to share different wrapper 
> > > implementations; or even ser-de libraries for that matter that 
> > > people can invoke before sending to/reading from Kafka.
> > >
> > > That said I'm not opposed to the change - it's just that I prefer 
> > > what's currently there. So I'm +0 on the proposal.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > Hi, Everyone,
> > > >
> > > > I'd like to start a discussion on whether it makes sense to add 
> > > > the serializer api back to the new java producer. Currently, the 
> > > > new java producer takes a byte array for both the key and the 
> > > > value. While
> this
> > > api
> > > > is simple, it pushes the serialization logic into the application.
> This
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. 
> > > > For example, to support Avro, the serialization logic could be 
> > > > quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Rajiv Kurian
It's not clear to me from your initial email what exactly can't be done
with the raw accept-bytes API. Serialization libraries should be shareable
outside of kafka. I honestly like the simplicity of the raw bytes API and
feel like serialization should just remain outside of the base Kafka APIs.
Anyone who wants them bundled could then create a higher-level API
themselves.

On Tue, Dec 2, 2014 at 10:06 AM, Joel Koshy  wrote:

> Re: pushing complexity of dealing with objects: we're talking about
> just a call to a serialize method to convert the object to a byte
> array right? Or is there more to it? (To me) that seems less
> cumbersome than having to interact with parameterized types. Actually,
> can you explain more clearly what you mean by reason about what
> type of data is being sent in your original email? I have some
> notion of what that means but it is a bit vague and you might have
> meant something else.
>
> Thanks,
>
> Joel
>
> On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> > Joel,
> >
> > Thanks for the feedback.
> >
> > Yes, the raw bytes interface is simpler than the Generic api. However, it
> > just pushes the complexity of dealing with the objects to the
> application.
> > We also thought about the layered approach. However, this may confuse the
> > users since there is no single entry point and it's not clear which
> layer a
> > user should be using.
> >
> > Jun
> >
> >
> > On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> >
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. For
> > > > example, to support Avro, the serialization logic could be quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization api,
> it's
> > > > impossible to share such an implementation so that people can easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial discussion
> of
> > > the
> > > > producer api.
> > >
> > > Thanks for bringing this up and the patch.  My take on this is that
> > > any reasoning about the data itself is more appropriately handled
> > > outside of the core producer API. FWIW, I don't think this was
> > > _overlooked_ during the initial discussion of the producer API
> > > (especially since it was a significant change from the old producer).
> > > IIRC we believed at the time that there is elegance and flexibility in
> > > a simple API that deals with raw bytes. I think it is more accurate to
> > > say that this is a reversal of opinion for some (which is fine) but
> > > personally I'm still in the old camp :) i.e., I really like the
> > > simplicity of the current 0.8.2 producer API and find parameterized
> > > types/generics to be distracting and annoying; and IMO any
> > > data-specific handling is better absorbed at a higher-level than the
> > > core Kafka APIs - possibly by a (very thin) wrapper producer library.
> > > I don't quite see why it is difficult to share different wrapper
> > > implementations; or even ser-de libraries for that matter that people
> > > can invoke before sending to/reading from Kafka.
> > >
> > > That said I'm not opposed to the change - it's just that I prefer
> > > what's currently there. So I'm +0 on the proposal.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > > Hi, Everyone,
> > > >
> > > > I'd like to start a discussion on whether it makes sense to add the
> > > > serializer api back to the new java producer. Currently, the new java
> > > > producer takes a byte array for both the key and the value. While
> this
> > > api
> > > > is simple, it pushes the serialization logic into the application.
> This
> > > > makes it hard to reason about what type of data is being sent to
> Kafka
> > > and
> > > > also makes it hard to share an implementation of the serializer. For
> > > > example, to support Avro, the serialization logic could be quite
> involved
> > > > since it might need to register the Avro schema in some remote
> registry
> > > and
> > > > maintain a schema cache locally, etc. Without a serialization api,
> it's
> > > > impossible to share such an implementation so that people can easily
> > > reuse.
> > > > We sort of overlooked this implication during the initial discussion
> of
> > > the
> > > > producer api.
> > > >
> > > > So, I'd like to propose an api change to the new producer by adding
> back
> > > > the serializer api similar to what we had in the old producer.
> Specially,
> > > > the proposed api changes are the following.
> > > >
> > > > First, we change KafkaProducer to take generic types K and V for the
> key
> > > > and the value, respectively.
> > > >
> > > > public class KafkaProducer implements Producer {
> > > >
> > > > public Future send(ProducerRecord record,
> > > Callback
> > >

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Joel Koshy
Re: pushing complexity of dealing with objects: we're talking about
just a call to a serialize method to convert the object to a byte
array right? Or is there more to it? (To me) that seems less
cumbersome than having to interact with parameterized types. Actually,
can you explain more clearly what you mean by reason about what
type of data is being sent in your original email? I have some
notion of what that means but it is a bit vague and you might have
meant something else.

Thanks,

Joel

On Tue, Dec 02, 2014 at 09:15:19AM -0800, Jun Rao wrote:
> Joel,
> 
> Thanks for the feedback.
> 
> Yes, the raw bytes interface is simpler than the Generic api. However, it
> just pushes the complexity of dealing with the objects to the application.
> We also thought about the layered approach. However, this may confuse the
> users since there is no single entry point and it's not clear which layer a
> user should be using.
> 
> Jun
> 
> 
> On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:
> 
> > > makes it hard to reason about what type of data is being sent to Kafka
> > and
> > > also makes it hard to share an implementation of the serializer. For
> > > example, to support Avro, the serialization logic could be quite involved
> > > since it might need to register the Avro schema in some remote registry
> > and
> > > maintain a schema cache locally, etc. Without a serialization api, it's
> > > impossible to share such an implementation so that people can easily
> > reuse.
> > > We sort of overlooked this implication during the initial discussion of
> > the
> > > producer api.
> >
> > Thanks for bringing this up and the patch.  My take on this is that
> > any reasoning about the data itself is more appropriately handled
> > outside of the core producer API. FWIW, I don't think this was
> > _overlooked_ during the initial discussion of the producer API
> > (especially since it was a significant change from the old producer).
> > IIRC we believed at the time that there is elegance and flexibility in
> > a simple API that deals with raw bytes. I think it is more accurate to
> > say that this is a reversal of opinion for some (which is fine) but
> > personally I'm still in the old camp :) i.e., I really like the
> > simplicity of the current 0.8.2 producer API and find parameterized
> > types/generics to be distracting and annoying; and IMO any
> > data-specific handling is better absorbed at a higher-level than the
> > core Kafka APIs - possibly by a (very thin) wrapper producer library.
> > I don't quite see why it is difficult to share different wrapper
> > implementations; or even ser-de libraries for that matter that people
> > can invoke before sending to/reading from Kafka.
> >
> > That said I'm not opposed to the change - it's just that I prefer
> > what's currently there. So I'm +0 on the proposal.
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > > Hi, Everyone,
> > >
> > > I'd like to start a discussion on whether it makes sense to add the
> > > serializer api back to the new java producer. Currently, the new java
> > > producer takes a byte array for both the key and the value. While this
> > api
> > > is simple, it pushes the serialization logic into the application. This
> > > makes it hard to reason about what type of data is being sent to Kafka
> > and
> > > also makes it hard to share an implementation of the serializer. For
> > > example, to support Avro, the serialization logic could be quite involved
> > > since it might need to register the Avro schema in some remote registry
> > and
> > > maintain a schema cache locally, etc. Without a serialization api, it's
> > > impossible to share such an implementation so that people can easily
> > reuse.
> > > We sort of overlooked this implication during the initial discussion of
> > the
> > > producer api.
> > >
> > > So, I'd like to propose an api change to the new producer by adding back
> > > the serializer api similar to what we had in the old producer. Specially,
> > > the proposed api changes are the following.
> > >
> > > First, we change KafkaProducer to take generic types K and V for the key
> > > and the value, respectively.
> > >
> > > public class KafkaProducer implements Producer {
> > >
> > > public Future send(ProducerRecord record,
> > Callback
> > > callback);
> > >
> > > public Future send(ProducerRecord record);
> > > }
> > >
> > > Second, we add two new configs, one for the key serializer and another
> > for
> > > the value serializer. Both serializers will default to the byte array
> > > implementation.
> > >
> > > public class ProducerConfig extends AbstractConfig {
> > >
> > > .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> > > KEY_SERIALIZER_CLASS_DOC)
> > > .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
>

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jun Rao
Joel,

Thanks for the feedback.

Yes, the raw bytes interface is simpler than the Generic api. However, it
just pushes the complexity of dealing with the objects to the application.
We also thought about the layered approach. However, this may confuse the
users since there is no single entry point and it's not clear which layer a
user should be using.

Jun


On Tue, Dec 2, 2014 at 12:34 AM, Joel Koshy  wrote:

> > makes it hard to reason about what type of data is being sent to Kafka
> and
> > also makes it hard to share an implementation of the serializer. For
> > example, to support Avro, the serialization logic could be quite involved
> > since it might need to register the Avro schema in some remote registry
> and
> > maintain a schema cache locally, etc. Without a serialization api, it's
> > impossible to share such an implementation so that people can easily
> reuse.
> > We sort of overlooked this implication during the initial discussion of
> the
> > producer api.
>
> Thanks for bringing this up and the patch.  My take on this is that
> any reasoning about the data itself is more appropriately handled
> outside of the core producer API. FWIW, I don't think this was
> _overlooked_ during the initial discussion of the producer API
> (especially since it was a significant change from the old producer).
> IIRC we believed at the time that there is elegance and flexibility in
> a simple API that deals with raw bytes. I think it is more accurate to
> say that this is a reversal of opinion for some (which is fine) but
> personally I'm still in the old camp :) i.e., I really like the
> simplicity of the current 0.8.2 producer API and find parameterized
> types/generics to be distracting and annoying; and IMO any
> data-specific handling is better absorbed at a higher-level than the
> core Kafka APIs - possibly by a (very thin) wrapper producer library.
> I don't quite see why it is difficult to share different wrapper
> implementations; or even ser-de libraries for that matter that people
> can invoke before sending to/reading from Kafka.
>
> That said I'm not opposed to the change - it's just that I prefer
> what's currently there. So I'm +0 on the proposal.
>
> Thanks,
>
> Joel
>
> On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> > Hi, Everyone,
> >
> > I'd like to start a discussion on whether it makes sense to add the
> > serializer api back to the new java producer. Currently, the new java
> > producer takes a byte array for both the key and the value. While this
> api
> > is simple, it pushes the serialization logic into the application. This
> > makes it hard to reason about what type of data is being sent to Kafka
> and
> > also makes it hard to share an implementation of the serializer. For
> > example, to support Avro, the serialization logic could be quite involved
> > since it might need to register the Avro schema in some remote registry
> and
> > maintain a schema cache locally, etc. Without a serialization api, it's
> > impossible to share such an implementation so that people can easily
> reuse.
> > We sort of overlooked this implication during the initial discussion of
> the
> > producer api.
> >
> > So, I'd like to propose an api change to the new producer by adding back
> > the serializer api similar to what we had in the old producer. Specially,
> > the proposed api changes are the following.
> >
> > First, we change KafkaProducer to take generic types K and V for the key
> > and the value, respectively.
> >
> > public class KafkaProducer<K,V> implements Producer<K,V> {
> >
> > public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > Callback callback);
> >
> > public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > }
> >
> > Second, we add two new configs, one for the key serializer and another
> for
> > the value serializer. Both serializers will default to the byte array
> > implementation.
> >
> > public class ProducerConfig extends AbstractConfig {
> >
> > .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> > KEY_SERIALIZER_CLASS_DOC)
> > .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> > VALUE_SERIALIZER_CLASS_DOC);
> > }
> >
> > Both serializers will implement the following interface.
> >
> > public interface Serializer<T> extends Configurable {
> > public byte[] serialize(String topic, T data, boolean isKey);
> >
> > public void close();
> > }
> >
> > This is more or less the same as what's in the old producer. The slight
> > differences are (1) the serializer now only requires a parameter-less
> > constructor; (2) the serializer has a configure() and a close() method
> for
> > initialization and cleanup, respectively; (3) the serialize() method
> > additionally takes the topic and an isKey indicator, both of which are
> > useful for things like schema registration.
> >
> > The detailed changes are included in KAFKA-1797

Re: Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread Manikumar Reddy
You can check the latest/earliest offsets of a given topic by running
GetOffsetShell.

https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-GetOffsetShell

On Tue, Dec 2, 2014 at 2:05 PM, yuanjia8947  wrote:

> Hi all,
> I'm using kafka 0.8.0 release now. And I often encounter the problem
> OffsetOutOfRangeException when cosuming message by simple consumer API.
> But I'm sure that the consuming offset is smaller than the latest offset
> got from OffsetRequest.
> Can it be caused by that new messages are wrote to kernel's pagecache and
> not flush to the file yet,
> while I'm consuming new messages from the file?
> How fix it?
>
> Thanks,
> liyuanjia
>
>
>
>
>
> liyuanjia
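
The same check can also be done from code with the simple consumer API the
original poster is already using. A minimal sketch only; the broker host/port,
topic and partition are placeholders:

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetCheckSketch {
    public static void main(String[] args) {
        // Placeholders: broker host/port, topic and partition.
        SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offset-check");
        TopicAndPartition tp = new TopicAndPartition("example-topic", 0);

        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        // LatestTime() asks for the log end offset; EarliestTime() for the oldest.
        requestInfo.put(tp, new PartitionOffsetRequestInfo(
                kafka.api.OffsetRequest.LatestTime(), 1));

        OffsetRequest request = new OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-check");
        OffsetResponse response = consumer.getOffsetsBefore(request);

        long[] offsets = response.offsets("example-topic", 0);
        System.out.println("latest offset: " + offsets[0]);
        consumer.close();
    }
}

Comparing that latest offset with the offset the consumer is requesting should show
whether the OffsetOutOfRangeException is really about an offset beyond the log end.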


Zookeeper load burst during replication

2014-12-02 Thread Yury Ruchin
Hello,

In a multi-broker Kafka 0.8.1.1 setup, I had one broker crash. I
restarted it after a noticeable amount of time, so it started catching up with
the leader very intensively. During the replication, I see that the disk load
on the ZK leader bursts abnormally, resulting in ZK performance
degradation. What could cause that? How does Kafka use ZK during
replication?

Thanks,
Yury


I want to join the mailing lists

2014-12-02 Thread Chico Qi
Thank you!

Chico


Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread yuanjia8947
Hi all,
I'm using the kafka 0.8.0 release now, and I often encounter an
OffsetOutOfRangeException when consuming messages with the simple consumer API.
But I'm sure that the consuming offset is smaller than the latest offset
returned by an OffsetRequest.
Can it be caused by new messages being written to the kernel's pagecache and not
yet flushed to the file, while I'm consuming new messages from the file?
How can I fix it?

Thanks,
liyuanjia

Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread Andrew Otto
Maybe also set:

 -Dcom.sun.management.jmxremote.port=

?


> On Dec 2, 2014, at 02:59, David Montgomery  wrote:
> 
> Hi,
> 
> I am having a very difficult time trying to report kafka 8 metrics to
> Graphite.  Nothing is listening on  and and no data in graphite.  If
> this method of graphite reporting is know to not work is there an
> alternative to jmxtrans to get data to graphite?
> 
> I am using the deb file to install jmxtrans on ubuntu 12.04
> 
> And I use the below to modify kafka scripts
> 
> echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false"' | tee -a
> /var/kafka/bin/kafka-run-class.sh
> echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
> /var/kafka/bin/kafka-server-start.sh
> 
> {
>  "servers" : [ {
>"host" : "127.0.0.1",
>"port" : "",
>"alias" : "<%=node.name%>",
>"queries" : [
> {
> "obj" : "kafka:type=kafka.SocketServerStats",
>  "resultAlias": "kafka.socketServerStats",
>  "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
> "BytesReadPerSecond", "BytesWrittenPerSecond", "FetchRequestsPerSecond",
> "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" ,
> "NumProduceRequests" , "ProduceRequestsPerSecond", "TotalBytesRead",
> "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs" ],
> "outputWriters" : [ {
>  "@class" : "com.googlecode.jmxtrans.model.output.GraphiteWriter",
>  "settings" : {
>"host" : "<%=@monitor_host%>",
>"port" : "2003"
>  }
>} ]
>  }
>],
>"numQueryThreads": "2"
>  } ]
> }



Re: High level Consumer API doesnt receive 10MB messages?

2014-12-02 Thread Harsha
I was talking about the consumer config fetch.message.max.bytes
(https://kafka.apache.org/08/configuration.html);
by default it's 1048576 bytes.

On Mon, Dec 1, 2014, at 08:09 PM, Palur Sandeep wrote:
> Yeah I did. I made the following changes to server.config:
> 
> message.max.bytes=10485800
> replica.fetch.max.bytes=104858000
> replica.fetch.wait.max.ms=2000
> 
> 
> On Mon, Dec 1, 2014 at 10:03 PM, Harsha  wrote:
> 
> > have you set fetch.message.max.bytes to 10mb or more in your consumer
> > config.
> > -Harsha
> >
> > On Mon, Dec 1, 2014, at 07:27 PM, Palur Sandeep wrote:
> > > Hi all,
> > >
> > > Consumer doesnt receive message if it is big:  When the producer sends
> > > 256kb messages to broker, consumer is able to retrieve it, but when
> > > producer sends 10MB messages to the broker, the consumer doesn’t receive
> > > any message.
> > >
> > > Please tell me how to make the consumer receive 10MB messages.
> > > --
> > > Regards,
> > > Sandeep Palur
> > > Data-Intensive Distributed Systems Laboratory, CS/IIT
> > > Department of Computer Science, Illinois Institute of Technology (IIT)
> > > Phone : 312-647-9833
> > > Email : psand...@hawk.iit.edu 
> >
> 
> 
> 
> -- 
> Regards,
> Sandeep Palur
> Data-Intensive Distributed Systems Laboratory, CS/IIT
> Department of Computer Science, Illinois Institute of Technology (IIT)
> Phone : 312-647-9833
> Email : psand...@hawk.iit.edu 
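
As a rough sketch of where fetch.message.max.bytes goes in the high-level consumer
(the ZooKeeper address, group id and topic name are placeholders; the broker-side
message.max.bytes and replica.fetch.max.bytes settings mentioned earlier in the
thread still apply):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class LargeMessageConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "large-message-group");     // placeholder
        // Must be at least as large as the biggest message the broker will
        // return (compare message.max.bytes on the broker); the default is
        // 1048576 bytes as noted above.
        props.put("fetch.message.max.bytes", "10485800");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("example-topic", 1));
        // ... iterate streams.get("example-topic").get(0) as usual ...
        connector.shutdown();
    }
}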


Re: Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread YuanJia Li
Jmxtrans should connect to the jmxremote port.
Try running "ps -aux |grep kafka" and check whether the process contains
-Dcom.sun.management.jmxremote.port.
If it does not, try editing "kafka-server-start.sh" and adding "export JMX_PORT=".
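
Beyond lsof, one way to verify that the broker is actually exporting JMX is to
connect to the jmxremote port from a small Java program. A minimal sketch only;
the host and the 9999 port are placeholders for whatever JMX_PORT you export:

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxCheckSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder host:port; use whatever JMX_PORT the broker exports.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // List every MBean whose domain starts with "kafka" to confirm
            // the broker is actually exporting metrics on this port.
            Set<ObjectName> names = mbsc.queryNames(new ObjectName("kafka*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        } finally {
            connector.close();
        }
    }
}

If the connection fails or no kafka MBeans show up, jmxtrans has nothing to read.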

How to send serialized object in Kafka Producer

2014-12-02 Thread Ramesh K
Hi,

I have written a basic program to send String or byte[] messages from a
producer to a consumer using Java and Kafka 0.8.1.
It works perfectly, but I wanted to send a serialized object (a Java Bean object).

Is it possible to send a serialized object from the producer to the consumer?

If possible, please share ideas/samples.

Thanks in advance..
Ramesh Kasi
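
One common approach is to turn the bean into a byte[] in the application and send
that with the default byte-array encoder. A minimal sketch only, assuming the
0.8.1 high-level producer API and plain Java serialization; the broker list and
topic name are placeholders, and the bean class must implement java.io.Serializable:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BeanProducerSketch {
    static byte[] toBytes(Serializable bean) throws IOException {
        // Plain Java serialization; Avro/JSON/etc. would work the same way.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(bean);
        oos.close();
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");              // placeholder
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // byte[] pass-through

        Producer<String, byte[]> producer = new Producer<>(new ProducerConfig(props));
        Serializable bean = new java.util.Date(); // stand-in for your Java Bean
        producer.send(new KeyedMessage<String, byte[]>("bean-topic", toBytes(bean)));
        producer.close();
    }
}

On the consumer side, the byte[] payload can be read back into the bean with an
ObjectInputStream.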


Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread YuanJia Li
Hi all,
I'm using the kafka 0.8.0 release now, and I often encounter an
OffsetOutOfRangeException when consuming messages with the simple consumer API.
But I'm sure that the consuming offset is smaller than the latest offset
returned by an OffsetRequest.
Can it be caused by new messages being written to the kernel's pagecache and not
yet flushed to the file, while I'm consuming new messages from the file?
How to fix it?

Thanks,
Yuanjia Li

Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread David Montgomery
I am using kafka 0.8.  I will try your suggestion.  But if I run lsof -i
: should I not see a process running on that port?  I am not seeing
anything.

On Tue, Dec 2, 2014 at 5:37 PM, yuanjia8947  wrote:

> hi David,
> which version do you use kafka?
> when I use kafka 0.8.0, I write jmxtrans "obj" like this "obj":
> "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesOutPerSec\""
> .
> Hope it useful for you.
>
> liyuanjia
>
>
>
> liyuanjia


Re: isr never update

2014-12-02 Thread Shangan Chen
I checked the max lag and it was 0.

I grepped the state-change logs for topic-partition "[org.nginx,32]" and
extracted some entries related to broker 24 and broker 29 (the controller
switched from broker 24 to 29).


   - on broker 29 (current controller):


[2014-11-22 06:20:20,377] TRACE Controller 29 epoch 7 changed state of
replica 29 for partition [org.nginx,32] from OnlineReplica to OnlineReplica
(state.change.logger)
*[2014-11-22 06:20:20,650] TRACE Controller 29 epoch 7 sending
become-leader LeaderAndIsr request
(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4) with correlationId 0
to broker 29 for partition [org.nginx,32] (state.change.logger)*
[2014-11-22 06:20:20,664] TRACE Broker 29 received LeaderAndIsr request
(LeaderAndIsrInfo:(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4),ReplicationFactor:2),AllReplicas:29,24)
correlation id 0 from controller 29 epoch 7 for partition [org.nginx,32]
(state.change.logger)
*[2014-11-22 06:20:20,674] WARN Broker 29 received invalid LeaderAndIsr
request with correlation id 0 from controller 29 epoch 7 with an older
leader epoch 10 for partition [org.nginx,32], current leader epoch is 10
(state.change.logger)*
[2014-11-22 06:20:20,912] TRACE Controller 29 epoch 7 sending
UpdateMetadata request
(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4) with correlationId 0
to broker 23 for partition [org.nginx,32] (state.change.logger)
*[2014-11-22 06:20:21,490] TRACE Controller 29 epoch 7 sending
UpdateMetadata request
(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4) with correlationId 0
to broker 29 for partition [org.nginx,32] (state.change.logger)*
*[2014-11-22 06:20:21,945] TRACE Broker 29 cached leader info
(LeaderAndIsrInfo:(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4),ReplicationFactor:2),AllReplicas:29,24)
for partition [org.nginx,32] in response to UpdateMetadata request sent by
controller 29 epoch 7 with correlation id 0 (state.change.logger)*

[2014-11-22 06:20:28,703] TRACE Broker 29 received LeaderAndIsr request
(LeaderAndIsrInfo:(Leader:29,ISR:29,LeaderEpoch:11,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:29,24)
correlation id 4897 from controller 24 epoch 6 for partition [org.nginx,32]
(state.change.logger)
[2014-11-22 06:20:28,703] WARN Broker 29 received LeaderAndIsr request
correlation id 4897 with an old controller epoch 6. Latest known controller
epoch is 7 (state.change.logger)


*analysis:*
controller 29 sent a become-leader LeaderAndIsr request with an old
controller epoch, and broker 29 itself deemed this request invalid, as did the
other brokers. Then controller 29 sent an UpdateMetadata request to all
brokers, and the brokers cached the leader info with an old controller epoch.

*question:*
When the controller sends a become-leader LeaderAndIsr request or an
UpdateMetadata request, will it check the current controller epoch and
leader epoch? It looks like the controller did not do any checking.
Meanwhile, brokers will reject a LeaderAndIsr request with an old
controller epoch, but will process an UpdateMetadata request and cache it.


   - on broker 24 (previous controller)


[2014-11-22 06:18:11,095] TRACE Controller 24 epoch 6 sending
UpdateMetadata request
(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4) with correlationId
4886 to broker 36 for partition [org.nginx,32] (state.change.logger)
[2014-11-22 06:20:17,553] TRACE Controller 24 epoch 6 sending
UpdateMetadata request
(Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4) with correlationId
4892 to broker 34 for partition [org.nginx,32] (state.change.logger)
[2014-11-22 06:20:21,905] TRACE Controller 24 epoch 6 started leader
election for partition [org.mobile_grouprecommend_userdeletedata,9]
(state.change.logger)
[2014-11-22 06:20:21,911] TRACE Controller 24 epoch 6 elected leader 21 for
Offline partition [org.mobile_grouprecommend_userdeletedata,9]
(state.change.logger)
[2014-11-22 06:20:27,412] TRACE Controller 24 epoch 6 changed state of
replica 24 for partition [org.nginx,32] from OnlineReplica to
OfflineReplica (state.change.logger)
*[2014-11-22 06:20:28,701] TRACE Controller 24 epoch 6 sending
become-leader LeaderAndIsr request
(Leader:29,ISR:29,LeaderEpoch:11,ControllerEpoch:6) with correlationId 4897
to broker 29 for partition [org.nginx,32] (state.change.logger)*
[2014-11-22 06:20:28,713] TRACE Controller 24 epoch 6 sending
UpdateMetadata request (Leader:29,ISR:29,LeaderEpoch:11,ControllerEpoch:6)
with correlationId 4897 to broker 23 for partition [org.nginx,32]
(state.change.logger)


*analysis:*
controller 24 and controller 29 were alive at the same time; controller 24 sent
a become-leader LeaderAndIsr request to broker 29, and broker 29 found it
had an old controllerEpoch and did not process it.

*question:*

Can two controllers be alive at the same time? I think that should not happen.
Controller 24 and controller 29 both sent LeaderAndIsr requests to other brokers.
While controller 24 has a newer LeaderEpoch
(LeaderAndIsrInfo:(Leader:29,ISR:29,LeaderEpoch:11,ControllerEpoch:6)),
controller 29 has

Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread yuanjia8947
hi David,
which version of kafka do you use?
When I use kafka 0.8.0, I write the jmxtrans "obj" like this: "obj":
"\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesOutPerSec\"".
Hope it is useful for you.

liyuanjia

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Joel Koshy
> makes it hard to reason about what type of data is being sent to Kafka and
> also makes it hard to share an implementation of the serializer. For
> example, to support Avro, the serialization logic could be quite involved
> since it might need to register the Avro schema in some remote registry and
> maintain a schema cache locally, etc. Without a serialization api, it's
> impossible to share such an implementation so that people can easily reuse.
> We sort of overlooked this implication during the initial discussion of the
> producer api.

Thanks for bringing this up and the patch.  My take on this is that
any reasoning about the data itself is more appropriately handled
outside of the core producer API. FWIW, I don't think this was
_overlooked_ during the initial discussion of the producer API
(especially since it was a significant change from the old producer).
IIRC we believed at the time that there is elegance and flexibility in
a simple API that deals with raw bytes. I think it is more accurate to
say that this is a reversal of opinion for some (which is fine) but
personally I'm still in the old camp :) i.e., I really like the
simplicity of the current 0.8.2 producer API and find parameterized
types/generics to be distracting and annoying; and IMO any
data-specific handling is better absorbed at a higher-level than the
core Kafka APIs - possibly by a (very thin) wrapper producer library.
I don't quite see why it is difficult to share different wrapper
implementations; or even ser-de libraries for that matter that people
can invoke before sending to/reading from Kafka.

That said I'm not opposed to the change - it's just that I prefer
what's currently there. So I'm +0 on the proposal.

Thanks,

Joel

On Mon, Nov 24, 2014 at 05:58:50PM -0800, Jun Rao wrote:
> Hi, Everyone,
> 
> I'd like to start a discussion on whether it makes sense to add the
> serializer api back to the new java producer. Currently, the new java
> producer takes a byte array for both the key and the value. While this api
> is simple, it pushes the serialization logic into the application. This
> makes it hard to reason about what type of data is being sent to Kafka and
> also makes it hard to share an implementation of the serializer. For
> example, to support Avro, the serialization logic could be quite involved
> since it might need to register the Avro schema in some remote registry and
> maintain a schema cache locally, etc. Without a serialization api, it's
> impossible to share such an implementation so that people can easily reuse.
> We sort of overlooked this implication during the initial discussion of the
> producer api.
> 
> So, I'd like to propose an api change to the new producer by adding back
> the serializer api similar to what we had in the old producer. Specially,
> the proposed api changes are the following.
> 
> First, we change KafkaProducer to take generic types K and V for the key
> and the value, respectively.
> 
> public class KafkaProducer<K,V> implements Producer<K,V> {
> 
>     public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
> 
>     public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> }
> 
> Second, we add two new configs, one for the key serializer and another for
> the value serializer. Both serializers will default to the byte array
> implementation.
> 
> public class ProducerConfig extends AbstractConfig {
> 
> .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> KEY_SERIALIZER_CLASS_DOC)
> .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
> VALUE_SERIALIZER_CLASS_DOC);
> }
> 
> Both serializers will implement the following interface.
> 
> public interface Serializer<T> extends Configurable {
>     public byte[] serialize(String topic, T data, boolean isKey);
> 
>     public void close();
> }
> 
> This is more or less the same as what's in the old producer. The slight
> differences are (1) the serializer now only requires a parameter-less
> constructor; (2) the serializer has a configure() and a close() method for
> initialization and cleanup, respectively; (3) the serialize() method
> additionally takes the topic and an isKey indicator, both of which are
> useful for things like schema registration.
> 
> The detailed changes are included in KAFKA-1797. For completeness, I also
> made the corresponding changes for the new java consumer api as well.
> 
> Note that the proposed api changes are incompatible with what's in the
> 0.8.2 branch. However, if those api changes are beneficial, it's probably
> better to include them now in the 0.8.2 release, rather than later.
> 
> I'd like to discuss mainly two things in this thread.
> 1. Do people feel that the proposed api changes are reasonable?
> 2. Are there any concerns of including the api changes in the 0.8.2 final
> release?
> 
> Thanks,
> 
> Jun
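
For completeness, a usage sketch under the proposed generic API might look like the following; com.example.StringSerializer is a hypothetical implementation of the proposed Serializer interface, since only the ByteArraySerializer default is named above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProposedApiSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Hypothetical serializers implementing the proposed Serializer<T>
        // interface; the proposal itself only names the ByteArraySerializer default.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "com.example.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.StringSerializer");

        // The generic parameters replace the hand-rolled byte[] conversion the
        // application would otherwise do before every send().
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key-1", "value-1"));
        producer.close();
    }
}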



How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread David Montgomery
Hi,

I am having a very difficult time trying to report kafka 8 metrics to
Graphite.  Nothing is listening on  and there is no data in graphite.  If
this method of graphite reporting is known not to work, is there an
alternative to jmxtrans to get data to graphite?

I am using the deb file to install jmxtrans on ubuntu 12.04.

And I use the commands below to modify the kafka scripts:

echo 'KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false"' | tee -a
/var/kafka/bin/kafka-run-class.sh
echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
/var/kafka/bin/kafka-server-start.sh

{
  "servers" : [ {
    "host" : "127.0.0.1",
    "port" : "",
    "alias" : "<%=node.name%>",
    "queries" : [ {
      "obj" : "kafka:type=kafka.SocketServerStats",
      "resultAlias" : "kafka.socketServerStats",
      "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs",
        "BytesReadPerSecond", "BytesWrittenPerSecond", "FetchRequestsPerSecond",
        "MaxFetchRequestMs", "MaxProduceRequestMs", "NumFetchRequests",
        "NumProduceRequests", "ProduceRequestsPerSecond", "TotalBytesRead",
        "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs" ],
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GraphiteWriter",
        "settings" : {
          "host" : "<%=@monitor_host%>",
          "port" : "2003"
        }
      } ]
    } ],
    "numQueryThreads" : "2"
  } ]
}