Re: Metadata Refresh and TimeoutException when MAX_BLOCK_MS_CONFIG set 0

2022-10-29 Thread Bhavesh Mistry
Hi Luke and Kafka Dev Team,

Is there any interest in making the Kafka producer non-blocking when a broker
is down and when the metadata cache does not have details for a topic?  This
seems to be a missing piece: the producer currently cannot tell the difference
between a broker that is really down and metadata that simply has not been
refreshed.

I hope there is enough interest in having the producer distinguish "broker
down" from "metadata not available".

Thanks,

Bhavesh

On Mon, Oct 10, 2022 at 4:04 PM Bhavesh Mistry 
wrote:

> Hi Luke,
>
> Thanks for the pointers.
>
> Sorry for being late I was out.
>
>
>
> I would like to propose the following which might be a little different
> from the Old one:
>
> The Kafka producer must distinguish between a *broker down* state and
> *metadata NOT available* for a given topic.
>
>
>
> Like the bootstrap-server option, many applications (like ours) do not
> create topics dynamically and only publish/subscribe to predefined topics.
> So the Kafka producer could have a configuration option for
> “*predefined topics*”.  When a predefined topic is configured, metadata is
> fetched for those topics before the producer finishes initializing.  The
> producer would also guarantee that metadata for these topics is refreshed
> before it expires: if the metadata cache expires at time X, the producer
> automatically issues a metadata refresh request at X-3000 ms and keeps
> retrying every second (at X-2000 and X-1000) until the cache again holds
> the latest topic-partition mapping.  This guarantees non-blocking behavior
> for pre-defined topics.  Blocking behavior is acceptable for topics that
> are NOT defined ahead of time (dynamic topics).
>
>
>
> Another configuration we should have is *drop-message-on-broker-down*
> (true or false): even if the metadata has expired, just DROP the message
> until the broker is back online, and do NOT block the application thread
> that puts records on the producer's in-memory queue.  Of course, the Kafka
> producer would have to keep track of all brokers and their state in an
> in-memory data structure and update it periodically (when a send succeeds
> or a ping to the broker's IP:port succeeds).
>
>
>
> Luke and others let me know what you think about it.
>
>
> I can write documents if there is interest in the topic.
>
>
> Thanks,
>
>
> Bhavesh
>
>
> On Sun, Sep 25, 2022 at 8:44 PM Luke Chen  wrote:
>
>> Hi Bhavesh,
>>
>> I understand your point.
>> There was an old KIP with the similar idea which was not accepted by the
>> community in the end.
>> Maybe you can try to bring it back to the community again, or try to
>> propose your own KIP for this idea?
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
>>
>> Thank you.
>> Luke
>>
>> On Sat, Sep 24, 2022 at 6:36 AM Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com>
>> wrote:
>>
>> > Hello Kafka Team,
>> >
>> > I would appreciate any insight into how to distinguish between Broker
>> Down
>> > vs Metadata Refresh not available due to timing issues.
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>> > On Mon, Sep 19, 2022 at 12:50 PM Bhavesh Mistry <
>> > mistry.p.bhav...@gmail.com>
>> > wrote:
>> >
>> > > Hello Kafka Team,
>> > >
>> > >
>> > >
>> > > We have an environment where Kafka Broker can go down for whatever
>> > reason.
>> > >
>> > >
>> > >
>> > > Hence, we had configured MAX_BLOCK_MS_CONFIG=0 because we wanted to
>> drop
>> > > messages when brokers were NOT available.
>> > >
>> > >
>> > >
>> > > Now the issue is we get data loss due to METADATA not being available
>> and
>> > > get this exception “*Topic  not present in metadata after 0
>> ms.”.
>> > > *This is due to the fast metadata has expired and the next request to
>> > > send an event does not have metadata.
>> > >
>> > >
>> > >
>> > > Why does Kafka have his design?  Why can’t Kafka distinguish between
>> > > Broker down vs metadata refresh not available?  Is it reasonable to
>> > expect
>> > > metadata would refresh BEFORE it expires so metadata refresh doesn’t
>> need
>> > > before it expires? Have Metadata ready before expires?  Any particular
>> > > reason send() has wait for metadata refresh vs background thread that
>> > > automatically refreshes metadata before it expires, hence send()
>> method
>> > > never incur wait().
>> > >
>> > >
>> > > Let me know what suggestion you have to prevent the application thread
>> > > from blocking (MAX_BLOCK_MS_CONFIG) when the Kafka brokers are DOWN vs
>> > > metadata is NOT available due to expiration.
>> > >
>> > >
>> > >
>> > > Let me know your suggestions and what you think about metadata
>> refresh.
>> > > Should Kafka Producer be proactively refreshing metadata intelligently
>> > > rather than what the producer does today?
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > Thanks,
>> > > Bhavesh
>> > >
>> >
>>
>


Re: Metadata Refresh and TimeoutException when MAX_BLOCK_MS_CONFIG set 0

2022-10-10 Thread Bhavesh Mistry
Hi Luke,

Thanks for the pointers.

Sorry for being late I was out.



I would like to propose the following, which might be a little different
from the old KIP:

The Kafka producer must distinguish between a *broker down* state and
*metadata NOT available* for a given topic.



Like the bootstrap-server option, many applications (like ours) do not
create topics dynamically and only publish/subscribe to predefined topics.
So the Kafka producer could have a configuration option for
“*predefined topics*”.  When a predefined topic is configured, metadata is
fetched for those topics before the producer finishes initializing.  The
producer would also guarantee that metadata for these topics is refreshed
before it expires: if the metadata cache expires at time X, the producer
automatically issues a metadata refresh request at X-3000 ms and keeps
retrying every second (at X-2000 and X-1000) until the cache again holds the
latest topic-partition mapping.  This guarantees non-blocking behavior for
pre-defined topics.  Blocking behavior is acceptable for topics that are NOT
defined ahead of time (dynamic topics).
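
There is no such option in the producer today; as an illustration only, an
application can approximate the “predefined topics” behavior by forcing a
metadata fetch for each known topic and repeating it periodically.  A minimal
sketch, assuming the standard Java client (the topic list and refresh period
are placeholders):

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;

public class MetadataWarmer {

    // Hypothetical application-supplied list of the topics it will ever write to.
    private final List<String> predefinedTopics;
    private final Producer<?, ?> producer;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public MetadataWarmer(Producer<?, ?> producer, List<String> predefinedTopics) {
        this.producer = producer;
        this.predefinedTopics = predefinedTopics;
    }

    /** Periodically touch each topic so its metadata stays in the producer's cache. */
    public void start(long periodSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            for (String topic : predefinedTopics) {
                try {
                    // partitionsFor() asks the producer to fetch metadata for the topic.
                    producer.partitionsFor(topic);
                } catch (TimeoutException e) {
                    // With max.block.ms=0 this call can time out immediately; the
                    // refresh request is still expected to complete in the
                    // producer's background I/O thread.
                }
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }
}

This is only a best-effort approximation from the application side; it does
not give the producer itself any notion of a broker-down state.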



Another configuration we should have is *drop-message-on-broker-down*
(true or false): even if the metadata has expired, just DROP the message
until the broker is back online, and do NOT block the application thread
that puts records on the producer's in-memory queue.  Of course, the Kafka
producer would have to keep track of all brokers and their state in an
in-memory data structure and update it periodically (when a send succeeds or
a ping to the broker's IP:port succeeds).
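
There is also no such drop-on-broker-down switch today; purely as an
illustration, an application could approximate it by probing broker
reachability itself with the AdminClient and skipping send() while no broker
responds.  A rough sketch (the timeout values and the drop policy are
assumptions, not existing producer behavior):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerHealthCheck {

    private final AdminClient admin;
    private volatile boolean brokersReachable = true;

    public BrokerHealthCheck(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
        this.admin = AdminClient.create(props);
    }

    /** Call this periodically from a scheduler to refresh the reachability flag. */
    public void refresh() {
        try {
            // describeCluster() fails when no broker in the cluster is reachable.
            admin.describeCluster().nodes().get(3, TimeUnit.SECONDS);
            brokersReachable = true;
        } catch (Exception e) {
            brokersReachable = false;
        }
    }

    /** Application-side drop policy: skip send() entirely while brokers look down. */
    public boolean shouldDrop() {
        return !brokersReachable;
    }
}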



Luke and others let me know what you think about it.


I can write documents if there is interest in the topic.


Thanks,


Bhavesh


On Sun, Sep 25, 2022 at 8:44 PM Luke Chen  wrote:

> Hi Bhavesh,
>
> I understand your point.
> There was an old KIP with the similar idea which was not accepted by the
> community in the end.
> Maybe you can try to bring it back to the community again, or try to
> propose your own KIP for this idea?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
>
> Thank you.
> Luke
>
> On Sat, Sep 24, 2022 at 6:36 AM Bhavesh Mistry  >
> wrote:
>
> > Hello Kafka Team,
> >
> > I would appreciate any insight into how to distinguish between Broker
> Down
> > vs Metadata Refresh not available due to timing issues.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Sep 19, 2022 at 12:50 PM Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > Hello Kafka Team,
> > >
> > >
> > >
> > > We have an environment where Kafka Broker can go down for whatever
> > reason.
> > >
> > >
> > >
> > > Hence, we had configured MAX_BLOCK_MS_CONFIG=0 because we wanted to
> drop
> > > messages when brokers were NOT available.
> > >
> > >
> > >
> > > Now the issue is we get data loss due to METADATA not being available
> and
> > > get this exception “*Topic  not present in metadata after 0
> ms.”.
> > > *This is due to the fast metadata has expired and the next request to
> > > send an event does not have metadata.
> > >
> > >
> > >
> > > Why does Kafka have his design?  Why can’t Kafka distinguish between
> > > Broker down vs metadata refresh not available?  Is it reasonable to
> > expect
> > > metadata would refresh BEFORE it expires so metadata refresh doesn’t
> need
> > > before it expires? Have Metadata ready before expires?  Any particular
> > > reason send() has wait for metadata refresh vs background thread that
> > > automatically refreshes metadata before it expires, hence send() method
> > > never incur wait().
> > >
> > >
> > > Let me know what suggestion you have to prevent the application thread
> > > from blocking (MAX_BLOCK_MS_CONFIG) when the Kafka brokers are DOWN vs
> > > metadata is NOT available due to expiration.
> > >
> > >
> > >
> > > Let me know your suggestions and what you think about metadata refresh.
> > > Should Kafka Producer be proactively refreshing metadata intelligently
> > > rather than what the producer does today?
> > >
> > >
> > >
> > >
> > >
> > > Thanks,
> > > Bhavesh
> > >
> >
>


Re: Metadata Refresh and TimeoutException when MAX_BLOCK_MS_CONFIG set 0

2022-09-23 Thread Bhavesh Mistry
Hello Kafka Team,

I would appreciate any insight into how to distinguish between a broker
being down and a metadata refresh not being available due to timing issues.

Thanks,

Bhavesh

On Mon, Sep 19, 2022 at 12:50 PM Bhavesh Mistry 
wrote:

> Hello Kafka Team,
>
>
>
> We have an environment where Kafka Broker can go down for whatever reason.
>
>
>
> Hence, we had configured MAX_BLOCK_MS_CONFIG=0 because we wanted to drop
> messages when brokers were NOT available.
>
>
>
> Now the issue is that we get data loss because METADATA is not available,
> and we get this exception: “*Topic  not present in metadata after 0 ms.*”
> This is due to the fact that the metadata has expired and the next request
> to send an event does not have metadata.
>
>
>
> Why does Kafka have this design?  Why can’t Kafka distinguish between
> Broker down vs metadata refresh not available?  Is it reasonable to expect
> the metadata to be refreshed BEFORE it expires, so that a refresh is never
> needed on the send path?  Is there any particular reason send() has to wait
> for a metadata refresh, rather than a background thread automatically
> refreshing metadata before it expires so that send() never incurs a wait?
>
>
> Let me know what suggestion you have to prevent the application thread
> from blocking (MAX_BLOCK_MS_CONFIG) when the Kafka brokers are DOWN vs
> metadata is NOT available due to expiration.
>
>
>
> Let me know your suggestions and what you think about metadata refresh.
> Should Kafka Producer be proactively refreshing metadata intelligently
> rather than what the producer does today?
>
>
>
>
>
> Thanks,
> Bhavesh
>


Metadata Refresh and TimeoutException when MAX_BLOCK_MS_CONFIG set 0

2022-09-19 Thread Bhavesh Mistry
Hello Kafka Team,



We have an environment where Kafka Broker can go down for whatever reason.



Hence, we had configured MAX_BLOCK_MS_CONFIG=0 because we wanted to drop
messages when brokers were NOT available.



Now the issue is that we get data loss because METADATA is not available, and
we get this exception: “*Topic  not present in metadata after 0 ms.*”  This
is due to the fact that the metadata has expired and the next request to send
an event does not have metadata.
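
For reference, a minimal sketch of the send path described above, assuming the
standard Java client (topic name, bootstrap server, and the drop handling are
placeholders).  Depending on the client version, the metadata TimeoutException
surfaces either synchronously from send() or through the callback/future, so
both paths are handled here:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class NonBlockingSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Never block the calling thread waiting for metadata or buffer space.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.send(new ProducerRecord<>("my-topic", "payload"), (metadata, exception) -> {
                    if (exception instanceof TimeoutException) {
                        // "Topic my-topic not present in metadata after 0 ms."
                        // The record is dropped here whether the broker is down or
                        // the cached metadata merely expired -- the two cases cannot
                        // be distinguished from this exception.
                    }
                });
            } catch (TimeoutException e) {
                // Same metadata timeout, thrown synchronously by some client versions.
            }
        }
    }
}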



Why does Kafka have this design?  Why can’t Kafka distinguish between Broker
down vs metadata refresh not available?  Is it reasonable to expect the
metadata to be refreshed BEFORE it expires, so that a refresh is never needed
on the send path?  Is there any particular reason send() has to wait for a
metadata refresh, rather than a background thread automatically refreshing
metadata before it expires so that send() never incurs a wait?


Let me know what suggestions you have for preventing the application thread
from blocking (MAX_BLOCK_MS_CONFIG) when the Kafka brokers are DOWN vs. when
metadata is NOT available due to expiration.



Let me know your suggestions and what you think about metadata refresh.
Should the Kafka producer proactively and intelligently refresh metadata,
rather than doing what the producer does today?





Thanks,
Bhavesh


Re: Authorization Engine For Kafka Related to KIP-11

2015-11-03 Thread Bhavesh Mistry
+ Kafka Dev team, to see if the Kafka dev team knows of or recommends any
auth engine for producers/consumers.

Thanks,

Bhavesh

Please pardon me, I accidentally sent a blank email previously.

On Tue, Nov 3, 2015 at 9:52 PM, Bhavesh Mistry
 wrote:
> On Sun, Nov 1, 2015 at 11:15 PM, Bhavesh Mistry
>  wrote:
>> HI All,
>>
>> Have any one used Apache Ranger as Authorization Engine for Kafka Topic
>> creation, consumption (read) and  write operation on a topic.  I am looking
>> at having audit log and regulating consumption/ write to particular topic
>> (for example, having production environment access does not mean that anyone
>> can run console consumer etc on particular topic. Basically, regulate who
>> can read/write to a topic as first use case).
>>
>> https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5+-+User+Guide#ApacheRanger0.5-UserGuide-KAFKA
>>
>> If you have used Apache Ranger in production, I have following question:
>> 1) Is there any performance impact with Brokers/Producer/Consumer while
>> using Apache Ranger ?
>> 2) Is Audit log really useful out-of-box ? or let me know what sort of
>> reports you run on audit logs (e.g pumping Apache Ranger audit log into any
>> other system for reporting purpose).
>>
>> Please share your experience using Kafka with any other Authorization engine
>> if you are not using Apache Ranger (This is based on
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface).
>>
>> Thanks and looking forward to hear back from Apache Kafka Community members.
>>
>> Thanks,
>>
>> Bhavesh


Re: Authorization Engine For Kafka Related to KIP-11

2015-11-03 Thread Bhavesh Mistry
On Sun, Nov 1, 2015 at 11:15 PM, Bhavesh Mistry
 wrote:
> HI All,
>
> Have any one used Apache Ranger as Authorization Engine for Kafka Topic
> creation, consumption (read) and  write operation on a topic.  I am looking
> at having audit log and regulating consumption/ write to particular topic
> (for example, having production environment access does not mean that anyone
> can run console consumer etc on particular topic. Basically, regulate who
> can read/write to a topic as first use case).
>
> https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5+-+User+Guide#ApacheRanger0.5-UserGuide-KAFKA
>
> If you have used Apache Ranger in production, I have following question:
> 1) Is there any performance impact with Brokers/Producer/Consumer while
> using Apache Ranger ?
> 2) Is Audit log really useful out-of-box ? or let me know what sort of
> reports you run on audit logs (e.g pumping Apache Ranger audit log into any
> other system for reporting purpose).
>
> Please share your experience using Kafka with any other Authorization engine
> if you are not using Apache Ranger (This is based on
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface).
>
> Thanks and looking forward to hear back from Apache Kafka Community members.
>
> Thanks,
>
> Bhavesh


Authorization Engine For Kafka Related to KIP-11

2015-11-01 Thread Bhavesh Mistry
HI All,

Has anyone used Apache Ranger as an authorization engine for Kafka topic
creation and for read (consume) and write operations on a topic?  I am
looking at having an audit log and at regulating consumption of / writes to
particular topics (for example, having production environment access should
not mean that anyone can run a console consumer etc. on a particular topic;
basically, regulating who can read/write to a topic is the first use case).

https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5+-+User+Guide#ApacheRanger0.5-UserGuide-KAFKA

If you have used Apache Ranger in production, I have the following questions:
1) Is there any performance impact on brokers/producers/consumers while
using Apache Ranger?
2) Is the audit log really useful out of the box?  Or let me know what sort
of reports you run on the audit logs (e.g. pumping the Apache Ranger audit
log into another system for reporting purposes).

Please share your experience using Kafka with any other authorization
engine if you are not using Apache Ranger (this is based on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface).


Thanks, and looking forward to hearing back from Apache Kafka community
members.

Thanks,

Bhavesh


Re: New Consumer API and Range Consumption with Fail-over

2015-08-05 Thread Bhavesh Mistry
Hi Jason,

Thanks for the info.  I will implement (by the end of next week) what you
have proposed.  If I encounter any issues, I will let you know.

Indeed, adding a new API would be an uphill battle.  I did follow the email
chain "Re: Kafka Consumer thoughts".

Thanks,

Bhavesh

On Wed, Aug 5, 2015 at 10:03 AM, Jason Gustafson  wrote:

> Hey Bhavesh,
>
> I think your use case can be handled with the new consumer API in roughly
> the manner I suggested previously. It might be a little easier if we added
> the ability to set the end offset for consumption. Perhaps something like
> this:
>
> // stop consumption from the partition when offset is reached
> void limit(TopicPartition partition, long offset)
>
> My guess is that we'd have a bit of an uphill battle to get this into the
> first release, but it may be possible if the use case is common enough. In
> any case, I think consuming to the limit offset and manually pausing the
> partition is a viable alternative.
>
> As for your question about fail-over, the new consumer provides a similar
> capability to the old high-level consumer. Here is a link to the wiki which
> describes its design:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> -Jason
>
> On Tue, Aug 4, 2015 at 12:01 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Jason and Kafka Dev Team,
> >
> >
> >
> > First of all thanks for responding and I think you got expected behavior
> > correctly.
> >
> >
> >
> > The use-case is offset range consumption.  We store each minute highest
> > offset for each topic per partition.  So if we need to reload or
> re-consume
> > data from yesterday per say 8AM to noon, we would have offset start
> mapping
> > at 8AM and end offset mapping at noon in Time Series Database.
> >
> >
> >
> > I was trying to load this use case with New Consumer API.   Do you or
> Kafka
> > Dev team agree with request to either have API that takes in topic and
> its
> > start/end offset for High Level Consumer group  (With older consumer API
> we
> > used Simple consumer before without fail-over).  Also, for each
> > range-consumption, there will be different group id  and group id will
> not
> > be reused.  The main purpose is to reload or process past data again (due
> > to production bugs or downtime etc occasionally and let main
> consumer-group
> > continue to consume latest records).
> >
> >
> > void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
> > endOffsetPartitions)
> >
> >
> >
> > or something similar which will allow following:
> >
> >
> >
> > 1)   When consumer group already exists (meaning have consumed data and
> > committed offset to storage system either Kafka or ZK) ignore start
> offset
> > positions and use committed offset.  If not committed use start Offset
> > Partition.
> >
> > 2)   When partition consumption has reached end Offset for given
> partition,
> > pause is fine or this assigned thread become fail over or wait for
> > reassignment.
> >
> > 3)   When all are Consumer Group is done consuming all partitions offset
> > ranges (start to end), gracefully shutdown entire consumer group.
> >
> > 4)   While consuming records, if one of node or consuming thread goes
> down
> > automatic fail-over to others (Similar to High Level Consumer for OLD
> > Consumer API.   I am not sure if there exists High level and/or Simple
> > Consumer concept for New API  )
> >
> >
> >
> > I hope above explanation clarifies use-case and intended behavior.
> Thanks
> > for clarifications, and you are correct we need pause(TopicPartition tp),
> > resume(TopicPartition tp), and/or API to set to end offset for each
> > partition.
> >
> >
> >
> > Please do let us know your preference to support above simple use-case.
> >
> >
> > Thanks,
> >
> >
> > Bhavesh
> >
> > On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson 
> > wrote:
> >
> > > Hi Bhavesh,
> > >
> > > I'm not totally sure I understand the expected behavior, but I think
> this
> > > can work. Instead of seeking to the start of the range before the poll
> > > loop, you should probably provide a ConsumerRebalanceCallback to get
> > > notifications when group assignment has changed (e.g. when one of your
> > > nodes dies). When a new partition is assigned, the callback will be
> > invoked
> > > by the consumer and you

Gobblin Data Ingestion Framework

2015-08-05 Thread Bhavesh Mistry
Camus will be phased out according to their GitHub update.  Has anyone
tried Gobblin to load data into Hadoop (1.x) from Kafka (0.8.x.x)?



Thanks,



Bhavesh


Re: New Consumer API and Range Consumption with Fail-over

2015-08-04 Thread Bhavesh Mistry
Hi Jason and Kafka Dev Team,



First of all, thanks for responding; I think you understood the expected
behavior correctly.



The use case is offset-range consumption.  Every minute we store the highest
offset for each topic partition.  So if we need to reload or re-consume data
from yesterday, say from 8 AM to noon, we have the start offset mapping at
8 AM and the end offset mapping at noon in a time-series database.



I was trying to implement this use case with the new consumer API.  Do you
or the Kafka dev team agree with the request to have an API that takes a
topic and its start/end offsets for a high-level consumer group?  (With the
older consumer API we used the Simple Consumer for this, without fail-over.)
Also, for each range consumption there will be a different group id, and the
group id will not be reused.  The main purpose is to reload or process past
data again (occasionally, due to production bugs or downtime, etc.) while
letting the main consumer group continue to consume the latest records.


void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
endOffsetPartitions)



or something similar which will allow following:



1)   When the consumer group already exists (meaning it has consumed data
and committed offsets to a storage system, either Kafka or ZK), ignore the
start offset positions and use the committed offsets.  If nothing has been
committed, use the partition's start offset.

2)   When consumption of a given partition has reached its end offset,
pausing is fine, or the assigned thread becomes a fail-over candidate or
waits for reassignment.

3)   When the consumer group is done consuming all partitions' offset
ranges (start to end), gracefully shut down the entire consumer group.

4)   While consuming records, if a node or consuming thread goes down,
automatically fail over to the others (similar to the High Level Consumer
in the OLD consumer API.  I am not sure whether the high-level and/or
simple consumer concepts exist for the new API.)



I hope the above explanation clarifies the use case and intended behavior.
Thanks for the clarifications; you are correct that we need
pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to set an
end offset for each partition.
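
For what it is worth, here is a rough sketch of how this range-consumption
pattern can be expressed with the consumer API as it was eventually released
(seek to the start offset, then pause a partition once its end offset is
reached).  The offset maps and processing are placeholders, and because it
uses manual assignment it does not cover the group-managed fail-over asked
for in point 4:

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RangeConsumer {

    /** Consume each partition from its committed (or start) offset up to its end offset, then stop. */
    public static void consumeRange(KafkaConsumer<String, String> consumer,
                                    Map<TopicPartition, Long> startOffsets,
                                    Map<TopicPartition, Long> endOffsets) {
        consumer.assign(startOffsets.keySet());

        // Resume from the committed position if one exists, otherwise from the range start.
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(startOffsets.keySet());
        for (TopicPartition tp : startOffsets.keySet()) {
            OffsetAndMetadata c = committed.get(tp);
            consumer.seek(tp, c != null ? c.offset() : startOffsets.get(tp));
        }

        Set<TopicPartition> active = new HashSet<>(startOffsets.keySet());
        while (!active.isEmpty()) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                long end = endOffsets.get(tp);
                if (record.offset() <= end) {
                    // process(record) ...
                    consumer.commitSync(Collections.singletonMap(
                            tp, new OffsetAndMetadata(record.offset() + 1)));
                }
                if (record.offset() >= end && active.remove(tp)) {
                    // Range finished: stop fetching from this partition but keep the assignment.
                    consumer.pause(Collections.singleton(tp));
                }
            }
        }
    }
}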



Please do let us know your preferred way to support the above simple use case.


Thanks,


Bhavesh

On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson  wrote:

> Hi Bhavesh,
>
> I'm not totally sure I understand the expected behavior, but I think this
> can work. Instead of seeking to the start of the range before the poll
> loop, you should probably provide a ConsumerRebalanceCallback to get
> notifications when group assignment has changed (e.g. when one of your
> nodes dies). When a new partition is assigned, the callback will be invoked
> by the consumer and you can use it to check if there's a committed position
> in the range or if you need to seek to the beginning of the range. For
> example:
>
> void onPartitionsAssigned(consumer, partitions) {
>   for (partition : partitions) {
>  try {
>offset = consumer.committed(partition)
>consumer.seek(partition, offset)
>  } catch (NoOffsetForPartition) {
>consumer.seek(partition, rangeStart)
>  }
>   }
> }
>
> If a failure occurs, then the partitions will be rebalanced across
> whichever consumers are still active. The case of the entire cluster being
> rebooted is not really different. When the consumers come back, they check
> the committed position and resume where they left off. Does that make
> sense?
>
> After you are finished consuming a partition's range, you can use
> KafkaConsumer.pause(partition) to prevent further fetches from being
> initiated while still maintaining the current assignment. The patch to add
> pause() is not in trunk yet, but it probably will be before too long.
>
> One potential problem is that you wouldn't be able to reuse the same group
> to consume a different range because of the way it depends on the committed
> offsets. Kafka's commit API actually allows some additional metadata to go
> along with a committed offset and that could potentially be used to tie the
> commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
> will be eventually, but I'm not sure whether that will be part of the
> initial release.
>
>
> Hope that helps!
>
> Jason
>
> On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hello Kafka Dev Team,
> >
> >
> > With new Consumer API redesign  (
> >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > ),  is there a capability to consume given the topic and partition
> start/
> > end position.  How would I achieve following use case of range
> consumption
> > with fail-over.
> >
> >
> > Use C

New Consumer API and Range Consumption with Fail-over

2015-07-30 Thread Bhavesh Mistry
Hello Kafka Dev Team,


With new Consumer API redesign  (
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
),  is there a capability to consume given the topic and partition  start/
end position.  How would I achieve following use case of range consumption
with fail-over.


Use Case:
The ability to reload data for a given topic and its partition offset
start/end range with a high-level consumer with fail-over; basically,
high-level range consumption that survives a consumer in the group dying,
while the main consumer group continues consuming the latest records.


Suppose you have a topic called “test-topic” and the begin and end offsets
for each of its partitions:

{
  topic: test-topic,
  [
    { partition id: 1, offset start: 100,     offset end: 500,000 },
    { partition id: 2, offset start: 200,000, offset end: 500,000 },
    ..... for n partitions
  ]
}

Then you create a consumer group “Range-Consumer” and use the seek method
for each partition.   Your feedback is greatly appreciated.


In each JVM, for each consumption thread:


Consumer consumer = new KafkaConsumer( { group.id = ”Range-consumer” } ... );

// partition -> end offset of the range to reload
Map<TopicPartition, Long> partitionToEndOffsetMapping = ...;

// position each partition at the start of its range
for (TopicPartition tp : topicPartitionList) {
    consumer.seek(tp, startOffsetFor(tp));   // placeholder for the range start offset lookup
}

while (true) {

    ConsumerRecords records = consumer.poll(1);

    // for each record, check its offset against the end of the range
    for (ConsumerRecord record : records) {

        TopicPartition tp = new TopicPartition(record.topic(), record.partition());

        if (record.offset() <= partitionToEndOffsetMapping.get(tp)) {
            // consume the record ...

            // commit the offset
            consumer.commit(CommitType.SYNC);

        } else {

            // Should I unsubscribe from this partition now?
            consumer.unsubscribe(tp);
        }
    }
}




Please let me know if the above approach is valid:

1) How will fail-over work?

2) How does rebooting the entire consumer group impact the offset seek,
since the offsets are stored by Kafka itself?

Thanks ,

Bhavesh


Mirror Maker Copy Data over Public Network

2015-07-02 Thread Bhavesh Mistry
Hi All,

Has anyone set up MirrorMaker to copy data to a target cluster via an SSH
tunnel or IPsec to encrypt the data in transit?  I cannot use
application-level encryption because the existing consumers expect
non-encrypted data (already running in prod).  Any feedback on SSH tunnel
vs. IPsec, or any issues you may have faced, would be appreciated.

Thanks in advance for the feedback!

Thanks,

Bhavesh


Re: querying messages based on timestamps

2015-06-30 Thread Bhavesh Mistry
We had a similar requirement to re-load data based on a timestamp range
(e.g. between 1 PM and 2 PM).

We store the relationship between timestamp and the largest offset in a
time-series database using jmxtrans (the LogEndOffset JMX bean vs. the
current time).  You can set the jmxtrans polling interval to 60 minutes and
store the LogEndOffset value for each partition.

Then use this info from the TSDB to fetch the offset range for any time
window (e.g. the last five minutes) for each partition.

Here is the JMX bean info:
Log.LogEndOffset
Log.LogStartOffset
Log.size
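
As a side note, newer Kafka consumers can look up an offset by timestamp
directly via offsetsForTimes(); that API did not exist at the time of this
thread.  A small sketch of that alternative:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetForTimestamp {

    /** Returns the earliest offset whose record timestamp is >= the given time, or null if none. */
    public static Long offsetAt(KafkaConsumer<?, ?> consumer, TopicPartition tp, long timestampMs) {
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMs));
        OffsetAndTimestamp found = result.get(tp);
        return found == null ? null : found.offset();
    }
}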


I hope this helps.

Thanks,

Bhavesh



On Tue, Jun 30, 2015 at 4:23 PM, Zaiming Shi  wrote:

> Hi Jiangjie !
> Does the word 'Currently' imply any plan in introducing timestamp in log
> entries?
>
> Regards
> /Zaiming
> On Jun 30, 2015 11:08 PM, "Jiangjie Qin" 
> wrote:
>
> > Currently Kafka only have a very coarse solution to find offset by time,
> > which is based on the segment last modified time.
> > This interface is only available in simple consumer. You may issue an
> > OffsetRequest to specify a timestamp. The offset returned will be the
> > first offset of segments whose last modification time is earlier than the
> > timestamp you provided.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On 6/30/15, 1:16 PM, "Adam Dubiel"  wrote:
> >
> > >We faced similar problem and ended up with implementing variant of
> golden
> > >section search, that reads message using simple consumer and checks the
> > >timestamp (timestamps are appended by our producer though, they do not
> > >come
> > >from any Kafka metadata) till it finds message closest to given date.
> > >
> > >Adam
> > >
> > >2015-06-30 21:52 GMT+02:00 Shushant Arora :
> > >
> > >> Is it possible using low level consumer to get kafka messages based on
> > >> timestamp, say I want to get all messages from last 5 minutes. I don't
> > >>know
> > >> what were offsets of partitions 5 minutes back.
> > >>
> > >> In low level consumer : when I gave epoch for  whichTime , it failed.
> > >>  requestInfo.put(topicAndPartition, new
> > >> PartitionOffsetRequestInfo(whichTime,
> > >> 1));
> > >>
> > >> Is only latest and earliest supported in timestamp,Is there any way to
> > >> filter messages based on timestamp?
> > >>
> > >> Thanks
> > >> Shushant
> > >>
> >
> >
>


Re: At-least-once guarantees with high-level consumer

2015-06-18 Thread Bhavesh Mistry
HI Carl,

Producer-side retries can result in duplicate messages being sent to the
brokers (the same message at different offsets).  You can also get duplicates
when the high-level consumer's offset has not been saved or committed but the
data has already been processed and the server restarts, etc.



To handle duplicates on top of at-least-once processing across partitions
(and across servers), you will need to store a message hash or primary key
in a distributed LRU cache (with an eviction policy), such as Hazelcast, and
de-duplicate across partitions.
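
A minimal sketch of that de-duplication step, assuming a recent Hazelcast
client (package names differ across Hazelcast versions; the map name and the
one-hour TTL window are placeholders):

import java.util.concurrent.TimeUnit;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

public class MessageDeduplicator {

    private final IMap<String, Boolean> seen;

    public MessageDeduplicator() {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        // Shared across all consumer instances; entries expire, so the map acts
        // as a bounded de-duplication window rather than growing forever.
        this.seen = hz.getMap("processed-message-keys");
    }

    /**
     * Returns true the first time a key (message hash or primary key) is seen
     * within the TTL window; duplicates from retries or re-delivery return false.
     */
    public boolean firstTimeSeen(String messageKey) {
        return seen.putIfAbsent(messageKey, Boolean.TRUE, 1, TimeUnit.HOURS) == null;
    }
}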



I hope this help !



Thanks,

Bhavesh


On Wed, Jun 17, 2015 at 1:49 AM, yewton  wrote:

> So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?
>
>  2015-06-17 08:12:50 +, message above from Stevo Slavić:
>
>>
>> With auto-commit one can only have at-most-once delivery guarantee - after
>>
>> commit but before message is delivered for processing, or even after it is
>>
>> delivered but before it is processed, things can fail, causing event not
>> to
>>
>> be processed, which is basically same outcome as if it was not delivered.
>>
>>
>>
>> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann 
>> wrote:
>>
>>
>>
>> > Hi
>>
>> >
>>
>> > ** Disclaimer: I know there's a new consumer API on the way, this mail
>> is
>>
>> > about the currently available API. I also apologise if the below has
>>
>> > already been discussed previously. I did try to check previous
>> discussions
>>
>> > on ConsumerIterator **
>>
>> >
>>
>> > It seems to me that the high-level consumer would be able to support
>>
>> > at-least-once messaging, even if one uses auto-commit, by changing
>>
>> > kafka.consumer.ConsumerIterator.next() to call
>>
>> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
>> way, a
>>
>> > consumer thread for a KafkaStream could just loop:
>>
>> >
>>
>> > while (true) {
>>
>> > MyMessage message = iterator.next().message();
>>
>> > process(message);
>>
>> > }
>>
>> >
>>
>> > Each call to "iterator.next()" then updates the offset to commit to the
>> end
>>
>> > of the message that was just processed. When offsets are committed for
>> the
>>
>> > ConsumerConnector (either automatically or manually), the commit will
>> not
>>
>> > include offsets of messages that haven't been fully processed.
>>
>> >
>>
>> > I've tested the following ConsumerIterator.next(), and it seems to work
>> as
>>
>> > I expect:
>>
>> >
>>
>> >   override def next(): MessageAndMetadata[K, V] = {
>>
>> > // New code: reset consumer offset to the end of the previously
>>
>> > consumed message:
>>
>> > if (consumedOffset > -1L && currentTopicInfo != null) {
>>
>> > currentTopicInfo.resetConsumeOffset(consumedOffset)
>>
>> > val topic = currentTopicInfo.topic
>>
>> > trace("Setting %s consumed offset to %d".format(topic,
>>
>> > consumedOffset))
>>
>> > }
>>
>> >
>>
>> > // Old code, excluding reset:
>>
>> > val item = super.next()
>>
>> > if(consumedOffset < 0)
>>
>> >   throw new KafkaException("Offset returned by the message set is
>>
>> > invalid %d".format(consumedOffset))
>>
>> > val topic = currentTopicInfo.topic
>>
>> > consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
>>
>> > consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
>>
>> > item
>>
>> >   }
>>
>> >
>>
>> > I've seen several people asking about managing commit offsets manually
>> with
>>
>> > the high level consumer. I suspect that this approach (the modified
>>
>> > ConsumerIterator) would scale better than having a separate
>>
>> > ConsumerConnecter per stream just so that you can commit offsets with
>>
>> > at-least-once semantics. The downside of this approach is more duplicate
>>
>> > deliveries after recovery from hard failure (but this is "at least
>> once",
>>
>> > right, not "exactly once").
>>
>> >
>>
>> > I don't propose that the code necessarily be changed like this in
>> trunk, I
>>
>> > just want to know if the approach seems reasonable.
>>
>> >
>>
>> > Regards
>>
>> > Carl Heymann
>>
>> >
>>
>>
>>
>>
>>
>>
>>
>
>
>


Re: How Producer handles Network Connectivity Issues

2015-05-29 Thread Bhavesh Mistry
Hi Kamal,

In order to monitor each producer instance, you will need an alternative
network monitoring channel (e.g. Flume, or another Kafka cluster used only
for monitoring producers at large scale).

Here is the detail:

1) Add a custom Log4j appender and intercept all logs from the Kafka
producer's Java package (a sketch follows below).
2) Capture WARN and ERROR logs and write them to disk.
3) Have a Flume agent, or any program that can ship logs from disk to a
remote or central location (basically, use an alternative system to
transport and ingest the logs into any monitoring system or Elasticsearch).

This assumes that you have network and/or physical-layer redundancy for
this critical monitoring path.  I hope this helps!
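
A minimal sketch of step 1, assuming Log4j 1.x (the class name, logger prefix,
and output handling are placeholders):

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;

/**
 * Hypothetical appender attached to the Kafka producer's logger via
 * log4j.properties, so that producer warnings and errors reach a side channel
 * even when the Kafka cluster itself is unreachable.
 */
public class ProducerHealthAppender extends AppenderSkeleton {

    @Override
    protected void append(LoggingEvent event) {
        // Only forward WARN and above coming from the producer client code.
        if (event.getLevel().isGreaterOrEqual(Level.WARN)
                && event.getLoggerName().startsWith("org.apache.kafka.clients")) {
            writeToLocalMonitoringFile(event);
        }
    }

    private void writeToLocalMonitoringFile(LoggingEvent event) {
        // Placeholder: append the rendered message to a local file that a
        // Flume agent (or similar shipper) tails and forwards.
    }

    @Override
    public void close() { }

    @Override
    public boolean requiresLayout() {
        return false;
    }
}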


Thanks,
Bhavesh

On Wed, May 27, 2015 at 10:37 AM, Kamal C  wrote:

> Thanks for the response Ewen!
>
> On Tue, May 26, 2015 at 10:52 PM, Ewen Cheslack-Postava  >
> wrote:
>
> > It's not being switched in this case because the broker hasn't failed. It
> > can still connect to all the other brokers and zookeeper. The only
> failure
> > is of the link between a client and the broker.
> >
> > Another way to think of this is to extend the scenario with more
> producers.
> > If I have 100 other producers and they can all still connect, would you
> > still consider this a failure and expect the leader to change? Since
> > network partitions (or periods of high latency, or long GC pauses, etc)
> can
> > happen arbitrarily and clients might be spread far and wide, you can't
> rely
> > on their connectivity as an indicator of the health of the Kafka broker.
> >
> > Of course, there's also a pretty big practical issue: since the client
> > can't connect to the broker, how would it even report that it has a
> > connectivity issue?
> >
> > -Ewen
> >
> > On Mon, May 25, 2015 at 10:05 PM, Kamal C  wrote:
> >
> > > Hi,
> > >
> > > I have a cluster of 3 Kafka brokers and a remote producer. Producer
> > > started to send messages to *SampleTopic*. Then I blocked the network
> > > connectivity between the Producer and the leader node for the topic
> > > *SampleTopic* but network connectivity is healthy between the cluster
> and
> > > producer is able to reach the other two nodes.
> > >
> > > *With Script*
> > >
> > > sh kafka-topics.sh --zookeeper localhost --describe
> > > Topic:SampleTopicPartitionCount:1ReplicationFactor:3
> Configs:
> > > Topic: SampleTopicPartition: 0Leader: 1Replicas: 1,2,0
> > > Isr: 1,2,0
> > >
> > >
> > > Producer tries forever to reach the leader node by throwing connection
> > > refused exception. I understand that when there is a node failure
> leader
> > > gets switched. Why it's not switching the leader in this scenario ?
> > >
> > > --
> > > Kamal C
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>


Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?

2015-05-10 Thread Bhavesh Mistry
I have used what Gwen suggested, but with a check to avoid false positives:

While consuming records, keep track of the *last* consumed offset, and when
you get a "TimeoutException" for a particular partition, compare that offset
with the latest offset on the broker for the consumed topic (e.g. via the
*LogEndOffset* JMX bean for that topic and partition).

This works well.  In our use case, we were using the High Level Consumer for
only a *single* topic.
I hope this helps !


Thanks,

Bhavesh

On Sun, May 10, 2015 at 2:03 PM, Ewen Cheslack-Postava 
wrote:

> @Gwen- But that only works for topics that have low enough traffic that you
> would ever actually hit that timeout.
>
> The Confluent schema registry needs to do something similar to make sure it
> has fully consumed the topic it stores data in so it doesn't serve stale
> data. We know in our case we'll only have a single producer to the topic
> (the current leader of the schema registry cluster) so we have a different
> solution. We produce a message to the topic (which is 1 partition, but this
> works for a topic partition too), grab the resulting offset from the
> response, then consume until we see the message we produced. Obviously this
> isn't ideal since we a) have to produce extra bogus messages to the topic
> and b) it only works in the case where you know the consumer is also the
> only producer.
>
> The new consumer interface sort of addresses this since it has seek
> functionality, where one of the options is seekToEnd. However, I think you
> have to be very careful with this, especially using the current
> implementation. It seeks to the end, but it also marks those messages as
> consumed. This means that even if you keep track of your original position
> and seek back to it, if you use background offset commits you could end up
> committing incorrect offsets, crashing, and then missing some messages when
> another consumer claims that partition (or just due to another consumer
> joining the group).
>
> Not sure if there are many other use cases for grabbing the offset data
> with a simple API. Might mean there's a use case for either some additional
> API or some utilities independent of an actual consumer instance which
> allow you to easily query the state of topics/partitions.
>
>
> On Sun, May 10, 2015 at 12:43 AM, Gwen Shapira 
> wrote:
>
> > For Flume, we use the timeout configuration and catch the exception, with
> > the assumption that "no messages for few seconds" == "the end".
> >
> > On Sat, May 9, 2015 at 2:04 AM, James Cheng  wrote:
> >
> > > Hi,
> > >
> > > I want to use the high level consumer to read all partitions for a
> topic,
> > > and know when I have reached "the end". I know "the end" might be a
> > little
> > > vague, since items keep showing up, but I'm trying to get as close as
> > > possible. I know that more messages might show up later, but I want to
> > know
> > > when I've received all the items that are currently available in the
> > topic.
> > >
> > > Is there a standard/recommended way to do this?
> > >
> > > I know one way to do it is to first issue an OffsetRequest for each
> > > partition, which would get me the last offset, and then use that
> > > information in my high level consumer to detect when I've reached that
> a
> > > message with that offset. Which is exactly what the SimpleConsumer
> > example
> > > does (
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > ).
> > > That involves finding the leader for the partition, etc etc. Not hard,
> > but
> > > a bunch of steps.
> > >
> > > I noticed that kafkacat has an option similar to what I'm looking for:
> > >   -e Exit successfully when last message received
> > >
> > > Looking at the code, it appears that a FetchRequest returns the
> > > HighwaterMarkOffset mark for a partition, and the API docs confirm
> that:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
> > >
> > > Does the Java high-level consumer expose the HighwaterMarkOffset in any
> > > way? I looked but I couldn't find such a thing.
> > >
> > > Thanks,
> > > -James
> > >
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: Kafka Monitoring using JMX

2015-04-20 Thread Bhavesh Mistry
You can use this
https://github.com/Stackdriver/jmxtrans-config-stackdriver/blob/master/jmxtrans/stackdriver/json-specify-instance/kafka.json
as an example of how the MBeans are named and how " is escaped with \", and
just use a different output writer for anything prior to version 0.8.2.1.
After 0.8.2.1 you will have to remove the quotes, and there are some changes
to the type and name attributes.  A Google search will also turn up jmxtrans
tools that auto-generate a jmxtrans configuration from the running brokers.
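
For anyone querying these beans directly from Java rather than through
jmxtrans, here is a small sketch of reading the pre-0.8.2 quoted MBean name
from the question (the JMX URL and the "Count" attribute are assumptions used
to illustrate the escaping):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaJmxReader {
    public static void main(String[] args) throws Exception {
        // Placeholder JMX endpoint of a broker started with JMX_PORT=9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();

            // Pre-0.8.2-style name: the embedded double quotes are part of the
            // MBean name and must be escaped with \" in Java source.
            ObjectName bean = new ObjectName(
                    "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesInPerSec\"");

            Object count = conn.getAttribute(bean, "Count");
            System.out.println("AllTopicsBytesInPerSec Count = " + count);
        } finally {
            connector.close();
        }
    }
}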

Thanks,

Bhavesh

On Mon, Apr 20, 2015 at 1:57 PM, Daniel Compton <
daniel.compton.li...@gmail.com> wrote:

> Hi Naidu
>
> You'll need to escape the " with a \ in the MBean names. I've run across
> this too and it was a pain. It can get a bit tricky if you're doing it in
> code because you need to account for double escapes and so forth. This is a
> bug in the version of Metrics that Kafka is using. There is a JIRA ticket
> to upgrade it somewhere, but I can't find it right now.
>
> Daniel.
> On Tue, 21 Apr 2015 at 7:02 am Saladi Naidu  >
> wrote:
>
> > We are planning to monitor health of our Kafka environment using JMX. I
> > have looked at below links to find what is available via
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Available+Metrics
> > https://cwiki.apache.org/confluence/display/KAFKA/JMX+Reporters
> >
> > why some of the kafka objects ("kafka.cluster", "kafka.consumer" etc are
> > written with an additional " ?
> >
> > E.g
> "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" Vs
> > the normal Mbeans are just something like this:
> >
> > java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
> >
> > We are not successful in calling objects that have "" in their
> > names.  Naidu Saladi
> >
>


Producer Behavior When one or more Brokers' Disk is Full.

2015-03-25 Thread Bhavesh Mistry
Hello Kafka Community,



What is the expected behavior on the producer side when the disk of one or
more brokers is full, but the topics have not yet reached their retention
limits (by size or by time)?



Does the producer keep sending data to those particular brokers, or does the
producer queue fill up and always throw "Queue Full", or does it depend on
configuration?  (I have the producer set to non-blocking when the queue is
full, with acks of 0 or 1 and retries set to 3.)



What is the expected behavior of the OLD [Scala-based] producer vs. the pure
Java-based producer?


Here is a reference to a past discussion:
http://grokbase.com/t/kafka/users/147h4958k8/how-to-recover-from-a-disk-full-situation-in-kafka-cluster


Is there a wiki or cookbook with steps to recover from such a situation?



Thanks,

Bhavesh


New Offset Management API Question

2015-03-25 Thread Bhavesh Mistry
Hi Joel Koshy,

I have the following questions:

1) Does the Offset Commit/Fetch API work with the Simple Consumer?

Use Case:  Camus dual-commits offsets, to HDFS as well as to the Kafka
offsets topic.  The offset would only be committed once the FILE is
committed to HDFS (not like auto-commit).

2) With MirrorMaker, can you selectively mirror the offsets topic?

Use Case:
Suppose you want to build an active consumer group in DC1 and a passive
consumer group (not yet started) in DC2.  Can you mirror just the offsets
topic, so that when the DC1 consumer group goes down, DC2 (with manual or
automated custom logic) starts the same consumer group from the last
committed offset?  Is this possible?


Thanks,
Bhavesh


Re: integrate Camus and Hive?

2015-03-11 Thread Bhavesh Mistry
Hi Andrew,

I would say Camus is generic enough (but you can propose this to the Camus
team).

Here is sample code showing the methods you can use to create any path or
directory structure, and then create a corresponding Hive table schema for
it:
public class UTCLogPartitioner extends Partitioner {

    @Override
    public String encodePartition(JobContext context, IEtlKey key) {
        // Bucket the record by its own timestamp (key.getTime()), using the
        // configured output-file time partition size (minutes -> milliseconds).
        long outfilePartitionMs =
                EtlMultiOutputFormat.getEtlOutputFileTimePartitionMins(context) * 60000L;
        return "" + DateUtils.getPartition(outfilePartitionMs, key.getTime());
    }

    @Override
    public String generatePartitionedPath(JobContext context, String topic,
            String brokerId, int partitionId, String encodedPartition) {
        StringBuilder sb = new StringBuilder();
        sb.append("Create your HDFS custom path here");
        return sb.toString();
    }

}


Thanks,
Bhavesh

On Wed, Mar 11, 2015 at 10:42 AM, Andrew Otto  wrote:

> Thanks,
>
> Do you have this partitioner implemented?  Perhaps it would be good to try
> to get this into Camus as a build in option.  HivePartitioner? :)
>
> -Ao
>
>
> > On Mar 11, 2015, at 13:11, Bhavesh Mistry 
> wrote:
> >
> > Hi Ad
> >
> > You have to implement custom partitioner and also you will have to create
> > what ever path (based on message eg log line timestamp, or however you
> > choose to create directory hierarchy from your each message).
> >
> > You will need to implement your own Partitioner class implementation:
> >
> https://github.com/linkedin/camus/blob/master/camus-api/src/main/java/com/linkedin/camus/etl/Partitioner.java
> > and use configuration "etl.partitioner.class=CLASSNAME"  then you can
> > organize any way you like.
> >
> > I hope this helps.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Mar 11, 2015 at 8:36 AM, Andrew Otto 
> wrote:
> >
> >>> e.g File produce by the camus job:  /user/[hive.user]/output/
> >>>
> >>
> *partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/*
> >>
> >> Bhavesh, how do you get Camus to write into a directory hierarchy like
> >> this?  Is it reading the partition values from your messages'
> timestamps?
> >>
> >>
> >>> On Mar 11, 2015, at 11:29, Bhavesh Mistry 
> >> wrote:
> >>>
> >>> HI Yang,
> >>>
> >>> We do this today camus to hive (without the Avro) just plain old tab
> >>> separated log line.
> >>>
> >>> We use the hive -f command to add dynamic partition to hive table:
> >>>
> >>> Bash Shell Scripts add time buckets into HIVE table before camus job
> >> runs:
> >>>
> >>> for partition in "${@//\//,}"; do
> >>>  echo "ALTER TABLE ${env:TABLE_NAME} ADD IF NOT EXISTS PARTITION
> >>> ($partition);"
> >>> done | hive -f
> >>>
> >>>
> >>> e.g File produce by the camus job:  /user/[hive.user]/output/
> >>>
> >>
> *partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/*
> >>>
> >>> Above will add hive dynamic partition before camus job runs.  It works,
> >> and
> >>> you can have any schema:
> >>>
> >>> CREATE EXTERNAL TABLE IF NOT EXISTS ${env:TABLE_NAME} (
> >>> SOME Table FIELDS...
> >>> )
> >>> PARTITIONED BY (
> >>>   partition_month_utc STRING,
> >>>   partition_day_utc STRING,
> >>>   partition_minute_bucket STRING
> >>> )
> >>> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
> >>> STORED AS SEQUENCEFILE
> >>> LOCATION '${env:TABLE_LOCATION_CAMUS_OUTPUT}'
> >>> ;
> >>>
> >>>
> >>> I hope this will help !   You will have to construct  hive query
> >> according
> >>> to partition define.
> >>>
> >>> Thanks,
> >>>
> >>> Bhavesh
> >>>
> >>> On Wed, Mar 11, 2015 at 7:24 AM, Andrew Otto 
> >> wrote:
> >>>
> >>>>> Hive provides the ability to provide custom patterns for partitions.
> >> You
> >>>>> can use this in combination with MSCK REPAIR TABLE to automatically
> >>>> detect
> >>>>> and load the partitions into the metastore.
> >>>>
> >>>> I tried this yesterday, and as far a

Re: integrate Camus and Hive?

2015-03-11 Thread Bhavesh Mistry
Hi Ad,

You have to implement a custom partitioner, and you will also have to
create whatever path you want (based on the message, e.g. the log line's
timestamp, or however you choose to build the directory hierarchy from each
message).

You will need to implement your own Partitioner class:
https://github.com/linkedin/camus/blob/master/camus-api/src/main/java/com/linkedin/camus/etl/Partitioner.java
and set the configuration "etl.partitioner.class=CLASSNAME"; then you can
organize the output any way you like.

I hope this helps.


Thanks,

Bhavesh


On Wed, Mar 11, 2015 at 8:36 AM, Andrew Otto  wrote:

> > e.g File produce by the camus job:  /user/[hive.user]/output/
> >
> *partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/*
>
> Bhavesh, how do you get Camus to write into a directory hierarchy like
> this?  Is it reading the partition values from your messages' timestamps?
>
>
> > On Mar 11, 2015, at 11:29, Bhavesh Mistry 
> wrote:
> >
> > HI Yang,
> >
> > We do this today camus to hive (without the Avro) just plain old tab
> > separated log line.
> >
> > We use the hive -f command to add dynamic partition to hive table:
> >
> > Bash Shell Scripts add time buckets into HIVE table before camus job
> runs:
> >
> > for partition in "${@//\//,}"; do
> >   echo "ALTER TABLE ${env:TABLE_NAME} ADD IF NOT EXISTS PARTITION
> > ($partition);"
> > done | hive -f
> >
> >
> > e.g File produce by the camus job:  /user/[hive.user]/output/
> >
> *partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/*
> >
> > Above will add hive dynamic partition before camus job runs.  It works,
> and
> > you can have any schema:
> >
> > CREATE EXTERNAL TABLE IF NOT EXISTS ${env:TABLE_NAME} (
> >  SOME Table FIELDS...
> >  )
> >  PARTITIONED BY (
> >partition_month_utc STRING,
> >partition_day_utc STRING,
> >partition_minute_bucket STRING
> >  )
> >  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
> >  STORED AS SEQUENCEFILE
> >  LOCATION '${env:TABLE_LOCATION_CAMUS_OUTPUT}'
> > ;
> >
> >
> > I hope this will help !   You will have to construct  hive query
> according
> > to partition define.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Wed, Mar 11, 2015 at 7:24 AM, Andrew Otto 
> wrote:
> >
> >>> Hive provides the ability to provide custom patterns for partitions.
> You
> >>> can use this in combination with MSCK REPAIR TABLE to automatically
> >> detect
> >>> and load the partitions into the metastore.
> >>
> >> I tried this yesterday, and as far as I can tell it doesn’t work with a
> >> custom partition layout.  At least not with external tables.  MSCK
> REPAIR
> >> TABLE reports that there are directories in the table’s location that
> are
> >> not partitions of the table, but it wouldn’t actually add the partition
> >> unless the directory layout matched Hive’s default
> >> (key1=value1/key2=value2, etc.)
> >>
> >>
> >>
> >>> On Mar 9, 2015, at 17:16, Pradeep Gollakota 
> >> wrote:
> >>>
> >>> If I understood your question correctly, you want to be able to read
> the
> >>> output of Camus in Hive and be able to know partition values. If my
> >>> understanding is right, you can do so by using the following.
> >>>
> >>> Hive provides the ability to provide custom patterns for partitions.
> You
> >>> can use this in combination with MSCK REPAIR TABLE to automatically
> >> detect
> >>> and load the partitions into the metastore.
> >>>
> >>> Take a look at this SO
> >>>
> >>
> http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern
> >>>
> >>> Does that help?
> >>>
> >>>
> >>> On Mon, Mar 9, 2015 at 1:42 PM, Yang  wrote:
> >>>
> >>>> I believe many users like us would export the output from camus as a
> >> hive
> >>>> external table. but the dir structure of camus is like
> >>>> //MM/DD/xx
> >>>>
> >>>> while hive generally expects /year=/month=MM/day=DD/xx if you
> >>>> define that table to be
> >>>> partitioned by (year, month, day). otherwise you'd have to add those
> >>>> partitions created by camus through a separate command. but in the
> >> latter
> >>>> case, would a camus job create >1 partitions ? how would we find out
> the
> >>>> /MM/DD values from outside ?  well you could always do
> >> something by
> >>>> hadoop dfs -ls and then grep the output, but it's kind of not
> clean
> >>>>
> >>>>
> >>>> thanks
> >>>> yang
> >>>>
> >>
> >>
>
>


Re: integrate Camus and Hive?

2015-03-11 Thread Bhavesh Mistry
Hi Yang,

We do this today, Camus to Hive (without Avro), with just plain old
tab-separated log lines.

We use the hive -f command to add dynamic partitions to the Hive table.

A bash shell script adds the time buckets to the HIVE table before the Camus
job runs:

for partition in "${@//\//,}"; do
   echo "ALTER TABLE ${env:TABLE_NAME} ADD IF NOT EXISTS PARTITION
($partition);"
done | hive -f


e.g. a file produced by the Camus job:  /user/[hive.user]/output/
*partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/*

The above adds the Hive dynamic partitions before the Camus job runs.  It
works, and you can have any schema:

CREATE EXTERNAL TABLE IF NOT EXISTS ${env:TABLE_NAME} (
  SOME Table FIELDS...
  )
  PARTITIONED BY (
partition_month_utc STRING,
partition_day_utc STRING,
partition_minute_bucket STRING
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  STORED AS SEQUENCEFILE
  LOCATION '${env:TABLE_LOCATION_CAMUS_OUTPUT}'
;


I hope this helps!  You will have to construct the Hive query according to
the partitions you define.

Thanks,

Bhavesh

On Wed, Mar 11, 2015 at 7:24 AM, Andrew Otto  wrote:

> > Hive provides the ability to provide custom patterns for partitions. You
> > can use this in combination with MSCK REPAIR TABLE to automatically
> detect
> > and load the partitions into the metastore.
>
> I tried this yesterday, and as far as I can tell it doesn’t work with a
> custom partition layout.  At least not with external tables.  MSCK REPAIR
> TABLE reports that there are directories in the table’s location that are
> not partitions of the table, but it wouldn’t actually add the partition
> unless the directory layout matched Hive’s default
> (key1=value1/key2=value2, etc.)
>
>
>
> > On Mar 9, 2015, at 17:16, Pradeep Gollakota 
> wrote:
> >
> > If I understood your question correctly, you want to be able to read the
> > output of Camus in Hive and be able to know partition values. If my
> > understanding is right, you can do so by using the following.
> >
> > Hive provides the ability to provide custom patterns for partitions. You
> > can use this in combination with MSCK REPAIR TABLE to automatically
> detect
> > and load the partitions into the metastore.
> >
> > Take a look at this SO
> >
> http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern
> >
> > Does that help?
> >
> >
> > On Mon, Mar 9, 2015 at 1:42 PM, Yang  wrote:
> >
> >> I believe many users like us would export the output from camus as a
> hive
> >> external table. but the dir structure of camus is like
> >> //MM/DD/xx
> >>
> >> while hive generally expects /year=/month=MM/day=DD/xx if you
> >> define that table to be
> >> partitioned by (year, month, day). otherwise you'd have to add those
> >> partitions created by camus through a separate command. but in the
> latter
> >> case, would a camus job create >1 partitions ? how would we find out the
> >> /MM/DD values from outside ?  well you could always do
> something by
> >> hadoop dfs -ls and then grep the output, but it's kind of not clean
> >>
> >>
> >> thanks
> >> yang
> >>
>
>


Re: Camus Issue about Output File EOF Issue

2015-03-04 Thread Bhavesh Mistry
Hi Gwen,

The root cause of all the I/O-related problems seems to be the file rename
that Camus does and the underlying MapR file system.

We are copying files from a user volume to a day volume (the rename does a
copy) when the mapper commits a file to the FS.  Please refer to
http://answers.mapr.com/questions/162562/volume-issues-end-of-file-javaioioexception-file-i.html
for more info.

We will have to patch Camus to copy to a tmp directory and then move to the
final destination, as a workaround for now, to make the file rename more
reliable.
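
Roughly, the workaround I have in mind looks like the sketch below.  It is only
a minimal illustration using the standard Hadoop FileSystem API (the paths are
made up), not the actual Camus patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TmpThenRenameCommit {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Hypothetical paths: write into a tmp dir on the destination (day) volume
        // first, so the final rename is a cheap same-volume move instead of a
        // cross-volume copy.
        Path tmpFile = new Path("/day-volume/.camus-tmp/part-0001.tmp");
        Path finalFile = new Path("/day-volume/2015/03/04/part-0001");

        FSDataOutputStream out = fs.create(tmpFile, true);
        try {
            out.write("record data".getBytes("UTF-8"));
        } finally {
            out.close(); // make sure the tmp file is fully closed before the move
        }

        fs.mkdirs(finalFile.getParent());
        if (!fs.rename(tmpFile, finalFile)) {
            throw new java.io.IOException("Could not move " + tmpFile + " to " + finalFile);
        }
    }
}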


Thanks,

Bhavesh

On Monday, March 2, 2015, Bhavesh Mistry  wrote:

>
> I suspect Camus job has issue because other process  ( another separate
> Map/Reduce Job) also write to same "time" (folders) bucket and it does not
> have this issue at all (so far) when reading from other dependent Hive
> job.  This dependent Hive job only have issue with files created via camus
> job ( not always but intermittently and hive job fails with read error
> about EOF and work around for now is to remove these unclosed file from the
> folder and hive job succeeds  ).
>
>
>
> Thanks,
>
> Bhavesh
>
> On Mon, Mar 2, 2015 at 5:27 PM, Gwen Shapira  > wrote:
>
>> Actually, the error you sent shows that its trying to read a TEXT file
>> as if it was Seq. Thats why I suspected a misconfiguration of some
>> sort.
>>
>> Why do you suspect a race condition?
>>
>> On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry
>> > > wrote:
>> > Hi Gwen,
>> >
>> > We are using MapR (Sorry no Cloudera) distribution.
>> >
>> >
>> > I am suspecting it is code issue.  I am in-processor review the code
>> about
>> > MultiOutputFormat class.
>> >
>> >
>> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
>> >
>> > I am suspecting that due to some concurrency, it is replacing older
>> writer
>> > with new one (old writer does not close).   The file it crates is
>> usually
>> > small,and has very small content for problematic files (EOF file).
>> >
>> >
>> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
>> >
>> >
>> > Based on above code, Do you think there is likelihood that output file
>> may
>> > be unclosed file ?  Also, my plan is to add isClose() api to each
>> writer,
>> > and if you have time, you can quickly review them (suggest or your
>> > feedback) about unclosed files.  By the way, we are on Hadoop 1.0.3 API
>> (
>> > so I was thinking about
>> >
>> http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
>> > and make sure within the close we close all the File Writers.. let me
>> know
>> > if this is good or not do final clean-up).
>> >
>> >
>> > public interface RecordWriterWithCloseStatus extends
>> RecordWriter> > V>{
>> >
>> > /**
>> >
>> >  * Give Ability to check if close has been called on the writer or File
>> has
>> > been closed on not..
>> >
>> >  * @return
>> >
>> >  */
>> >
>> > public boolean isClose();
>> >
>> > }
>> >
>> > And each of the writer will have ability check for clean at all the
>> time:
>> >
>> > eg:
>> >
>> > {code}
>> >
>> >   return new RecordWriterWithStatus() {
>> >
>> > private volatile boolean close;
>> >
>> >
>> > @Override
>> >
>> > public void write(IEtlKey key, CamusWrapper data) throws
>> > IOException, InterruptedException {
>> >
>> >
>> >
>> >
>> > /**
>> >
>> >  * What if file is closed ?  Should we create a new one
>> here..?
>> >
>> >  */
>> >
>> >
>> >
>> > // Use the timestamp from the EtlKey as the key for this
>> > record.
>> >
>> > // TODO: Is there a better key to use here?
>> >
>> > writer.append(new LongWritable(key.getTime()), new Text(
>> > record));
>> >
>>

Re: Camus Issue about Output File EOF Issue

2015-03-02 Thread Bhavesh Mistry
I suspect the Camus job has an issue because another process (a separate
Map/Reduce job) also writes to the same "time" (folder) bucket and does not
have this issue at all (so far) when read by the dependent Hive job.  The
dependent Hive job only has issues with files created via the Camus job (not
always, but intermittently; the Hive job fails with a read error about EOF,
and the workaround for now is to remove the unclosed files from the folder,
after which the Hive job succeeds).



Thanks,

Bhavesh

On Mon, Mar 2, 2015 at 5:27 PM, Gwen Shapira  wrote:

> Actually, the error you sent shows that its trying to read a TEXT file
> as if it was Seq. Thats why I suspected a misconfiguration of some
> sort.
>
> Why do you suspect a race condition?
>
> On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry
>  wrote:
> > Hi Gwen,
> >
> > We are using MapR (Sorry no Cloudera) distribution.
> >
> >
> > I am suspecting it is code issue.  I am in-processor review the code
> about
> > MultiOutputFormat class.
> >
> >
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
> >
> > I am suspecting that due to some concurrency, it is replacing older
> writer
> > with new one (old writer does not close).   The file it crates is usually
> > small,and has very small content for problematic files (EOF file).
> >
> >
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
> >
> >
> > Based on above code, Do you think there is likelihood that output file
> may
> > be unclosed file ?  Also, my plan is to add isClose() api to each writer,
> > and if you have time, you can quickly review them (suggest or your
> > feedback) about unclosed files.  By the way, we are on Hadoop 1.0.3 API (
> > so I was thinking about
> >
> http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
> > and make sure within the close we close all the File Writers.. let me
> know
> > if this is good or not do final clean-up).
> >
> >
> > public interface RecordWriterWithCloseStatus extends
> RecordWriter > V>{
> >
> > /**
> >
> >  * Give Ability to check if close has been called on the writer or File
> has
> > been closed on not..
> >
> >  * @return
> >
> >  */
> >
> > public boolean isClose();
> >
> > }
> >
> > And each of the writer will have ability check for clean at all the time:
> >
> > eg:
> >
> > {code}
> >
> >   return new RecordWriterWithStatus() {
> >
> > private volatile boolean close;
> >
> >
> > @Override
> >
> > public void write(IEtlKey key, CamusWrapper data) throws
> > IOException, InterruptedException {
> >
> >
> >
> >
> > /**
> >
> >  * What if file is closed ?  Should we create a new one
> here..?
> >
> >  */
> >
> >
> >
> > // Use the timestamp from the EtlKey as the key for this
> > record.
> >
> > // TODO: Is there a better key to use here?
> >
> > writer.append(new LongWritable(key.getTime()), new Text(
> > record));
> >
> > }
> >
> >
> > @Override
> >
> > public void close(TaskAttemptContext context) throws
> > IOException, InterruptedException {
> >
> > writer.close();
> >
> > close = true;
> >
> > }
> >
> >
> >
> > protected void finalize() throws Throwable {
> >
> > if(this.close){
> >
> >  log.error("This file was not closed so try to close during
> the
> > JVM finalize..");
> >
> >  try{
> >
> >  writer.close();
> >
> >  }catch(Throwable th){
> >
> >  log.error("File Close erorr during finalize()");
> >
> >  }
> >
> >     }
> >
> > super.finalize();
> >
> > }
> >
> > @Override
> >
> > public boolean isClose() {
> >

Re: Camus Issue about Output File EOF Issue

2015-03-02 Thread Bhavesh Mistry
Hi Gwen,

We are using the MapR distribution (sorry, no Cloudera).


I suspect it is a code issue.  I am in the process of reviewing the code
around the MultiOutputFormat classes:

https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35

I suspect that, due to some concurrency issue, an older writer is being
replaced with a new one (and the old writer is never closed).  The file it
creates is usually small, and the problematic (EOF) files have very little
content.

https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91


Based on the above code, do you think there is a likelihood that an output
file may be left unclosed?  My plan is to add an isClose() API to each writer;
if you have time, could you quickly review it and give feedback on the
unclosed-file handling?  By the way, we are on the Hadoop 1.0.3 API, so I was
thinking about
http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
and making sure that within close() we close all the file writers (let me know
if this is a good place to do the final clean-up).


public abstract class RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {

    /**
     * Gives the ability to check whether close() has already been called on this
     * writer, i.e. whether the underlying file has been closed or not.
     *
     * (org.apache.hadoop.mapreduce.RecordWriter is an abstract class, so this has
     * to be an abstract class rather than an interface.)
     *
     * @return true if the writer has been closed
     */
    public abstract boolean isClose();
}

And each writer will have the ability to check, at any time, whether it has
been closed:

e.g.:

{code}

  // Inside the record writer provider; 'writer' is the SequenceFile.Writer
  // created by the enclosing provider.
  return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {

        private volatile boolean closed;

        @Override
        public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {

            /**
             * What if the file is already closed?  Should we create a new one here?
             */

            // Use the timestamp from the EtlKey as the key for this record.
            // TODO: Is there a better key to use here?
            String record = data.getRecord().toString(); // render the wrapped record as text
            writer.append(new LongWritable(key.getTime()), new Text(record));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            writer.close();
            closed = true;
        }

        @Override
        protected void finalize() throws Throwable {
            // Last-resort safety net: if close() was never called, try to close the
            // underlying writer during JVM finalization.
            if (!this.closed) {
                log.error("This file was not closed, so try to close it during JVM finalize...");
                try {
                    writer.close();
                } catch (Throwable th) {
                    log.error("File close error during finalize()");
                }
            }
            super.finalize();
        }

        @Override
        public boolean isClose() {
            return closed;
        }

    };
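
And for the task-level clean-up, the "close all the file writers" idea could
look like the sketch below.  This is just an illustration of the intent, not
the actual Camus code; the registry class and names are hypothetical:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

/** Tracks every record writer handed out during a task and closes any
 *  still-open ones at the end of the task. */
public class WriterRegistry<K, V> {

    private static final Logger log = Logger.getLogger(WriterRegistry.class);

    private final Map<String, RecordWriterWithCloseStatus<K, V>> dataWriters =
            new HashMap<String, RecordWriterWithCloseStatus<K, V>>();

    public void register(String filePath, RecordWriterWithCloseStatus<K, V> writer) {
        dataWriters.put(filePath, writer);
    }

    /** Called from the task-level close(): no file should be left unclosed. */
    public void closeAll(TaskAttemptContext context) throws IOException, InterruptedException {
        for (Map.Entry<String, RecordWriterWithCloseStatus<K, V>> entry : dataWriters.entrySet()) {
            if (!entry.getValue().isClose()) {
                log.warn("Writer for " + entry.getKey() + " was still open at task end; closing it.");
                entry.getValue().close(context);
            }
        }
        dataWriters.clear();
    }
}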


Thanks for your quick input and response.


Thanks,

Bhavesh

On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira  wrote:

> Do you have the command you used to run Camus? and the config files?
>
> Also, I noticed your file is on maprfs - you may want to check with
> your vendor... I doubt Camus was extensively tested on that particular
> FS.
>
> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
>  wrote:
> > Hi Kakfa User Team,
> >
> > I have been encountering two issues with Camus Kafka ETL Job:
> >
> > 1) End Of File (unclosed files)
> >
> > 2) Not SequenceFile Error
> > The details of issues can be found at
> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
> >
> > If you guys have faced similar issue, please let me know how to go about
> > solving them.
> >
> > Thanks,
> >
> > Bhavesh
>


Camus Issue about Output File EOF Issue

2015-03-02 Thread Bhavesh Mistry
Hi Kafka User Team,

I have been encountering two issues with the Camus Kafka ETL job:

1) End Of File (unclosed files)

2) Not SequenceFile Error
The details of issues can be found at
https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.

If you have faced similar issues, please let me know how to go about solving
them.

Thanks,

Bhavesh


Re: After Leadership Election and "kafka.log" JMXBean Registration Process

2015-02-24 Thread Bhavesh Mistry
Hi Jun,

Thanks for the info.

Thanks,
Bhavesh

On Tue, Feb 24, 2015 at 2:45 PM, Jun Rao  wrote:

> These two metrics are always registered, whether the replica is the leader
> or the follower.
>
> Thanks,
>
> Jun
>
> On Mon, Feb 23, 2015 at 6:40 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Kafka Team or User Community ,
> >
> > After leadership election or switch between follower/leader of partition
> > for given topic, does following metrics JMX bean gets registered (on
> > leader)  and de-registered (on follower).
> >
> > LogEndOffset
> > Size
> > LogStartOffset
> >
> > eg:
> > "kafka.log":type="Log",name="TOPIC-17-*Size*"
> > "kafka.log":type="Log",name="TOPIC-17-*LogStartOffset*"
> > "kafka.log":type="Log",name="TOPIC-17-*LogEndOffset*"
> >
> >
> > Thanks in advance for your help !!
> >
> > Thanks,
> >
> > Bhaevsh
> >
>


After Leadership Election and "kafka.log" JMXBean Registration Process

2015-02-23 Thread Bhavesh Mistry
Hi Kafka Team or User Community ,

After a leadership election or a switch between follower/leader for a
partition of a given topic, do the following metrics' JMX beans get registered
(on the leader) and de-registered (on the follower)?

LogEndOffset
Size
LogStartOffset

eg:
"kafka.log":type="Log",name="TOPIC-17-*Size*"
"kafka.log":type="Log",name="TOPIC-17-*LogStartOffset*"
"kafka.log":type="Log",name="TOPIC-17-*LogEndOffset*"


Thanks in advance for your help !!

Thanks,

Bhavesh


Re: Kafka New(Java) Producer Connection reset by peer error and LB

2015-02-09 Thread Bhavesh Mistry
Hi Kafka Team,

Please confirm whether you would like a Jira issue opened to track this.

Thanks,

Bhavesh

On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry 
wrote:

> Hi Kakfa Team,
>
> We are getting this connection reset by pears after couple of minute aster
> start-up of producer due to infrastructure deployment strategies we have
> adopted from LinkedIn.
>
> We have LB hostname and port as seed server, and all producers are getting
> following exception because of TCP idle connection timeout set on LB (which
> is 2 minutes and Kafka TCP connection idle is set to 10 minutes).   This
> seems to be  minor bug to close TCP connection after discovering that seed
> server is not part of brokers list immediately.
>
>
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:662)
>
>
> Thanks,
>
> Bhavesh
>
>


Re: Get Latest Offset for Specific Topic for All Partition

2015-02-09 Thread Bhavesh Mistry
Hi Gwen,

These JMX stats are good for calculating the injection rate per partition.  I
do not have to depend on ZK to figure out who the leader is and what the
latest offset is.

One quick question: what is the Size number?  Is it the number of bytes a
particular partition has on disk?  Unfortunately, the MBean description is
very limited and does not help much (it is "Information on the management
interface of the MBean").  It is a gauge; that is all I can tell.

1189855393 LogEndOffset
1165330350 Size
1176813232 LogStartOffset
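
For reference, this is roughly how I plan to read these gauges over JMX before
dumping them into Graphite.  It is only a minimal sketch; it assumes remote
JMX is enabled on the broker (the host/port is a placeholder) and that the
Yammer gauge exposes its current value through a "Value" attribute:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class PartitionOffsetProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder JMX endpoint; point it at a broker's JMX port.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1.example.com:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // ObjectName copied verbatim from how the bean shows up in jconsole.
            ObjectName endOffset = new ObjectName(
                    "\"kafka.log\":type=\"Log\",name=\"TOPIC-17-LogEndOffset\"");
            long value = ((Number) mbs.getAttribute(endOffset, "Value")).longValue();
            System.out.println("TOPIC-17 LogEndOffset = " + value);
            // In practice: loop over all partitions and push each value to Graphite.
        } finally {
            connector.close();
        }
    }
}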

Thanks for your help !!

Thanks,

Bhavesh

On Thu, Feb 5, 2015 at 12:55 PM, Gwen Shapira  wrote:

> You can use the metrics Kafka publishes.  I think the relevant metrics are:
> Log.LogEndOffset
> Log.LogStartOffset
> Log.size
>
>
> Gwen
>
> On Thu, Feb 5, 2015 at 11:54 AM, Bhavesh Mistry
>  wrote:
> > HI All,
> >
> > I just need to get the latest offset # for topic (not for consumer
> group).
> > Which API to get this info ?
> >
> > My use case is to analyze the data injection rate to each of  partition
> is
> > uniform or not (close). For this,  I am planing to dump the latest offset
> > into graphite  for each partition and look at derivative over time.
> >
> > Thanks,
> >
> > Bhavesh
>


Kafka New(Java) Producer Connection reset by peer error and LB

2015-02-09 Thread Bhavesh Mistry
Hi Kafka Team,

We are getting this "connection reset by peer" a couple of minutes after
producer start-up, due to the infrastructure deployment strategy we have
adopted from LinkedIn.

We have the LB hostname and port as the seed server, and all producers are
getting the following exception because of the TCP idle connection timeout set
on the LB (which is 2 minutes, while the Kafka TCP idle connection timeout is
set to 10 minutes).  This seems to be a minor bug: the producer should close
the TCP connection immediately after discovering that the seed server is not
part of the broker list.


java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:662)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:662)


Thanks,

Bhavesh


Get Latest Offset for Specific Topic for All Partition

2015-02-05 Thread Bhavesh Mistry
Hi All,

I just need to get the latest offset number for a topic (not for a consumer
group).  Which API gives this info?

My use case is to analyze whether the data injection rate to each partition is
uniform or not (or close to it).  For this, I am planning to dump the latest
offset for each partition into Graphite and look at the derivative over time.

Thanks,

Bhavesh


Re: high cpu and network traffic when cluster has no topic

2015-02-04 Thread Bhavesh Mistry
Hi Steven,

Can you please try to see if the I/O thread is indeed the problem?  The
following works on Linux:

ps  -p "$java_pid" -L -o tid,pcpu
jstack -F "$java_pid"

Then compare the thread IDs between the ps output and the jstack output (you
may have to convert jstack's hex thread id to decimal).  This will tell you
which thread is consuming the most CPU in that process.
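
If it is easier, the same comparison can also be done from inside the JVM
itself.  Below is a minimal sketch using the standard ThreadMXBean, as an
alternative to the ps/jstack approach above (it has to run inside the
producer's JVM, e.g. from a debug hook):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadCpuDump {
    public static void main(String[] args) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        if (!threads.isThreadCpuTimeSupported()) {
            System.err.println("Per-thread CPU time is not supported on this JVM");
            return;
        }
        // Print cumulative CPU time per live thread; a network thread spinning in
        // a tight loop should stand out.
        for (long id : threads.getAllThreadIds()) {
            ThreadInfo info = threads.getThreadInfo(id);
            long cpuNanos = threads.getThreadCpuTime(id);
            if (info != null && cpuNanos >= 0) {
                System.out.printf("%-60s cpu=%d ms%n",
                        info.getThreadName(), cpuNanos / 1000000L);
            }
        }
    }
}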

Thanks,

Bhavesh

On Wed, Feb 4, 2015 at 9:01 AM, Steven Wu  wrote:

> I have re-run my unit test with 0.8.2.0. same tight-loop problem happened
> after a few mins.
>
> On Tue, Feb 3, 2015 at 10:00 PM, Guozhang Wang  wrote:
>
> > Steven, you may be hitting on KAFKA-1642
> > .
> >
> > As Jay said, a bunch of such issues are fixed in the new release. Please
> > let us know if you still see the issue with it.
> >
> > Guozhang
> >
> > On Tue, Feb 3, 2015 at 8:52 PM, Steven Wu  wrote:
> >
> > > sure. will try my unit test again with 0.8.2.0 release tomorrow and
> > report
> > > back my findings.
> > >
> > > On Tue, Feb 3, 2015 at 8:42 PM, Jay Kreps  wrote:
> > >
> > > > Hey Steven,
> > > >
> > > > That sounds like a bug. I think we fixed a few producer high cpu
> issues
> > > > since the beta, I wonder if you could repeat the same test with the
> > > 0.8.2.
> > > > final release?
> > > >
> > > > -Jay
> > > >
> > > > On Tue, Feb 3, 2015 at 8:37 PM, Steven Wu 
> > wrote:
> > > >
> > > > > actually, my local test can reproduce the issue although not
> > > immediately.
> > > > > seems to happen after a few mins. I enabled TRACE level logging.
> here
> > > > seems
> > > > > to be the tight loop. you can see that there are two metadata
> > requests
> > > in
> > > > > one milli-seconds.
> > > > >
> > > > > kafka-producer-network-thread | foo 20:34:32,626 TRACE
> > > NetworkClient:301
> > > > -
> > > > > Ignoring empty metadata response with correlation id 360185.
> > > > > kafka-producer-network-thread | foo 20:34:32,626 DEBUG
> > > NetworkClient:369
> > > > -
> > > > > Trying to send metadata request to node -2
> > > > > kafka-producer-network-thread | foo 20:34:32,626 DEBUG
> > > NetworkClient:374
> > > > -
> > > > > Sending metadata request ClientRequest(expectResponse=true,
> > > payload=null,
> > > > >
> > > > >
> > > >
> > >
> >
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo},
> > > > > body={topics=[]})) to node -2
> > > > > kafka-producer-network-thread | foo 20:34:32,626 TRACE
> > > NetworkClient:301
> > > > -
> > > > > Ignoring empty metadata response with correlation id 360186.
> > > > > kafka-producer-network-thread | foo 20:34:32,626 DEBUG
> > > NetworkClient:369
> > > > -
> > > > > Trying to send metadata request to node -2
> > > > >
> > > > >
> > > > > On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu 
> > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > We have observed high cpu and high network traffic problem when
> > > > > > 1) cluster (0.8.1.1) has no topic
> > > > > > 2) KafkaProducer (0.8.2-beta) object is created without sending
> any
> > > > > traffic
> > > > > >
> > > > > > We have observed such problem twice. In both cases, problem went
> > away
> > > > > > immediately after one/any topic is created.
> > > > > >
> > > > > > Is this a known issue? Just want to check with the community
> first
> > > > before
> > > > > > I spend much time to reproduce it.
> > > > > >
> > > > > > I couldn't reproduce the issue with similar setup with unit test
> > code
> > > > in
> > > > > > IDE. start two brokers with no topic locally on my laptop.
> create a
> > > > > > KafkaProducer object without sending any msgs. but I only tested
> > with
> > > > > > 0.8.2-beta for both broker and producer.
> > > > > >
> > > > > > Thanks,
> > > > > > Steven
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Kafka ETL Camus Question

2015-02-03 Thread Bhavesh Mistry
Hi All,

Thanks for the input.  I think I got enough information, and
https://groups.google.com/forum/#!topic/camus_etl/1FcpqCnC5M4 also gave me
more info about this.


Thank you all for entertaining my question.  I am in luck on both forums :)

Thanks,

Bhavesh


On Tue, Feb 3, 2015 at 12:56 PM, Joel Koshy  wrote:

> There was some confusion here - turns out that they do turn it on. I added
> Tu
> to this thread and his response:
>
> 
> We have speculative set to true by default.  With these settings, we are
> seeing about 5-7% of the tasks have speculative tasks launched, other 90%
> finished within the standard deviations difference and thus speculation
> tasks were never launched.  This will ensure if we have a slow datanode,
> our job would not be impacted.
>
> Camus is setup to consume 10 minutes worth of offset/topic/run. If a topic
> has more than 10 minutes of offset to be consumed, speculative will also
> be active for that topic.  We haven't play much with this setting.
> However, if we ever get into a situation where we have to do catchup, it's
> good to have this setting disabled.
>
> mapreduce.job.speculative.slownodethreshold 1.0
> mapreduce.job.speculative.speculativecap0.1
>
> mapreduce.map.speculative   true
> 
>
> On Tue, Feb 03, 2015 at 05:14:02PM +, Aditya Auradkar wrote:
> > Hi Bhavesh,
> >
> > I just checked with one of the devs on the Camus team. We run the Camus
> job with speculative execution disabled.
> >
> > Aditya
> >
> > 
> > From: Pradeep Gollakota [pradeep...@gmail.com]
> > Sent: Monday, February 02, 2015 11:15 PM
> > To: users@kafka.apache.org
> > Subject: Re: Kafka ETL Camus Question
> >
> > Hi Bhavesh,
> >
> > At Lithium, we don't run Camus in our pipelines yet, though we plan to.
> But
> > I just wanted to comment regarding speculative execution. We have it
> > disabled at the cluster level and typically don't need it for most of our
> > jobs. Especially with something like Camus, I don't see any need to run
> > parallel copies of the same task.
> >
> > On Mon, Feb 2, 2015 at 10:36 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for info.  I did not get answer  to my question there so I
> thought I
> > > try my luck here :)
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Mon, Feb 2, 2015 at 9:46 PM, Jun Rao  wrote:
> > >
> > > > You can probably ask the Camus mailing list.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry <
> > > > mistry.p.bhav...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Kafka Team or Linked-In  Team,
> > > > >
> > > > > I would like to know if you guys run Camus ETL job with speculative
> > > > > execution true or false.  Does it make sense to set this to false ?
> > > > Having
> > > > > true, it creates additional load on brokers for each map task
> (create a
> > > > map
> > > > > task to pull same partition twice).  Is there any advantage to this
> > > > having
> > > > > it on vs off ?
> > > > >
> > > > > mapred.map.tasks.speculative.execution
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > >
> > >
>
>


Re: Kafka ETL Camus Question

2015-02-02 Thread Bhavesh Mistry
Hi Jun,

Thanks for the info.  I did not get an answer to my question there, so I
thought I would try my luck here :)

Thanks,

Bhavesh

On Mon, Feb 2, 2015 at 9:46 PM, Jun Rao  wrote:

> You can probably ask the Camus mailing list.
>
> Thanks,
>
> Jun
>
> On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Kafka Team or Linked-In  Team,
> >
> > I would like to know if you guys run Camus ETL job with speculative
> > execution true or false.  Does it make sense to set this to false ?
> Having
> > true, it creates additional load on brokers for each map task (create a
> map
> > task to pull same partition twice).  Is there any advantage to this
> having
> > it on vs off ?
> >
> > mapred.map.tasks.speculative.execution
> >
> > Thanks,
> >
> > Bhavesh
> >
>


Kafka ETL Camus Question

2015-01-29 Thread Bhavesh Mistry
Hi Kafka Team or LinkedIn Team,

I would like to know whether you run the Camus ETL job with speculative
execution set to true or false.  Does it make sense to set this to false?
When it is true, it creates additional load on the brokers for each map task
(a speculative map task may pull the same partition twice).  Is there any
advantage to having it on vs. off?

mapred.map.tasks.speculative.execution

Thanks,

Bhavesh


Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)

2015-01-26 Thread Bhavesh Mistry
Hi Kafka Team,

I just wanted to bring to your attention a limitation of the new Java producer
compared to the old producer.

a) The number of partitions you can scale to is limited by the configured
memory allocation:


buffer.memory
batch.size


The maximum number of partitions you can have before impacting the (new Java)
producer is buffer.memory / batch.size.  So developers can plan for horizontal
partition scaling from the beginning; otherwise, running production code will
be impacted, depending on the *block.on.buffer.full* configuration (either
blocking or throwing BufferExhaustedException).
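
For example, with the default settings (buffer.memory = 33554432 bytes, i.e.
32 MB, and batch.size = 16384 bytes, i.e. 16 KB), that works out to
33554432 / 16384 = 2048 partition batches that the accumulator can hold at one
time.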

This limitation does not exist with the old Scala-based producer.

This will allow the user community to buffer more and plan capacity
beforehand.  Maybe add this limitation to the info at
http://kafka.apache.org/documentation.html#newproducerconfigs.

Thanks,

Bhavesh


On Mon, Jan 26, 2015 at 10:28 AM, Joe Stein  wrote:

> +1 (binding)
>
> artifacts and quick start look good. I ran in some client code, minor edits
> from 0-8.2-beta https://github.com/stealthly/scala-kafka/pull/26
>
> On Mon, Jan 26, 2015 at 3:38 AM, Manikumar Reddy 
> wrote:
>
> > +1 (Non-binding)
> > Verified source package, unit tests, release build, topic deletion,
> > compaction and random testing
> >
> > On Mon, Jan 26, 2015 at 6:14 AM, Neha Narkhede 
> wrote:
> >
> >> +1 (binding)
> >> Verified keys, quick start, unit tests.
> >>
> >> On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein 
> wrote:
> >>
> >> > That makes sense, thanks!
> >> >
> >> > On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps 
> wrote:
> >> >
> >> > > But I think the flaw in trying to guess what kind of serializer they
> >> will
> >> > > use is when we get it wrong. Basically let's say we guess "String".
> >> Say
> >> > 30%
> >> > > of the time we will be right and we will save the two configuration
> >> > lines.
> >> > > 70% of the time we will be wrong and the user gets a super cryptic
> >> > > ClassCastException: "xyz cannot be cast to [B" (because [B is how
> java
> >> > > chooses to display the byte array class just to up the pain), then
> >> they
> >> > > figure out how to subscribe to our mailing list and email us the
> >> cryptic
> >> > > exception, then we explain about how we helpfully set these
> properties
> >> > for
> >> > > them to save them time. :-)
> >> > >
> >> > >
> >>
> https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22
> >> > >
> >> > > I think basically we did this experiment with the old clients and
> the
> >> > > conclusion is that serialization is something you basically have to
> >> think
> >> > > about to use Kafka and trying to guess just makes things worse.
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein 
> >> wrote:
> >> > >
> >> > >> Maybe. I think the StringSerialzer could look more like a typical
> >> type
> >> > of
> >> > >> message.  Instead of encoding being a property it would be more
> >> > typically
> >> > >> just written in the bytes.
> >> > >>
> >> > >> On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps 
> >> > wrote:
> >> > >>
> >> > >> > I don't think so--see if you buy my explanation. We previously
> >> > defaulted
> >> > >> > to the byte array serializer and it was a source of unending
> >> > frustration
> >> > >> > and confusion. Since it wasn't a required config people just went
> >> > along
> >> > >> > plugging in whatever objects they had, and thinking that changing
> >> the
> >> > >> > parametric types would somehow help. Then they would get a class
> >> case
> >> > >> > exception and assume our stuff was somehow busted, not realizing
> we
> >> > had
> >> > >> > helpfully configured a type different from what they were passing
> >> in
> >> > >> under
> >> > >> > the covers. So I think it is actually good for people to think:
> how
> >> > am I
> >> > >> > serializing my data, and getting that exception will make them
> ask
> >> > that
> >> > >> > question right?
> >> > >> >
> >> > >> > -Jay
> >> > >> >
> >> > >> > On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein  >
> >> > >> wrote:
> >> > >> >
> >> > >> >> Should value.serializer in the new java producer be defaulted to
> >> > >> >> Array[Byte] ?
> >> > >> >>
> >> > >> >> I was working on testing some upgrade paths and got this
> >> > >> >>
> >> > >> >> ! return exception in callback when buffer cannot accept
> >> message
> >> > >> >>
> >> > >> >>   ConfigException: Missing required configuration
> >> > >> "value.serializer"
> >> > >> >> which has no default value. (ConfigDef.java:124)
> >> > >> >>
> >> > >> >>
> >>  org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >>
> >> >
> >>
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48)
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >>
> >> >
> >>
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:235)
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >> >>
> >> > >>
> >> >
> >>
> org.apache.kafka.clients.producer.KafkaProd

Counting # of Message Brokers Receive Per Minute Per Topic

2015-01-22 Thread Bhavesh Mistry
Hi Kafka Team,

I need to count the messages received by the entire Kafka broker cluster for a
particular topic.

I have 3 brokers, so do I need to sum the Count metric, or does one server's
count reflect the count across all servers?  It seems that the count is always
increasing (although the metric name is *MessagesInPerSec*, the count does not
get reset).

"kafka.server":type="BrokerTopicMetrics",name="topic-MessagesInPerSec"

Please clarify.

Thanks,
Bhavesh


Kafka Cluster Monitoring and Documentation of Internals (JMX Metrics) of Rejected Events

2015-01-12 Thread Bhavesh Mistry
Hi Kafka Team,

I am trying to understand Kafka internals and how a message can be corrupted
or lost on the broker side.

I have referred to the following documentation for monitoring:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
http://kafka.apache.org/documentation.html#monitoring

I am looking at the following beans:

"kafka.server":type="BrokerTopicMetrics",name="test-FailedProduceRequestsPerSec"
"kafka.server":type="BrokerTopicMetrics",name="test-BytesRejectedPerSec"


I see the following exception on the broker side, rejecting a produce request
because it is too large.  This is great, but it does not show the source IP of
the producer that caused the issue.  Is there any way to log and capture this?






*[2014-10-14 22:09:53,262] ERROR [KafkaApi-2] Error processing
ProducerRequest with correlation id 28795280 from client Xon partition
[XXX,17]
(kafka.server.KafkaApis)kafka.common.MessageSizeTooLargeException: Message
size is 2924038 bytes which exceeds the maximum configured message size of
2097152.  *

Can this be reported as a separate metric, "MessageSizeTooLargeException per
topic"?

Also, what is the best way to find CRC check errors from the consumer side?
How do you debug this?

e.g., log line:

*11 Dec 2014 07:22:33,387 ERROR [pool-15-thread-4] *
*kafka.message.InvalidMessageException: Message is corrupt (stored crc =
1834644195, computed crc = 2374999037)*

Also, is there any Jira open to update
http://kafka.apache.org/documentation.html#monitoring with a list of all the
latest metrics, their format, and what they mean?  Please see the attached
image for the list of all metrics.

The broker version is 0.8.1.1.

Thanks,

Bhavesh


Re: Follow-up On Important Issues for 0.8.2

2015-01-08 Thread Bhavesh Mistry
Adding the user community, to see if anyone knows the Producer behavior for
issue 1) and the status of 2).


Thanks,

Bhavesh

On Fri, Jan 2, 2015 at 12:37 PM, Bhavesh Mistry 
wrote:

> Hi Kafka Dev Team,
>
> I am following-up with you guys regarding New (Java) Producer behavior in
> event of network or firewall rules.  I just wanted to make Java Producer
> resilient of any network or firewall issues, and does not become
> single-point of failure in application:
>
> 1) Jira Issue https://issues.apache.org/jira/browse/KAFKA-1788
>
>
> https://issues.apache.org/jira/browse/KAFKA-1788?focusedCommentId=14259235&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14259235
>
> What should be the behavior of the Producer when it can not reach leader
> broker, but metadata reported broker is leader for that partition (via
> other broker) ?  Should the record-error-rate be counted and Call Back
> should be called with error or not ?
>
> 1) *record-error-rate* metric remain zero despite following firewall
> rule. In my opinion, it should have called
> org.apache.kafka.clients.producer.Callback but I did not see that happening
> either one or two are brokers not reachable.
>
> 2)  Is  jira ticket https://issues.apache.org/jira/browse/KAFKA-1788 will
> be merged to 0.8.2 ?  This will give the ability to close the producer in
> event of lost connectivity to broker  if io thread misbehave (does not end)
> ?
>
> Thanks for your help !
>
> Thanks,
> Bhavesh
>


Latency Tracking Across All Kafka Component

2015-01-05 Thread Bhavesh Mistry
Hi Kafka Team/Users,

We are using the LinkedIn Kafka data pipeline end-to-end:

Producer(s) -> Local DC Brokers -> MM -> Central Brokers -> Camus Job -> HDFS

This is working out very well for us, but we need visibility into the latency
at each layer (Local DC Brokers -> MM -> Central Brokers -> Camus Job -> HDFS).
Our events are time-based (stamped with the time the event was produced).  Is
there any feature, or any audit trail, such as the one mentioned at
https://github.com/linkedin/camus/ ?  I would like to know the in-between
latency and the time an event spends at each hop, because right now we do not
know where the problem is and what to optimize.

Is any of this covered in 0.9.0 or any other upcoming Kafka release?  How
might we achieve this latency tracking across all components?
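
In the meantime, what we are considering is stamping each event with its
produce time and computing the lag at every hop.  Below is a minimal sketch of
the per-hop calculation only; the hop names and the reporting hook are
hypothetical:

public final class HopLatency {

    /** Lag of an event at this hop, given the produce timestamp carried in the event. */
    public static long lagMillis(long eventProduceTimeMillis) {
        return System.currentTimeMillis() - eventProduceTimeMillis;
    }

    public static void main(String[] args) {
        long eventTime = System.currentTimeMillis() - 42000L; // pretend the event is 42s old
        long lag = lagMillis(eventTime);
        // In practice each hop (local DC consumer, MM, central consumer, Camus mapper,
        // HDFS loader) would report this value to a metrics system, tagged with the hop
        // name, so the slow hop becomes visible.
        System.out.println("lag at hop camus-mapper = " + lag + " ms");
    }
}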


Thanks,

Bhavesh


Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-29 Thread Bhavesh Mistry
Hi Paul,

I have faced a similar issue to the one you describe.  Our use case was a bit
different: we needed to aggregate events and publish to the same partition of
the same topic.  Occasionally, I have run into blocked application threads
(not because of metadata, but because of the sync block for each batch).  When
you use the new producer in async mode, the application thread is responsible
for: 1) compressing the message (if you have compression on), 2) obtaining the
lock for the partition batch it hashes to, 3) enqueueing the message, and/or
4) allocating a new batch if the current batch is full (and sometimes the app
thread has to wait in waitOnMetadata, see
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java).
So the application thread is doing much more work compared to the old
producer, which would simply add to a synchronized blocking queue.  The
workaround I have used is to have more producer instances, or a producer
instance per partition, or a global AsyncKafkaProducer as described in the
ticket below.  It does not address the true root of the problem, but the app
thread just enqueues the message into a blocking queue, and worker threads pay
the cost of doing the Kafka work in the background, as described above.
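
To make that workaround concrete, the global async wrapper looks roughly like
the sketch below.  This is only a minimal illustration, assuming the 0.8.2
producer API with byte[] serializers configured; the class name and queue size
are arbitrary, and error handling/callbacks are omitted:

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncKafkaProducer {

    private final BlockingQueue<ProducerRecord<byte[], byte[]>> queue =
            new LinkedBlockingQueue<ProducerRecord<byte[], byte[]>>(100000);
    private final Producer<byte[], byte[]> producer;
    private final Thread worker;

    public AsyncKafkaProducer(Properties props) {
        this.producer = new KafkaProducer<byte[], byte[]>(props);
        this.worker = new Thread(new Runnable() {
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // Only this background thread pays for compression, the batch
                        // lock and waitOnMetadata inside send().
                        producer.send(queue.take());
                    }
                } catch (InterruptedException e) {
                    // shutting down
                }
            }
        }, "async-kafka-producer-worker");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /** Application threads only enqueue; they never touch the Kafka client directly. */
    public boolean offer(ProducerRecord<byte[], byte[]> record) {
        return queue.offer(record); // returns false instead of blocking when the queue is full
    }

    public void close() {
        worker.interrupt();
        producer.close();
    }
}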

For more info, please refer to the following Jira issue:
https://issues.apache.org/jira/browse/KAFKA-1710

I hope this helps!

Thanks,

Bhavesh


On Mon, Dec 29, 2014 at 1:26 PM, Paul Pearcy  wrote:

> FYI, here is the ticket I opened for this improvement:
> https://issues.apache.org/jira/browse/KAFKA-1835
>
> Feel free to add feedback on if it meets your use case and if not how
> things could.
>
> This should make the blocking behavior explicit as long as you know all
> your topics up front. Ideally a separate queue would be nice, but as Jay
> explained to me ends up getting complex because messages are stored in
> compact partitioned binary form currently, which requires metadata info.
>
> Thanks,
> Paul
>
>
> On Mon, Dec 29, 2014 at 2:48 PM, Jay Kreps  wrote:
>
> > I don't think a separate queue will be a very simple solution to
> implement.
> >
> > Could you describe your use case a little bit more. It does seem to me
> that
> > as long as the metadata fetch happens only once and the blocking has a
> > tight time bound this should be okay in any use case I can imagine. And,
> of
> > course, by default the client blocks anyway whenever you exhaust the
> memory
> > buffer space. But it sounds like you feel it isn't. Maybe you could
> > describe the scenario a bit?
> >
> > I think one thing we could do is what was discussed in another thread,
> > namely add an option like
> >   preinitialize.metadata=true/false
> > which would default to false. When true this would cause the producer to
> > just initialize metadata for all topics when it is created. Note that
> this
> > then brings back the opposite problem--doing remote communication during
> > initialization which tends to bite a lot of people. But since this would
> be
> > an option that would default to false perhaps it would be less likely to
> > come as a surprise.
> >
> > -Jay
> >
> > On Mon, Dec 29, 2014 at 8:38 AM, Steven Wu  wrote:
> >
> > > +1. it should be truly async in all cases.
> > >
> > > I understand some challenges that Jay listed in the other thread. But
> we
> > > need a solution nonetheless. e.g. can we maintain a separate
> > > list/queue/buffer for pending messages without metadata.
> > >
> > > On Tue, Dec 23, 2014 at 12:57 PM, John Boardman <
> boardmanjo...@gmail.com
> > >
> > > wrote:
> > >
> > > > I was just fighting this same situation. I never expected the new
> > > producer
> > > > send() method to block as it returns a Future and accepts a Callback.
> > > > However, when I tried my unit test, just replacing the old producer
> > with
> > > > the new, I immediately started getting timeouts waiting for
> metadata. I
> > > > struggled with this until I went into the source code and found the
> > > wait()
> > > > that waits for the metadata.
> > > >
> > > > At that point I realized that this new "async" producer would have to
> > be
> > > > executed on its own thread, unlike the old producer, which
> complicates
> > my
> > > > code unnecessarily. I totally agree with Paul that the contract of
> > send()
> > > > is being completely violated with internal code that can block.
> > > >
> > > > I did try fetching the metadata first, but that only worked for a few
> > > calls
> > > > before the producer decided it was time to update the metadata again.
> > > >
> > > > Again, I agree with Paul that this API should be fixed so that it is
> > > truly
> > > > asynchronous in all cases. Otherwise, it cannot be used on the main
> > > thread
> > > > of an application as it will block and fail.
> > > >
> > >
> >
>


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-09 Thread Bhavesh Mistry
Hi All,

This is very likely when you have a large site such as LinkedIn with thousands
of servers producing data.  You will have a mixed bag of producers and
serialization/deserialization formats because of incremental code deployment.
So, it is best to keep the API as generic as possible, and each org/company
can wrap the generic API with whatever serialization/deserialization framework
fits them (Java serialization, protobuf, Avro, or Base64).
Keep the API as generic as possible.
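
As an example of what I mean by wrapping the generic API, here is a minimal
sketch of an org-level wrapper over a byte[]-only producer; the Encoder
interface is hypothetical, standing in for whatever framework (Avro, protobuf,
Base64, etc.) a team already uses:

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/** Hypothetical per-team encoder; each org plugs in its own framework. */
interface Encoder<T> {
    byte[] toBytes(T value);
}

/** Thin wrapper: serialization stays in application land, the producer stays bytes in / bytes out. */
class TypedProducer<K, V> {

    private final Producer<byte[], byte[]> producer;
    private final Encoder<K> keyEncoder;
    private final Encoder<V> valueEncoder;

    TypedProducer(Producer<byte[], byte[]> producer, Encoder<K> keyEncoder, Encoder<V> valueEncoder) {
        this.producer = producer;
        this.keyEncoder = keyEncoder;
        this.valueEncoder = valueEncoder;
    }

    Future<RecordMetadata> send(String topic, K key, V value) {
        return producer.send(new ProducerRecord<byte[], byte[]>(
                topic, keyEncoder.toBytes(key), valueEncoder.toBytes(value)));
    }
}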

Thanks,

Bhavesh

On Tue, Dec 9, 2014 at 3:29 PM, Steven Wu  wrote:

> > In practice the cases that actually mix serialization types in a single
> stream are pretty rare I think just because the consumer then has the
> problem of guessing how to deserialize, so most of these will end up with
> at least some marker or schema id or whatever that tells you how to read
> the data. Arguable this mixed serialization with marker is itself a
> serializer type and should have a serializer of its own...
>
> agree that it is unlikely to have mixed serialization format for one
> topic/type. But we sometimes/often create one Producer object for one
> cluster. and there can be many topics on this cluster. different topics may
> have different serialization formats. So I agree with Guozhang's point
> regarding "data type flexibility" of using simple byte[] (instead of
> generic ).
>
> On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps  wrote:
>
> > Hey Sriram,
> >
> > Thanks! I think this is a very helpful summary.
> >
> > Let me try to address your point about passing in the serde at send time.
> >
> > I think the first objection is really to the paired key/value serializer
> > interfaces. This leads to kind of a weird combinatorial thing where you
> > would have an avro/avro serializer a string/avro serializer, a pb/pb
> > serializer, and a string/pb serializer, and so on. But your proposal
> would
> > work as well with separate serializers for key and value.
> >
> > I think the downside is just the one you call out--that this is a corner
> > case and you end up with two versions of all the apis to support it. This
> > also makes the serializer api more annoying to implement. I think the
> > alternative solution to this case and any other we can give people is
> just
> > configuring ByteArraySerializer which gives you basically the api that
> you
> > have now with byte arrays. If this is incredibly common then this would
> be
> > a silly solution, but I guess the belief is that these cases are rare
> and a
> > really well implemented avro or json serializer should be 100% of what
> most
> > people need.
> >
> > In practice the cases that actually mix serialization types in a single
> > stream are pretty rare I think just because the consumer then has the
> > problem of guessing how to deserialize, so most of these will end up with
> > at least some marker or schema id or whatever that tells you how to read
> > the data. Arguable this mixed serialization with marker is itself a
> > serializer type and should have a serializer of its own...
> >
> > -Jay
> >
> > On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
> > srsubraman...@linkedin.com.invalid> wrote:
> >
> > > This thread has diverged multiple times now and it would be worth
> > > summarizing them.
> > >
> > > There seems to be the following points of discussion -
> > >
> > > 1. Can we keep the serialization semantics outside the Producer
> interface
> > > and have simple bytes in / bytes out for the interface (This is what we
> > > have today).
> > >
> > > The points for this is to keep the interface simple and usage easy to
> > > understand. The points against this is that it gets hard to share
> common
> > > usage patterns around serialization/message validations for the future.
> > >
> > > 2. Can we create a wrapper producer that does the serialization and
> have
> > > different variants of it for different data formats?
> > >
> > > The points for this is again to keep the main API clean. The points
> > > against this is that it duplicates the API, increases the surface area
> > and
> > > creates redundancy for a minor addition.
> > >
> > > 3. Do we need to support different data types per record? The current
> > > interface (bytes in/bytes out) lets you instantiate one producer and
> use
> > > it to send multiple data formats. There seems to be some valid use
> cases
> > > for this.
> > >
> > > I have still not seen a strong argument against not having this
> > > functionality. Can someone provide their views on why we don't need
> this
> > > support that is possible with the current API?
> > >
> > > One possible approach for the per record serialization would be to
> define
> > >
> > > public interface SerDe {
> > >   public byte[] serializeKey();
> > >
> > >   public K deserializeKey();
> > >
> > >   public byte[] serializeValue();
> > >
> > >   public V deserializeValue();
> > > }
> > >
> > > This would be used by both the Producer and the Consumer.
> > >
> > > The send APIs can then be

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Bhavesh Mistry
How will a mixed bag work on the consumer side?  The entire site cannot be
rolled at once, so the consumer will have to deal with both new and old
serialized bytes; this could be the app team's responsibility.  Are you guys
targeting the 0.8.2 release?  That may break customers who are already using
the new producer API (beta version).
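
For what it is worth, one way an app team could cope with the mixed bag during
a rolling deploy is a format-marker byte in front of the new payloads.  This
is a rough sketch only; the magic value and decoder names are hypothetical:

class MixedFormatDecoder {

    private static final byte NEW_FORMAT_MAGIC = 0x1;

    Object decode(byte[] payload) {
        if (payload != null && payload.length > 0 && payload[0] == NEW_FORMAT_MAGIC) {
            // strip the marker and hand the rest to the new deserializer
            return decodeNewFormat(payload, 1, payload.length - 1);
        }
        // anything without the marker is assumed to come from a not-yet-upgraded producer
        return decodeOldFormat(payload);
    }

    private Object decodeNewFormat(byte[] buf, int offset, int length) {
        return null; // plug in the new deserializer here
    }

    private Object decodeOldFormat(byte[] buf) {
        return null; // plug in the legacy deserializer here
    }
}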

Thanks,

Bhavesh

On Tue, Nov 25, 2014 at 8:29 AM, Manikumar Reddy 
wrote:

> +1 for this change.
>
> what about de-serializer  class in 0.8.2?  Say i am using new producer with
> Avro and old consumer combination.
> then i need to give custom Decoder implementation for Avro right?.
>
> On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein  wrote:
>
> > The serializer is an expected use of the producer/consumer now and think
> we
> > should continue that support in the new client. As far as breaking the
> API
> > it is why we released the 0.8.2-beta to help get through just these type
> of
> > blocking issues in a way that the community at large could be involved in
> > easier with a build/binaries to download and use from maven also.
> >
> > +1 on the change now prior to the 0.8.2 release.
> >
> > - Joe Stein
> >
> >
> > On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian <
> > srsubraman...@linkedin.com.invalid> wrote:
> >
> > > Looked at the patch. +1 from me.
> > >
> > > On 11/24/14 8:29 PM, "Gwen Shapira"  wrote:
> > >
> > > >As one of the people who spent too much time building Avro
> repositories,
> > > >+1
> > > >on bringing serializer API back.
> > > >
> > > >I think it will make the new producer easier to work with.
> > > >
> > > >Gwen
> > > >
> > > >On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps 
> wrote:
> > > >
> > > >> This is admittedly late in the release cycle to make a change. To
> add
> > to
> > > >> Jun's description the motivation was that we felt it would be better
> > to
> > > >> change that interface now rather than after the release if it needed
> > to
> > > >> change.
> > > >>
> > > >> The motivation for wanting to make a change was the ability to
> really
> > be
> > > >> able to develop support for Avro and other serialization formats.
> The
> > > >> current status is pretty scattered--there is a schema repository on
> an
> > > >>Avro
> > > >> JIRA and another fork of that on github, and a bunch of people we
> have
> > > >> talked to have done similar things for other serialization systems.
> It
> > > >> would be nice if these things could be packaged in such a way that
> it
> > > >>was
> > > >> possible to just change a few configs in the producer and get rich
> > > >>metadata
> > > >> support for messages.
> > > >>
> > > >> As we were thinking this through we realized that the new api we
> were
> > > >>about
> > > >> to introduce was kind of not very compatable with this since it was
> > just
> > > >> byte[] oriented.
> > > >>
> > > >> You can always do this by adding some kind of wrapper api that wraps
> > the
> > > >> producer. But this puts us back in the position of trying to
> document
> > > >>and
> > > >> support multiple interfaces.
> > > >>
> > > >> This also opens up the possibility of adding a MessageValidator or
> > > >> MessageInterceptor plug-in transparently so that you can do other
> > custom
> > > >> validation on the messages you are sending which obviously requires
> > > >>access
> > > >> to the original object not the byte array.
> > > >>
> > > >> This api doesn't prevent using byte[] by configuring the
> > > >> ByteArraySerializer it works as it currently does.
> > > >>
> > > >> -Jay
> > > >>
> > > >> On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao  wrote:
> > > >>
> > > >> > Hi, Everyone,
> > > >> >
> > > >> > I'd like to start a discussion on whether it makes sense to add
> the
> > > >> > serializer api back to the new java producer. Currently, the new
> > java
> > > >> > producer takes a byte array for both the key and the value. While
> > this
> > > >> api
> > > >> > is simple, it pushes the serialization logic into the application.
> > > >>This
> > > >> > makes it hard to reason about what type of data is being sent to
> > Kafka
> > > >> and
> > > >> > also makes it hard to share an implementation of the serializer.
> For
> > > >> > example, to support Avro, the serialization logic could be quite
> > > >>involved
> > > >> > since it might need to register the Avro schema in some remote
> > > >>registry
> > > >> and
> > > >> > maintain a schema cache locally, etc. Without a serialization api,
> > > >>it's
> > > >> > impossible to share such an implementation so that people can
> easily
> > > >> reuse.
> > > >> > We sort of overlooked this implication during the initial
> discussion
> > > >>of
> > > >> the
> > > >> > producer api.
> > > >> >
> > > >> > So, I'd like to propose an api change to the new producer by
> adding
> > > >>back
> > > >> > the serializer api similar to what we had in the old producer.
> > > >>Specially,
> > > >> > the proposed api changes are the following.
> > > >> >
> > > >> > First, we change KafkaProducer to take generic types K and V for
> th

Re: How to recover from ConsumerRebalanceFailedException ?

2014-11-20 Thread Bhavesh Mistry
Hi Jun,

Do you want me to file a Jira ticket for a feature, in both the new consumer
API and the old consumer, that notifies the application when a consumer stream
is dying, so the application can try to restart it programmatically?  I
understand the failure itself is due to network or ZK cluster instability.

Let me know if you have an alternative proposal for this for the new and old
high-level consumer APIs.

Thanks,

Bhavesh

On Tue, Nov 18, 2014 at 9:53 PM, Bhavesh Mistry 
wrote:

> Hi Jun,
>
> ZK cluster are up and running.  What is best way to programmatically
> recover and I would try to exponential recovery process which I am willing
> to implement.So do you think monitoring  "ZkClient-EventThread
> <http://zkclient-eventthread-65-dare-msgq00.sv.walmartlabs.com:9091/>*"
>  thread status will be enough to indicate source thread is dead and
> therefore start exponential reconnect process ?
>
> Can you guys at least for new consumer api (0.9.0) provide a call back
> method or notification the consumer is died and reason for it ?
>
>
> Thanks,
> Bhavesh
>
>
>
> On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao  wrote:
>
>> Is your ZK service alive at that point? If not, you just need to monitor
>> the ZK server properly.
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com>
>> wrote:
>>
>> > Hi Kafka Team,
>> >
>> >
>> > I get following exception due to ZK/Network issues intermittently.  How
>> do
>> > I recover from consumer thread dying *programmatically* and restart
>> source
>> > because we have alerts that due to this error we have partition
>> OWNERSHIP
>> > is *none* ?  Please let me know how to restart source and detect
>> consumer
>> > thread died and need to be restarted ?
>> >
>> >
>> >
>> > 17 Nov 2014 04:29:41,180 ERROR [
>> > ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
>> > dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091
>> ]
>> > (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
>> > ZkEvent[New session event sent to
>> >
>> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
>> > ]
>> > kafka.common.ConsumerRebalanceFailedException:
>> >
>> >
>> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
>> > can't rebalance after 8 retries
>> > at
>> >
>> >
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
>> > at
>> >
>> >
>> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>> > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>> >
>> >
>> >
>> >
>> >
>> > ZK Connection Issues:
>> >
>> > java.net.SocketException: Transport endpoint is not connected
>> > at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
>> > at
>> > sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
>> > at
>> sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
>> > at
>> > org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
>> > at
>> > org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>> >
>> >
>> >
>> >
>> > at
>> > org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>> > at
>> > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>> > at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
>> > at
>> > kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
>> > at
>> >
>> >
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
>> > at
>> >
>> >
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerCo

Re: How to recover from ConsumerRebalanceFailedException ?

2014-11-18 Thread Bhavesh Mistry
Hi Jun,

The ZK cluster is up and running.  What is the best way to recover
programmatically?  I am willing to implement an exponential back-off
recovery process.  So do you think monitoring the status of the
"ZkClient-EventThread" thread will be enough to indicate that the source
thread is dead and that the exponential reconnect process should start?

Can you at least provide, for the new consumer API (0.9.0), a callback
method or notification that the consumer has died, along with the reason
for it?


Thanks,
Bhavesh



On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao  wrote:

> Is your ZK service alive at that point? If not, you just need to monitor
> the ZK server properly.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Kafka Team,
> >
> >
> > I get following exception due to ZK/Network issues intermittently.  How
> do
> > I recover from consumer thread dying *programmatically* and restart
> source
> > because we have alerts that due to this error we have partition OWNERSHIP
> > is *none* ?  Please let me know how to restart source and detect consumer
> > thread died and need to be restarted ?
> >
> >
> >
> > 17 Nov 2014 04:29:41,180 ERROR [
> > ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
> > dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
> > (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
> > ZkEvent[New session event sent to
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
> > ]
> > kafka.common.ConsumerRebalanceFailedException:
> >
> >
> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> > can't rebalance after 8 retries
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> >
> >
> >
> >
> >
> > ZK Connection Issues:
> >
> > java.net.SocketException: Transport endpoint is not connected
> > at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> > at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> > at
> > org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> > at
> > org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
> >
> >
> >
> >
> > at
> > org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > at
> > org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> > at
> > kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> > at
> scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> > at
&g

Re: How to recover from ConsumerRebalanceFailedException ?

2014-11-18 Thread Bhavesh Mistry
Hi Kafka team,

So the plan is to just monitor the "ZkClient-EventThread" threads via the
ThreadMXBean (ManagementFactory.getThreadMXBean()), and if the
ZkClient-EventThread dies, restart the sources.  Is there an alternative
approach, or a lifecycle method an API consumer can attach to the consumer
lifecycle, so that we are notified when it is dying and can take some
action?
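
For example, here is a minimal sketch of the kind of monitor I have in mind
(assuming the thread name prefix stays "ZkClient-EventThread"; the monitor
class, the restart hook, and the back-off values are our own illustrative
code, not a Kafka API):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ZkEventThreadMonitor {

    private static final String PREFIX = "ZkClient-EventThread";

    /** Returns true if at least one live thread name starts with the prefix. */
    public static boolean isZkEventThreadAlive() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadName().startsWith(PREFIX)) {
                return true;
            }
        }
        return false;
    }

    /** Poll the thread status and trigger a restart with exponential back-off. */
    public static void watch(Runnable restartSources) throws InterruptedException {
        long backoffMs = 1000L;
        while (true) {
            if (!isZkEventThreadAlive()) {
                restartSources.run();              // our own recovery hook
                backoffMs = Math.min(backoffMs * 2, 60000L);
            } else {
                backoffMs = 1000L;                 // healthy again, reset the back-off
            }
            Thread.sleep(backoffMs);
        }
    }
}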

Thanks,

Bhavesh

On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry 
wrote:

> Hi Kafka Team,
>
>
> I get following exception due to ZK/Network issues intermittently.  How do
> I recover from consumer thread dying *programmatically* and restart
> source because we have alerts that due to this error we have partition
> OWNERSHIP is *none* ?  Please let me know how to restart source and
> detect consumer thread died and need to be restarted ?
>
>
>
> 17 Nov 2014 04:29:41,180 ERROR [
> ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
> dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
> (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
> ZkEvent[New session event sent to
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
> ]
> kafka.common.ConsumerRebalanceFailedException:
> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> can't rebalance after 8 retries
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
>
>
>
>
> ZK Connection Issues:
>
> java.net.SocketException: Transport endpoint is not connected
> at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> at
> org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>
>
>
>
> at
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> at
> kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
> /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>   

Re: Enforcing Network Bandwidth Quote with New Java Producer

2014-11-17 Thread Bhavesh Mistry
Hi Jun,

So if I set "*metrics.sample.window.ms*" to 1 minute, I will be able to get
the compressed byte count per minute.  Which of the following should I be
using?

"outgoing-byte-rate"   // the average number of outgoing bytes sent per second to all servers
"byte-rate"            // rate per second

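For reference, this is roughly how I plan to read them from the new Java
producer (a sketch only; the class name is illustrative and it assumes the
Metric.value() accessor of the current client, simply filtering the metrics
map by name):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ByteRateSampler {

    /** Print the byte-rate metrics (bytes/sec, after compression) of a producer. */
    public static void printByteRates(KafkaProducer<byte[], byte[]> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("outgoing-byte-rate".equals(name.name()) || "byte-rate".equals(name.name())) {
                // Multiply by the sample window (in seconds) to approximate bytes per window.
                System.out.println(name.group() + " / " + name.name()
                        + " = " + entry.getValue().value());
            }
        }
    }
}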

Thanks,

Bhavesh

On Fri, Nov 14, 2014 at 3:39 PM, Jun Rao  wrote:

> We have a metric that measures the per-topic bytes send rate (after
> compression). You can get the values through the producer api.
>
> Thanks,
>
> Jun
>
> On Fri, Nov 14, 2014 at 10:34 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com
> > wrote:
>
> > HI Kafka Team,
> >
> > We like to enforce a network bandwidth quota limit per minute on producer
> > side.  How can I do this ?  I need some way to count compressed bytes on
> > producer ?  I know there is callback does not give this ability ?  Let me
> > know the best way.
> >
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
>


How to recover from ConsumerRebalanceFailedException ?

2014-11-17 Thread Bhavesh Mistry
Hi Kafka Team,


I intermittently get the following exception due to ZK/network issues.  How
do I recover *programmatically* from the consumer thread dying, and how do
I restart the source?  We have alerts showing that, due to this error, the
partition OWNERSHIP is *none*.  Please let me know how to detect that the
consumer thread has died and needs to be restarted, and how to restart the
source.



17 Nov 2014 04:29:41,180 ERROR [
ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
(org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
ZkEvent[New session event sent to
kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8]
kafka.common.ConsumerRebalanceFailedException:
mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
can't rebalance after 8 retries
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
at
kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)





ZK Connection Issues:

java.net.SocketException: Transport endpoint is not connected
at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
at
sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
at
org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
at
org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)




at
org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
at
org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
at
kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
at
kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
KeeperErrorCode = NoNode for
/consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
at
org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at
org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
at
org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)


Enforcing Network Bandwidth Quote with New Java Producer

2014-11-14 Thread Bhavesh Mistry
Hi Kafka Team,

We would like to enforce a per-minute network bandwidth quota on the
producer side.  How can I do this?  I need some way to count compressed
bytes on the producer.  I know the send callback does not give this
information.  Please let me know the best way.



Thanks,

Bhavesh


Re: Programmatic Kafka version detection/extraction?

2014-11-11 Thread Bhavesh Mistry
If it is a Maven artifact, then the Maven build generates a property file
called pom.properties inside the jar under
/META-INF/maven/<groupId>/<artifactId>/pom.properties.

Here is a sample:
#Generated by Maven
#Mon Oct 10 10:44:31 EDT 2011
version=10.0.1
groupId=com.google.guava
artifactId=guava
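
For example, a small sketch of reading it off the classpath (the
org.apache.kafka/kafka_2.10 coordinates below are an assumption and have to
match the actual artifact in the jar):

import java.io.InputStream;
import java.util.Properties;

public class KafkaVersion {

    // Assumed coordinates; adjust to the groupId/artifactId actually on the classpath.
    private static final String POM_PROPS =
            "/META-INF/maven/org.apache.kafka/kafka_2.10/pom.properties";

    public static String get() {
        try (InputStream in = KafkaVersion.class.getResourceAsStream(POM_PROPS)) {
            if (in == null) {
                return "unknown";
            }
            Properties props = new Properties();
            props.load(in);
            return props.getProperty("version", "unknown");
        } catch (Exception e) {
            return "unknown";
        }
    }
}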

Thanks,

Bhavesh

On Tue, Nov 11, 2014 at 10:34 AM, Gwen Shapira 
wrote:

> In Sqoop we do the following:
>
> Maven runs a shell script, passing the version as a parameter.
> The shell-script generates a small java class, which is then built with a
> Maven plugin.
> Our code references this generated class when we expose "getVersion()".
>
> Its complex and ugly, so I'm kind of hoping that there's a better way to do
> it :)
>
> Gwen
>
> On Tue, Nov 11, 2014 at 9:42 AM, Jun Rao  wrote:
>
> > Currently, the version number is only stored in our build config file,
> > gradle.properties. Not sure how we can automatically extract it and
> expose
> > it in an mbean. How do other projects do this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Nov 11, 2014 at 7:05 AM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > Sounds good.  But is the version number stored anywhere from where it
> > could
> > > be gotten?
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> > >
> > >
> > > On Tue, Nov 11, 2014 at 12:45 AM, Jun Rao  wrote:
> > >
> > > > Otis,
> > > >
> > > > We don't have an api for that now. We can probably expose this as a
> JMX
> > > as
> > > > part of kafka-1481.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Nov 10, 2014 at 7:17 PM, Otis Gospodnetic <
> > > > otis.gospodne...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Is there a way to detect which version of Kafka one is running?
> > > > > Is there an API for that, or a constant with this value, or maybe
> an
> > > > MBean
> > > > > or some other way to get to this info?
> > > > >
> > > > > Thanks,
> > > > > Otis
> > > > > --
> > > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > Management
> > > > > Solr & Elasticsearch Support * http://sematext.com/
> > > > >
> > > >
> > >
> >
>


Re: expanding cluster and reassigning parititions without restarting producer

2014-11-10 Thread Bhavesh Mistry
I had a different experience with expanding partitions for the new producer
and its impact.  I only tried it with non-keyed messages.  I would always
advise keeping the batch size relatively low, or planning for partition
expansion with the new Java producer in advance (or from inception);
otherwise the running producer code is impacted.

Here is the mail chain:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201411.mbox/%3ccaoejijit4cgry97dgzfjkfvaqfduv-o1x1kafefbshgirkm...@mail.gmail.com%3E

Thanks,

Bhavesh

On Mon, Nov 10, 2014 at 5:20 AM, Shlomi Hazan  wrote:

> Hmmm..
> The Java producer example seems to ignore added partitions too...
> How can I auto refresh keyed producers to use new partitions as these
> partitions are added?
>
>
> On Mon, Nov 10, 2014 at 12:33 PM, Shlomi Hazan  wrote:
>
> > One more thing:
> > I saw that the Python client is also unaffected by addition of partitions
> > to a topic and that it continues to send requests only to the old
> > partitions.
> > is this also handled appropriately by the Java producer? Will he see the
> > change and produce to the new partitions as well?
> > Shlomi
> >
> > On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan  wrote:
> >
> >> No I don't see anything like that, the question was aimed at learning if
> >> it is worthwhile to make the effort of reimplementing the Python
> producer
> >> in Java, I so I will not make all the effort just to be disappointed
> >> afterwards.
> >> understand I have nothing to worry about, so I will try to simulate this
> >> situation in small scale...
> >> maybe 3 brokers, one topic with one partition and then add partitions.
> >> we'll see.
> >> thanks for clarifying.
> >> Oh, Good luck with Confluent!!
> >> :)
> >>
> >> On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede  >
> >> wrote:
> >>
> >>> The producer might get an error code if the leader of the partitions
> >>> being
> >>> reassigned also changes. However it should retry and succeed. Do you
> see
> >>> a
> >>> behavior that suggests otherwise?
> >>>
> >>> On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan 
> wrote:
> >>>
> >>> > Hi All,
> >>> > I recently had an issue producing from python where expanding a
> cluster
> >>> > from 3 to 5 nodes and reassigning partitions forced me to restart the
> >>> > producer b/c of KeyError thrown.
> >>> > Is this situation handled by the Java producer automatically or need
> I
> >>> do
> >>> > something to have the java producer refresh itself to see the
> >>> reassigned
> >>> > partition layout and produce away ?
> >>> > Shlomi
> >>> >
> >>>
> >>
> >>
> >
>


Re: Announcing Confluent

2014-11-06 Thread Bhavesh Mistry
Hi Guys,

Thanks for your awesome support.  I wish you good luck!!  And thanks for
open sourcing Kafka!!

Thanks,

Bhavesh

On Thu, Nov 6, 2014 at 10:52 AM, Rajasekar Elango 
wrote:

> Congrats. Wish you all the very best and success.
>
> Thanks,
> Raja.
>
> On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders 
> wrote:
>
> > Congrats!
> >
> > On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:
> > > Hey all,
> > >
> > > I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> > > company around Kafka called Confluent. We are planning on productizing
> > the
> > > kind of Kafka-based real-time data platform we built out at LinkedIn.
> We
> > > are doing this because we think this is a really powerful idea and we
> > felt
> > > there was a lot to do to make this idea really take root. We wanted to
> > make
> > > that our full time mission and focus.
> > >
> > > There is a blog post that goes into a little more depth here:
> > > http://blog.confluent.io/
> > >
> > > LinkedIn will remain a heavy Kafka user and contributor. Combined with
> > our
> > > additional resources from the funding of the company this should be a
> > > really good thing for the Kafka development effort. Especially when
> > > combined with the increasing contributions from the rest of the
> > development
> > > community. This is great news, as there is a lot of work to do. We'll
> > need
> > > to really focus on scaling this distributed development in a healthy
> way.
> > >
> > > One thing I do want to emphasize is that the addition of a company in
> the
> > > Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> > > 100% open source and community focused, as of course is true of any
> > Apache
> > > project. I have been doing open source for a long time and strongly
> > believe
> > > it is the right model for infrastructure software development.
> > >
> > > Confluent is just getting off the ground now. We left LinkedIn, raised
> > some
> > > money, and we have an office (but no furniture yet!). None the less, f
> > you
> > > are interested in finding out more about the company and either getting
> > > help with your Kafka usage or joining us to help build all this, by all
> > > means reach out to us, we’d love to talk.
> > >
> > > Wish us luck!
> > >
> > > -Jay
> >
>
>
>
> --
> Thanks,
> Raja.
>


Re: Spark Kafka Performance

2014-11-04 Thread Bhavesh Mistry
Hi Eduardo,

Can you please take a thread dump and see if there are blocking issues on
the producer side?  Do you have a single producer instance shared by
multiple threads?

Are you using the Scala producer or the new Java producer?  Also, what are
your producer properties?


Thanks,

Bhavesh

On Tue, Nov 4, 2014 at 12:40 AM, Eduardo Alfaia 
wrote:

> Hi Gwen,
> I have changed the java code kafkawordcount to use reducebykeyandwindow in
> spark.
>
> - Original Message -
> From: "Gwen Shapira" 
> Sent: 03/11/2014 21:08
> To: "users@kafka.apache.org" 
> Cc: "u...@spark.incubator.apache.org" 
> Subject: Re: Spark Kafka Performance
>
> Not sure about the throughput, but:
>
> "I mean that the words counted in spark should grow up" - The spark
> word-count example doesn't accumulate.
> It gets an RDD every n seconds and counts the words in that RDD. So we
> don't expect the count to go up.
>
>
>
> On Mon, Nov 3, 2014 at 6:57 AM, Eduardo Costa Alfaia <
> e.costaalf...@unibs.it
> > wrote:
>
> > Hi Guys,
> > Anyone could explain me how to work Kafka with Spark, I am using the
> > JavaKafkaWordCount.java like a test and the line command is:
> >
> > ./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount
> > spark://192.168.0.13:7077 computer49:2181 test-consumer-group unibs.it 3
> >
> > and like a producer I am using this command:
> >
> > rdkafka_cachesender -t unibs.nec -p 1 -b 192.168.0.46:9092 -f output.txt
> > -l 100 -n 10
> >
> >
> > rdkafka_cachesender is a program that was developed by me which send to
> > kafka the output.txt’s content where -l is the length of each send(upper
> > bound) and -n is the lines to send in a row. Bellow is the throughput
> > calculated by the program:
> >
> > File is 2235755 bytes
> > throughput (b/s) = 699751388
> > throughput (b/s) = 723542382
> > throughput (b/s) = 662989745
> > throughput (b/s) = 505028200
> > throughput (b/s) = 471263416
> > throughput (b/s) = 446837266
> > throughput (b/s) = 409856716
> > throughput (b/s) = 373994467
> > throughput (b/s) = 366343097
> > throughput (b/s) = 373240017
> > throughput (b/s) = 386139016
> > throughput (b/s) = 373802209
> > throughput (b/s) = 369308515
> > throughput (b/s) = 366935820
> > throughput (b/s) = 365175388
> > throughput (b/s) = 362175419
> > throughput (b/s) = 358356633
> > throughput (b/s) = 357219124
> > throughput (b/s) = 352174125
> > throughput (b/s) = 348313093
> > throughput (b/s) = 355099099
> > throughput (b/s) = 348069777
> > throughput (b/s) = 348478302
> > throughput (b/s) = 340404276
> > throughput (b/s) = 339876031
> > throughput (b/s) = 339175102
> > throughput (b/s) = 327555252
> > throughput (b/s) = 324272374
> > throughput (b/s) = 322479222
> > throughput (b/s) = 319544906
> > throughput (b/s) = 317201853
> > throughput (b/s) = 317351399
> > throughput (b/s) = 315027978
> > throughput (b/s) = 313831014
> > throughput (b/s) = 310050384
> > throughput (b/s) = 307654601
> > throughput (b/s) = 305707061
> > throughput (b/s) = 307961102
> > throughput (b/s) = 296898200
> > throughput (b/s) = 296409904
> > throughput (b/s) = 294609332
> > throughput (b/s) = 293397843
> > throughput (b/s) = 293194876
> > throughput (b/s) = 291724886
> > throughput (b/s) = 290031314
> > throughput (b/s) = 289747022
> > throughput (b/s) = 289299632
> >
> > The throughput goes down after some seconds and it does not maintain the
> > performance like the initial values:
> >
> > throughput (b/s) = 699751388
> > throughput (b/s) = 723542382
> > throughput (b/s) = 662989745
> >
> > Another question is about spark, after I have started the spark line
> > command after 15 sec spark continue to repeat the words counted, but my
> > program continue to send words to kafka, so I mean that the words counted
> > in spark should grow up. I have attached the log from spark.
> >
> > My Case is:
> >
> > ComputerA(Kafka_cachsesender) -> ComputerB(Kakfa-Brokers-Zookeeper) ->
> > ComputerC (Spark)
> >
> > If I don’t explain very well send a reply to me.
> >
> > Thanks Guys
> > --
> > Informativa sulla Privacy: http://www.unibs.it/node/8155
> >
>
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>


Re: queued.max.message.chunks impact and consumer tuning

2014-11-04 Thread Bhavesh Mistry
Thanks for the info.  I will have to tune the memory.  What else do you
recommend for the high-level consumer to get optimal performance and drain
the queue as quickly as possible with auto commit on?

Thanks,

Bhavesh

On Tue, Nov 4, 2014 at 9:59 AM, Joel Koshy  wrote:

> We used to default to 10, but two should be sufficient. There is
> little reason to buffer more than that. If you increase it to 2000 you
> will most likely run into memory issues. E.g., if your fetch size is
> 1MB you would enqueue 1MB*2000 chunks in each queue.
>
> On Tue, Nov 04, 2014 at 09:05:44AM -0800, Bhavesh Mistry wrote:
> > Hi Kafka Dev Team,
> >
> > It seems that Maximum buffer size is set to  2 default.  What is impact
> of
> > changing this to 2000 or so ?   This will improve the consumer thread
> > performance ?  More event will be buffered in memory.  Or Is there any
> > other recommendation to tune High Level Consumers ?
> >
> > Here is code from Kafka Trunk Branch:
> >
> >   val MaxQueuedChunks = 2
> >   /** max number of message chunks buffered for consumption, each chunk
> can
> > be up to fetch.message.max.bytes*/
> >   val queuedMaxMessages = props.getInt("queued.max.message.chunks",
> > MaxQueuedChunks)
> >
> >
> >
> > Thanks,
> >
> > Bhavesh
>
>


Re: High Level Consumer Iterator IllegalStateException Issue

2014-11-04 Thread Bhavesh Mistry
Hi Neha and Jun,

I have fixed the issue on my side based on what Jun mentioned ("next()
gives IllegalStateException if hasNext() is not called...").  With that
hint I debugged further: it was my mistake, I was sharing the same consumer
iterator across multiple threads (I forgot to call iterator.remove() in my
registry, so every thread ended up using the first consumer iterator).
Because multiple threads were sharing the same (first) consumer iterator,
it was intermittently getting this exception.
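
For the record, the fix looks roughly like this (a sketch of my own registry
code, not a Kafka API; the class and method names are illustrative, and each
worker thread now takes its own iterator off the shared list exactly once):

import java.util.Iterator;
import java.util.List;
import kafka.consumer.ConsumerIterator;

public class IteratorRegistry {

    /** Hand out one ConsumerIterator per calling thread and drop it from the shared list. */
    public static synchronized ConsumerIterator<byte[], byte[]> takeOne(
            List<ConsumerIterator<byte[], byte[]>> iteratorList) {
        Iterator<ConsumerIterator<byte[], byte[]>> it = iteratorList.iterator();
        ConsumerIterator<byte[], byte[]> mine = it.next();
        it.remove(); // the missing remove(): without it every thread got the first iterator
        return mine;
    }
}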

This issue is resolved; thank you very much for your support.

Thanks,

Bhavesh

On Mon, Nov 3, 2014 at 4:35 PM, Jun Rao  wrote:

> Bhavesh,
>
> That example has a lot of code. Could you provide a simpler test that
> demonstrates the problem?
>
> Thanks,
>
> Jun
>
> On Fri, Oct 31, 2014 at 10:07 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com
> > wrote:
>
> > Hi Jun,
> >
> > Here is code base:
> >
> >
> https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java
> >
> > Please let me know if you can help me determine  the root cause.   Why
> > there is illegal state and blocking ?
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao  wrote:
> >
> > > Do you have a simple test that can reproduce this issue?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > HI Jun,
> > > >
> > > > Consumer Connector is not closed because I can see the
> ConsumerFetcher
> > > > Thread alive but Blocked on *put* and hasNext() is blocked on *take*.
> > > > This is what I see after recovery.
> > > >
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao  wrote:
> > > >
> > > > > Another possibility is that the consumer connector is already
> closed
> > > and
> > > > > then you call hasNext() on the iterator.
> > > > >
> > > > > Thanks,
> > > > >
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > > > mistry.p.bhav...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > The hasNext() itself throws this error.  I have to manually reset
> > > state
> > > > > and
> > > > > > sometime it is able to recover and other it is not. Any other
> clue
> > ?
> > > > > >
> > > > > > public boolean hasNext() {
> > > > > > LOG.info("called of  hasNext() :");
> > > > > > int retry = 3;
> > > > > > while(retry > 0){
> > > > > > try{
> > > > > > // this hasNext is blocking call..
> > > > > > boolean result = iterator.hasNext();
> > > > > > return result;
> > > > > > }catch(IllegalStateException exp){
> > > > > > iterator.resetState();
> > > > > > LOG.error("GOT IllegalStateException arg
> trying
> > > to
> > > > > > recover", exp);
> > > > > > retry--;
> > > > > > }
> > > > > > }
> > > > > > return false;
> > > > > > }
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao 
> wrote:
> > > > > >
> > > > > > > The IllegalStateException typically happens if you call next()
> > > before
> > > > > > > hasNext() on the iterator.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > &

queued.max.message.chunks impact and consumer tuning

2014-11-04 Thread Bhavesh Mistry
Hi Kafka Dev Team,

It seems that the maximum buffer size defaults to 2.  What is the impact of
changing this to 2000 or so?  Will this improve consumer thread
performance, since more events will be buffered in memory?  Or is there any
other recommendation for tuning high-level consumers?

Here is code from Kafka Trunk Branch:

  val MaxQueuedChunks = 2
  /** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes */
  val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
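
To make the memory impact concrete, here is a sketch of the consumer
properties involved (the class is illustrative and the values are
placeholders; worst-case fetcher-queue memory is roughly
queued.max.message.chunks * fetch.message.max.bytes per queue):

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class ConsumerTuning {

    public static ConsumerConfig buildConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");      // placeholder
        props.put("group.id", "my-consumer-group");      // placeholder
        props.put("fetch.message.max.bytes", "1048576"); // 1 MB per fetched chunk
        // 10 chunks * 1 MB = ~10 MB worst case per fetcher queue; a value like
        // 2000 would allow up to ~2 GB per queue, hence the memory concern.
        props.put("queued.max.message.chunks", "10");
        return new ConsumerConfig(props);
    }
}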



Thanks,

Bhavesh


Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-31 Thread Bhavesh Mistry
Hi Jun,

Here is code base:
https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java

Please let me know if you can help me determine the root cause.  Why is
there an illegal state, and why the blocking?

Thanks,

Bhavesh

On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao  wrote:

> Do you have a simple test that can reproduce this issue?
>
> Thanks,
>
> Jun
>
> On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > HI Jun,
> >
> > Consumer Connector is not closed because I can see the ConsumerFetcher
> > Thread alive but Blocked on *put* and hasNext() is blocked on *take*.
> > This is what I see after recovery.
> >
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao  wrote:
> >
> > > Another possibility is that the consumer connector is already closed
> and
> > > then you call hasNext() on the iterator.
> > >
> > > Thanks,
> > >
> > >
> > > Jun
> > >
> > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > The hasNext() itself throws this error.  I have to manually reset
> state
> > > and
> > > > sometime it is able to recover and other it is not. Any other clue ?
> > > >
> > > > public boolean hasNext() {
> > > > LOG.info("called of  hasNext() :");
> > > > int retry = 3;
> > > > while(retry > 0){
> > > > try{
> > > > // this hasNext is blocking call..
> > > > boolean result = iterator.hasNext();
> > > > return result;
> > > > }catch(IllegalStateException exp){
> > > > iterator.resetState();
> > > > LOG.error("GOT IllegalStateException arg trying
> to
> > > > recover", exp);
> > > > retry--;
> > > > }
> > > > }
> > > > return false;
> > > > }
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao  wrote:
> > > >
> > > > > The IllegalStateException typically happens if you call next()
> before
> > > > > hasNext() on the iterator.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > > > > mistry.p.bhav...@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi Neha,
> > > > > >
> > > > > > Thanks for your answer.  Can you please let me know how I can
> > resolve
> > > > the
> > > > > > Iterator IllegalStateException ?  I would appreciate your is this
> > is
> > > > bug
> > > > > I
> > > > > > can file one or let me know if this is use case specific ?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <
> > > > neha.narkh...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > queued.max.message.chunks controls the consumer's fetcher
> queue.
> > > > > > >
> > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > > > mistry.p.bhav...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > HI Neha,
> > > > > > > >
> > > > > > > > If I solved the problem number 1 think and number 2 will be
> > > solved
> > > > > > (prob
> > > > > > > > 1 is causing problem number 2(blocked)).  Can you please let
> me
> > > > know
> > > > > > what
> > > > > > > > controls the queue size for *ConsumerFetcherThread* thread ?
> > > > > > > >
> > > > > > 

Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-30 Thread Bhavesh Mistry
Hi Jun,

The consumer connector is not closed, because I can see the
ConsumerFetcherThread alive but blocked on *put*, while hasNext() is
blocked on *take*.  This is what I see after recovery.



Thanks,

Bhavesh

On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao  wrote:

> Another possibility is that the consumer connector is already closed and
> then you call hasNext() on the iterator.
>
> Thanks,
>
>
> Jun
>
> On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > The hasNext() itself throws this error.  I have to manually reset state
> and
> > sometime it is able to recover and other it is not. Any other clue ?
> >
> > public boolean hasNext() {
> > LOG.info("called of  hasNext() :");
> > int retry = 3;
> > while(retry > 0){
> > try{
> > // this hasNext is blocking call..
> > boolean result = iterator.hasNext();
> > return result;
> > }catch(IllegalStateException exp){
> > iterator.resetState();
> > LOG.error("GOT IllegalStateException arg trying to
> > recover", exp);
> > retry--;
> > }
> > }
> > return false;
> > }
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao  wrote:
> >
> > > The IllegalStateException typically happens if you call next() before
> > > hasNext() on the iterator.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com
> > > > wrote:
> > >
> > > > Hi Neha,
> > > >
> > > > Thanks for your answer.  Can you please let me know how I can resolve
> > the
> > > > Iterator IllegalStateException ?  I would appreciate your is this is
> > bug
> > > I
> > > > can file one or let me know if this is use case specific ?
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <
> > neha.narkh...@gmail.com>
> > > > wrote:
> > > >
> > > > > queued.max.message.chunks controls the consumer's fetcher queue.
> > > > >
> > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > mistry.p.bhav...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > HI Neha,
> > > > > >
> > > > > > If I solved the problem number 1 think and number 2 will be
> solved
> > > > (prob
> > > > > > 1 is causing problem number 2(blocked)).  Can you please let me
> > know
> > > > what
> > > > > > controls the queue size for *ConsumerFetcherThread* thread ?
> > > > > >
> > > > > >
> > > > > > Please see the attached java source code which will reproduce the
> > > > > > problem.  You may remove the recovery process...  Please check.
> We
> > > > have
> > > > > to
> > > > > > do some work before we start reading from Kafka Stream Interator
> > and
> > > > this
> > > > > > seems to cause some issue with java.lang.
> > > > > > IllegalStateException: Iterator is in failed state*.
> > > > > >
> > > > > > Please let me know your finding and recommendation.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <
> > > > neha.narkh...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> >> Sometime it give following exception.
> > > > > >>
> > > > > >> It will help to have a more specific test case that reproduces
> the
> > > > > failed
> > > > > >> iterator state.
> > > > > >>
> > > > > >> Also, the consumer threads block if the fetcher queue is full.
> The
> > > > queue
> > > > > >> can fill up if 

Re: partitions stealing & balancing consumer threads across servers

2014-10-30 Thread Bhavesh Mistry
Hi Joel,

Yes, I am on the Kafka trunk branch.  In my scenario, does having back-up
threads impact the allocation?  If I have 24 threads (6 threads per JVM
across 4 JVMs) in the above example, does the partition allocation get
evenly distributed (3 partitions on each JVM)?  Is this a supported use
case?

Thanks,

Bhavesh


Re: partitions stealing & balancing consumer threads across servers

2014-10-30 Thread Bhavesh Mistry
Hi Joel,

Correction to my previous question: what is the expected behavior of the
*roundrobin* policy in the above scenario?

Thanks,

Bhavesh

On Thu, Oct 30, 2014 at 1:39 PM, Bhavesh Mistry 
wrote:

> Hi Joel,
>
> I have similar issue.  I have tried *partition.assignment.strategy=*
> *"roundrobin"*, but how do you accept this accept to work ?
>
> We have a topic with 32 partitions and 4 JVM with 10 threads each ( 8 is
> backup if one of JVM goes down).   The roundrobin does not select all the
> JVM only 3 JVM but uneven distribution of threads across 4 JVMs (the 4th
> JVM does not get any active consumption threads).  What is best way to
> evenly (or close to even)  distribute the consumption threads  across JVMs.
>
>
> Thanks,
>
> Bhavesh
>
> On Thu, Oct 30, 2014 at 10:07 AM, Joel Koshy  wrote:
>
>>
>> > example: launching 4 processes on 4 different machines with 4 threads
>> per
>> > process on 12 partition topic will have each machine with 3 assigned
>> > threads and one doing nothing. more over no matter what number of
>> threads
>> > each process will have , as long as it is bigger then 3, the end result
>> > will stay the same with 3 assigned threads per machine, and the rest of
>> > them doing nothing.
>> >
>> > Ideally, I would want something like consumer set/ensemble/{what ever
>> word
>> > not group} that will be used to denote a group of threads on a machine,
>> > so that when specific threads request to join a consumer group they
>> will be
>> > elected so that they are balanced across the machine denoted by the
>> > consumer set/ensemble identifier.
>> >
>> > will partition.assignment.strategy="roundrobin" help with that?
>>
>> You can give it a try. It does have the constraint the subscription is
>> identical across your machines. i.e., it should work in your above
>> scenario (4 process on 4 machines with 4 threads per process). The
>> partition assignment will assign partitions to threads in a
>> round-robin manner. The difference of max(owned) and min(owned) will
>> be exactly one.
>>
>> We can discuss improving partition assignment strategies in the 0.9
>> release with the new consumer.
>>
>> On Thu, Oct 30, 2014 at 08:52:40AM +0200, Shlomi Hazan wrote:
>> > Jun, Joel,
>> >
>> > The issue here is exactly which threads are left out, and which threads
>> are
>> > assigned partitions.
>> > Maybe I am missing something but what I want is to balance consuming
>> > threads across machines/processes, regardless of the amount of threads
>> the
>> > machine launches (side effect: this way if you have more threads than
>> > partitions you get a reserve force awaiting to charge in).
>> >
>> > example: launching 4 processes on 4 different machines with 4 threads
>> per
>> > process on 12 partition topic will have each machine with 3 assigned
>> > threads and one doing nothing. more over no matter what number of
>> threads
>> > each process will have , as long as it is bigger then 3, the end result
>> > will stay the same with 3 assigned threads per machine, and the rest of
>> > them doing nothing.
>> >
>> > Ideally, I would want something like consumer set/ensemble/{what ever
>> word
>> > not group} that will be used to denote a group of threads on a machine,
>> > so that when specific threads request to join a consumer group they
>> will be
>> > elected so that they are balanced across the machine denoted by the
>> > consumer set/ensemble identifier.
>> >
>> > will partition.assignment.strategy="roundrobin" help with that?
>> > 10x,
>> > Shlomi
>> >
>> > On Thu, Oct 30, 2014 at 4:00 AM, Joel Koshy 
>> wrote:
>> >
>> > > Shlomi,
>> > >
>> > > If you are on trunk, and your consumer subscriptions are identical
>> > > then you can try a slightly different partition assignment strategy.
>> > > Try setting partition.assignment.strategy="roundrobin" in your
>> > > consumer config.
>> > >
>> > > Thanks,
>> > >
>> > > Joel
>> > >
>> > > On Wed, Oct 29, 2014 at 06:29:30PM -0700, Jun Rao wrote:
>> > > > By consumer, I actually mean consumer threads (the thread # you
>> used when
>> > > > creating consumer streams). So, if you have 4 consumers, each with 4
>> > > > threads, 4 of the threads will not get any data with 12 p

Re: partitions stealing & balancing consumer threads across servers

2014-10-30 Thread Bhavesh Mistry
Hi Joel,

I have a similar issue.  I have tried *partition.assignment.strategy=*
*"roundrobin"*, but how do you expect this to work?

We have a topic with 32 partitions and 4 JVMs with 10 threads each (8 are
backups in case one of the JVMs goes down).  Round-robin does not select
all the JVMs, only 3 of them, and the distribution of threads across the 4
JVMs is uneven (the 4th JVM does not get any active consumption threads).
What is the best way to distribute the consumption threads evenly (or close
to evenly) across JVMs?
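
For context, this is how I set it up (a minimal sketch of the consumer
config; the class name and everything except partition.assignment.strategy
are placeholders):

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class RoundRobinConsumerConfig {

    public static ConsumerConfig build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");   // placeholder
        props.put("group.id", "my-consumer-group");   // placeholder
        // Round-robin assignment expects an identical subscription (same topics
        // and thread count) from every member of the group.
        props.put("partition.assignment.strategy", "roundrobin");
        return new ConsumerConfig(props);
    }
}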


Thanks,

Bhavesh

On Thu, Oct 30, 2014 at 10:07 AM, Joel Koshy  wrote:

>
> > example: launching 4 processes on 4 different machines with 4 threads per
> > process on 12 partition topic will have each machine with 3 assigned
> > threads and one doing nothing. more over no matter what number of threads
> > each process will have , as long as it is bigger then 3, the end result
> > will stay the same with 3 assigned threads per machine, and the rest of
> > them doing nothing.
> >
> > Ideally, I would want something like consumer set/ensemble/{what ever
> word
> > not group} that will be used to denote a group of threads on a machine,
> > so that when specific threads request to join a consumer group they will
> be
> > elected so that they are balanced across the machine denoted by the
> > consumer set/ensemble identifier.
> >
> > will partition.assignment.strategy="roundrobin" help with that?
>
> You can give it a try. It does have the constraint the subscription is
> identical across your machines. i.e., it should work in your above
> scenario (4 process on 4 machines with 4 threads per process). The
> partition assignment will assign partitions to threads in a
> round-robin manner. The difference of max(owned) and min(owned) will
> be exactly one.
>
> We can discuss improving partition assignment strategies in the 0.9
> release with the new consumer.
>
> On Thu, Oct 30, 2014 at 08:52:40AM +0200, Shlomi Hazan wrote:
> > Jun, Joel,
> >
> > The issue here is exactly which threads are left out, and which threads
> are
> > assigned partitions.
> > Maybe I am missing something but what I want is to balance consuming
> > threads across machines/processes, regardless of the amount of threads
> the
> > machine launches (side effect: this way if you have more threads than
> > partitions you get a reserve force awaiting to charge in).
> >
> > example: launching 4 processes on 4 different machines with 4 threads per
> > process on 12 partition topic will have each machine with 3 assigned
> > threads and one doing nothing. more over no matter what number of threads
> > each process will have , as long as it is bigger then 3, the end result
> > will stay the same with 3 assigned threads per machine, and the rest of
> > them doing nothing.
> >
> > Ideally, I would want something like consumer set/ensemble/{what ever
> word
> > not group} that will be used to denote a group of threads on a machine,
> > so that when specific threads request to join a consumer group they will
> be
> > elected so that they are balanced across the machine denoted by the
> > consumer set/ensemble identifier.
> >
> > will partition.assignment.strategy="roundrobin" help with that?
> > 10x,
> > Shlomi
> >
> > On Thu, Oct 30, 2014 at 4:00 AM, Joel Koshy  wrote:
> >
> > > Shlomi,
> > >
> > > If you are on trunk, and your consumer subscriptions are identical
> > > then you can try a slightly different partition assignment strategy.
> > > Try setting partition.assignment.strategy="roundrobin" in your
> > > consumer config.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Wed, Oct 29, 2014 at 06:29:30PM -0700, Jun Rao wrote:
> > > > By consumer, I actually mean consumer threads (the thread # you used
> when
> > > > creating consumer streams). So, if you have 4 consumers, each with 4
> > > > threads, 4 of the threads will not get any data with 12 partitions.
> It
> > > > sounds like that's not what you get?  What's the output of the
> > > > ConsumerOffsetChecker (see
> http://kafka.apache.org/documentation.html)?
> > > >
> > > > For consumer.id, you don't need to set it in general. We generate
> some
> > > uuid
> > > > automatically.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Oct 28, 2014 at 4:59 AM, Shlomi Hazan 
> wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > I hear you say "partitions are evenly distributed among all
> consumers
> > > in
> > > > > the same group", yet I did bump into a case where launching a
> process
> > > with
> > > > > X high level consumer API threads took over all partitions, sending
> > > > > existing consumers to be unemployed.
> > > > >
> > > > > According to the claim above, and if I am not mistaken:
> > > > > on a topic T with 12 partitions and 3 consumers C1-C3 on the same
> group
> > > > > with 4 threads each,
> > > > > adding a new consumer C4 with 12 threads should yield the following
> > > > > balance:
> > > > > C1-C3 each relinquish a single partition holding only 3 partitions
> > > eac

Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-29 Thread Bhavesh Mistry
Hi Jun,

The hasNext() call itself throws this error.  I have to manually reset the
state, and sometimes it is able to recover and other times it is not.  Any
other clue?

public boolean hasNext() {
LOG.info("called of  hasNext() :");
int retry = 3;
while(retry > 0){
try{
// this hasNext is blocking call..
boolean result = iterator.hasNext();
return result;
}catch(IllegalStateException exp){
iterator.resetState();
LOG.error("GOT IllegalStateException arg trying to
recover", exp);
retry--;
}
}
return false;
}

Thanks,

Bhavesh

On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao  wrote:

> The IllegalStateException typically happens if you call next() before
> hasNext() on the iterator.
>
> Thanks,
>
> Jun
>
> On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com
> > wrote:
>
> > Hi Neha,
> >
> > Thanks for your answer.  Can you please let me know how I can resolve the
> > Iterator IllegalStateException ?  I would appreciate your is this is bug
> I
> > can file one or let me know if this is use case specific ?
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede 
> > wrote:
> >
> > > queued.max.message.chunks controls the consumer's fetcher queue.
> > >
> > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > HI Neha,
> > > >
> > > > If I solved the problem number 1 think and number 2 will be solved
> > (prob
> > > > 1 is causing problem number 2(blocked)).  Can you please let me know
> > what
> > > > controls the queue size for *ConsumerFetcherThread* thread ?
> > > >
> > > >
> > > > Please see the attached java source code which will reproduce the
> > > > problem.  You may remove the recovery process...  Please check.  We
> > have
> > > to
> > > > do some work before we start reading from Kafka Stream Interator and
> > this
> > > > seems to cause some issue with java.lang.
> > > > IllegalStateException: Iterator is in failed state*.
> > > >
> > > > Please let me know your finding and recommendation.
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <
> > neha.narkh...@gmail.com>
> > > > wrote:
> > > >
> > > >> >> Sometime it give following exception.
> > > >>
> > > >> It will help to have a more specific test case that reproduces the
> > > failed
> > > >> iterator state.
> > > >>
> > > >> Also, the consumer threads block if the fetcher queue is full. The
> > queue
> > > >> can fill up if your consumer thread dies or slows down. I'd
> recommend
> > > you
> > > >> ensure that all your consumer threads are alive. You can take a
> thread
> > > >> dump
> > > >> to verify this.
> > > >>
> > > >> Thanks,
> > > >> Neha
> > > >>
> > > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> > > >> mistry.p.bhav...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Neha,
> > > >> >
> > > >> >
> > > >> > I have two problems:.  Any help is greatly appreciated.
> > > >> >
> > > >> >
> > > >> > 1)* java.lang.IllegalStateException: Iterator is in failed state*
> > > >> >
> > > >> >ConsumerConnector  consumerConnector = Consumer
> > > >> > .createJavaConsumerConnector(getConsumerConfig());
> > > >> > Map topicCountMap = new HashMap > > >> > Integer>();
> > > >> > topicCountMap.put(topic, *32*);
> > > >> > Map>>
> > topicStreamMap
> > > =
> > > >> > consumerConnector
> > > >> > .createMessageStreams(topicCountMap);
> > > >> >
> > > >> > List> streams =
> > > >> > Collections.sy

Re: High Level Consumer and Close with Auto Commit On

2014-10-28 Thread Bhavesh Mistry
Hi Gwen,

Thanks for the info, much appreciated.  I have sometimes observed
duplicated events, which is why I asked.
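
For what it is worth, this is how I shut the connector down now (a sketch
with an illustrative helper class; committing explicitly first should
narrow the window for re-delivered messages after a restart):

import kafka.javaapi.consumer.ConsumerConnector;

public class ShutdownHelper {

    public static void stop(ConsumerConnector connector) {
        connector.commitOffsets(); // flush consumed offsets to ZooKeeper first
        connector.shutdown();      // shutdown() also commits when auto-commit is enabled
    }
}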

Thanks,

Bhavesh

On Tue, Oct 28, 2014 at 11:51 AM, Gwen Shapira 
wrote:

> High level consumer commits before shutting down.
>
> If you'll look at ZookeeperConsumerConnector.scala (currently the only
> implementation of ConsumerConnector) you'll see shutdown() includes
> the following:
>
>   if (config.autoCommitEnable)
> commitOffsets()
>
> Gwen
>
> On Tue, Oct 28, 2014 at 11:44 AM, Bhavesh Mistry
>  wrote:
> > Hi Kafka Team,
> >
> > What is expected behavior when you close *ConsumerConnector* and auto
> > commit is on ?  Basically, when auto commit interval is set to 5 seconds
> > and shutdown is called (before 5 seconds elapses) does ConsumerConnector
> > commit the offset of message consumed by (next()) method or consumer will
> > get duplicate messages when it comes online after restart ?
> >
> > ConsumerConnector.shutdown();
> >
> > Thanks,
> >
> > Bhavesh
>


High Level Consumer and Close with Auto Commit On

2014-10-28 Thread Bhavesh Mistry
Hi Kafka Team,

What is the expected behavior when you close the *ConsumerConnector* and
auto commit is on?  Basically, when the auto commit interval is set to 5
seconds and shutdown is called before the 5 seconds elapse, does the
ConsumerConnector commit the offsets of messages consumed via the next()
method, or will the consumer get duplicate messages when it comes back
online after a restart?

ConsumerConnector.shutdown();

Thanks,

Bhavesh


Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-28 Thread Bhavesh Mistry
Hi Neha,

Thanks for your answer.  Can you please let me know how I can resolve the
iterator IllegalStateException?  I would appreciate it if you could tell me
whether this is a bug I can file, or let me know if it is specific to my
use case.

Thanks,

Bhavesh

On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede 
wrote:

> queued.max.message.chunks controls the consumer's fetcher queue.
>
> On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > HI Neha,
> >
> > If I solved the problem number 1 think and number 2 will be solved  (prob
> > 1 is causing problem number 2(blocked)).  Can you please let me know what
> > controls the queue size for *ConsumerFetcherThread* thread ?
> >
> >
> > Please see the attached java source code which will reproduce the
> > problem.  You may remove the recovery process...  Please check.  We have
> to
> > do some work before we start reading from Kafka Stream Interator and this
> > seems to cause some issue with java.lang.
> > IllegalStateException: Iterator is in failed state*.
> >
> > Please let me know your finding and recommendation.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede 
> > wrote:
> >
> >> >> Sometime it give following exception.
> >>
> >> It will help to have a more specific test case that reproduces the
> failed
> >> iterator state.
> >>
> >> Also, the consumer threads block if the fetcher queue is full. The queue
> >> can fill up if your consumer thread dies or slows down. I'd recommend
> you
> >> ensure that all your consumer threads are alive. You can take a thread
> >> dump
> >> to verify this.
> >>
> >> Thanks,
> >> Neha
> >>
> >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> >> mistry.p.bhav...@gmail.com>
> >> wrote:
> >>
> >> > Hi Neha,
> >> >
> >> >
> >> > I have two problems:.  Any help is greatly appreciated.
> >> >
> >> >
> >> > 1)* java.lang.IllegalStateException: Iterator is in failed state*
> >> >
> >> > ConsumerConnector consumerConnector =
> >> > Consumer.createJavaConsumerConnector(getConsumerConfig());
> >> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> > topicCountMap.put(topic, *32*);
> >> > Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
> >> > consumerConnector.createMessageStreams(topicCountMap);
> >> >
> >> > List<KafkaStream<byte[], byte[]>> streams =
> >> > Collections.synchronizedList(topicStreamMap.get(topic));
> >> >
> >> > AppStaticInfo info = Mupd8Main.STATICINFO();
> >> >
> >> > Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> >> > // remove the head (first) list for this source...the rest are for the
> >> > // Dynamic Source...
> >> > mainIterator = iterator.next().iterator();
> >> >
> >> > List<ConsumerIterator<byte[], byte[]>> iteratorList = new
> >> > ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
> >> > // now the rest of the iterators must be registered..
> >> > while (iterator.hasNext()) {
> >> >     iteratorList.add(iterator.next().iterator());
> >> > }
> >> > *KafkaStreamRegistory.registerStream(mainSourceName,
> >> > iteratorList);*
> >> >
> >> > Once the Consumer iterator is created and registered.  We use this in
> >> > another thread to start reading from the Consumer Iterator.   Sometime
> >> it
> >> > give following exception.
> >> >
> >> > 24 Oct 2014 16:03:25,923 ERROR
> >> > [SourceReader:request_source:LogStreamKafkaSource1]
> >> > (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during
> >> reads.
> >> > Swallowed to continue next read.
> >> > java.lang.IllegalStateException: Iterator is in failed state
> >> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> >> >
> >> >
> >> > I have tried to recover from this state by using this:
> >> > iterator.resetState(); but it does not recover sometime.
> >> >
> >> >
> >> >
> >> >
> >> > *2) ConsumerFetcherThread are blocked on enqueue ?  What control

Where Compression/Decompression happens

2014-10-27 Thread Bhavesh Mistry
Hi Kafka Team,

Does compression happen on the producer side (on the application thread that
calls the send method, or on a background Kafka thread), and where does
decompression happen on the consumer side?

Is there any compression/decompression happening on the broker side when it
receives messages from producers and sends messages to consumers?
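
For reference, the compression codec itself is chosen via producer
configuration; a minimal sketch with the old producer config (broker list and
codec value are illustrative):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class CompressionConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // assumed broker list
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("producer.type", "async");
        props.put("compression.codec", "gzip");            // none | gzip | snappy

        Producer<byte[], byte[]> producer =
                new Producer<byte[], byte[]>(new ProducerConfig(props));
        // Messages handed to send() will be compressed per the codec above.
        producer.send(new KeyedMessage<byte[], byte[]>("my-topic", "payload".getBytes()));
        producer.close();
    }
}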

Thanks for your help !

Thanks,

Bhavesh


Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-27 Thread Bhavesh Mistry
HI Neha,

If I solve problem number 1, I think number 2 will be solved as well (problem
1 is causing problem number 2, the blocked threads).  Can you please let me
know what controls the queue size for the *ConsumerFetcherThread* thread?


Please see the attached Java source code, which will reproduce the problem.
You may remove the recovery process...  Please check.  We have to do some
work before we start reading from the Kafka Stream Iterator, and this seems to
cause the *java.lang.IllegalStateException: Iterator is in failed state* issue.

Please let me know your findings and recommendations.

Thanks,

Bhavesh

On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede 
wrote:

> >> Sometime it give following exception.
>
> It will help to have a more specific test case that reproduces the failed
> iterator state.
>
> Also, the consumer threads block if the fetcher queue is full. The queue
> can fill up if your consumer thread dies or slows down. I'd recommend you
> ensure that all your consumer threads are alive. You can take a thread dump
> to verify this.
>
> Thanks,
> Neha
>
> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Neha,
> >
> >
> > I have two problems:.  Any help is greatly appreciated.
> >
> >
> > 1)* java.lang.IllegalStateException: Iterator is in failed state*
> >
> > ConsumerConnector consumerConnector =
> > Consumer.createJavaConsumerConnector(getConsumerConfig());
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put(topic, *32*);
> > Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
> > consumerConnector.createMessageStreams(topicCountMap);
> >
> > List<KafkaStream<byte[], byte[]>> streams =
> > Collections.synchronizedList(topicStreamMap.get(topic));
> >
> > AppStaticInfo info = Mupd8Main.STATICINFO();
> >
> > Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> > // remove the head (first) list for this source...the rest are for the
> > // Dynamic Source...
> > mainIterator = iterator.next().iterator();
> >
> > List<ConsumerIterator<byte[], byte[]>> iteratorList = new
> > ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
> > // now the rest of the iterators must be registered..
> > while (iterator.hasNext()) {
> >     iteratorList.add(iterator.next().iterator());
> > }
> > *KafkaStreamRegistory.registerStream(mainSourceName,
> > iteratorList);*
> >
> > Once the Consumer iterator is created and registered.  We use this in
> > another thread to start reading from the Consumer Iterator.   Sometime it
> > give following exception.
> >
> > 24 Oct 2014 16:03:25,923 ERROR
> > [SourceReader:request_source:LogStreamKafkaSource1]
> > (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during
> reads.
> > Swallowed to continue next read.
> > java.lang.IllegalStateException: Iterator is in failed state
> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> >
> >
> > I have tried to recover from this state by using this:
> > iterator.resetState(); but it does not recover sometime.
> >
> >
> >
> >
> > *2) ConsumerFetcherThread are blocked on enqueue ?  What controls size of
> > queue ? Why are they blocked ?  *Due to this our lags are increasing.
> our
> > threads blocked on hasNext()...
> >
> >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
> > prio=5 tid=0x7fb36292c800 nid=0xab03 waiting on condition
> > [0x000116379000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x000704019388> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at
> > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > at
> >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > at
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > at
> > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > at
> >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > at
> >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > at

Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-27 Thread Bhavesh Mistry
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)






Thanks,

Bhavesh



On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede 
wrote:

> Can you provide the steps to reproduce this issue?
>
> On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > I am using one from the Kafka Trunk branch.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede 
> > wrote:
> >
> > > Which version of Kafka are you using on the consumer?
> > >
> > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > HI Kafka Community ,
> > > >
> > > > I am using kafka trunk source code and I get following exception.
> What
> > > > could cause the iterator to have FAILED state.  Please let me know
> how
> > I
> > > > can fix this issue.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > Here is Properties:
> > > >
> > > > Properties props = new Properties();
> > > > props.put("zookeeper.connect", zkConnect);
> > > > props.put("group.id", groupId);
> > > > *props.put("consumer.timeout.ms <http://consumer.timeout.ms
> >",
> > > > "-1");*
> > > > props.put("zookeeper.session.timeout.ms", "1");
> > > > props.put("zookeeper.sync.time.ms", "6000");
> > > > props.put("auto.commit.interval.ms", "2000");
> > > > props.put("rebalance.max.retries", "8");
> > > > props.put("auto.offset.reset", "largest");
> > > > props.put("fetch.message.max.bytes","2097152");
> > > > props.put("socket.receive.buffer.bytes","2097152");
> > > > props.put("auto.commit.enable","true");
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > >
> >
>


Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-24 Thread Bhavesh Mistry
I am using one from the Kafka Trunk branch.

Thanks,

Bhavesh

On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede 
wrote:

> Which version of Kafka are you using on the consumer?
>
> On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > HI Kafka Community ,
> >
> > I am using kafka trunk source code and I get following exception.  What
> > could cause the iterator to have FAILED state.  Please let me know how I
> > can fix this issue.
> >
> >
> >
> >
> >
> > *java.lang.IllegalStateException: Iterator is in failed state
> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > Here is Properties:
> >
> > Properties props = new Properties();
> > props.put("zookeeper.connect", zkConnect);
> > props.put("group.id", groupId);
> > *props.put("consumer.timeout.ms <http://consumer.timeout.ms>",
> > "-1");*
> > props.put("zookeeper.session.timeout.ms", "1");
> > props.put("zookeeper.sync.time.ms", "6000");
> > props.put("auto.commit.interval.ms", "2000");
> > props.put("rebalance.max.retries", "8");
> > props.put("auto.offset.reset", "largest");
> > props.put("fetch.message.max.bytes","2097152");
> > props.put("socket.receive.buffer.bytes","2097152");
> > props.put("auto.commit.enable","true");
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
>


High Level Consumer Iterator IllegalStateException Issue

2014-10-24 Thread Bhavesh Mistry
HI Kafka Community ,

I am using the Kafka trunk source code and I get the following exception.  What
could cause the iterator to be in a FAILED state?  Please let me know how I
can fix this issue.





*java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
Here is Properties:

Properties props = new Properties();
props.put("zookeeper.connect", zkConnect);
props.put("group.id", groupId);
*props.put("consumer.timeout.ms ",
"-1");*
props.put("zookeeper.session.timeout.ms", "1");
props.put("zookeeper.sync.time.ms", "6000");
props.put("auto.commit.interval.ms", "2000");
props.put("rebalance.max.retries", "8");
props.put("auto.offset.reset", "largest");
props.put("fetch.message.max.bytes","2097152");
props.put("socket.receive.buffer.bytes","2097152");
props.put("auto.commit.enable","true");


Thanks,

Bhavesh


Re: Sending Same Message to Two Topics on Same Broker Cluster

2014-10-21 Thread Bhavesh Mistry
Hi Neha,

All I am saying is that if the same byte[] or data has to go to two topics,
then I have to call send twice, and the same data has to be transferred over
the wire twice (assuming the partitions for the two topics are on the same
broker, this is not efficient).  If the Kafka protocol allowed setting multiple
topics and partitions per request, that would be great.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
*ProducerRecord
<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord(java.lang.String,
byte[], byte[])>*(java.lang.String *topic*, byte[] key, byte[] value)


Thanks,

Bhavesh

On Tue, Oct 21, 2014 at 8:26 AM, Neha Narkhede 
wrote:

> I'm not sure I understood your concern about invoking send() twice, once
> with each topic. Are you worried about the network overhead? Whether Kafka
> does this transparently or not, sending messages to different topics will
> carry some overhead. I think the design of the API is much more intuitive
> and cleaner if a message is sent to a topic partition.
>
> On Mon, Oct 20, 2014 at 9:17 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Neha,
> >
> > Yes, I understand that but when transmitting single message (I can not
> set
> > List of all topics)  Only Single one.  So I will to add same message in
> > buffer with different topic. If Kakfa protocol, allows to add multiple
> > topic then message does not have to be re-transmited over the wire to add
> > to multiple topic.
> >
> > The Producer record only allow one topic.
> >
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/ProducerRecord.html
> >
> > Thanks for your quick response and I appreciate your help.
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Mon, Oct 20, 2014 at 9:10 PM, Neha Narkhede 
> > wrote:
> >
> > > Not really. You need producers to send data to Kafka.
> > >
> > > On Mon, Oct 20, 2014 at 9:05 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > Hi Kakfa Team,
> > > >
> > > >
> > > > I would like to send a single message to multiple topics (two for
> now)
> > > > without re-transmitting the message from producer to brokers.  Is
> this
> > > > possible?
> > > >
> > > > Both Producers Scala and Java does not allow this.   I do not have to
> > do
> > > > this all the time only based on application condition.
> > > >
> > > >
> > > > Thanks in advance of your help !!
> > > >
> > > >
> > > > Thanks,
> > > >
> > > >
> > > > Bhavesh
> > > >
> > >
> >
>


Re: Sending Same Message to Two Topics on Same Broker Cluster

2014-10-20 Thread Bhavesh Mistry
Hi Neha,

Yes, I understand that, but when transmitting a single message I cannot set a
list of topics, only a single one.  So I will have to add the same message to
the buffer with a different topic.  If the Kafka protocol allowed adding
multiple topics, the message would not have to be re-transmitted over the wire
to be added to multiple topics.

The ProducerRecord only allows one topic.
http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/ProducerRecord.html

Thanks for your quick response and I appreciate your help.

Thanks,

Bhavesh


On Mon, Oct 20, 2014 at 9:10 PM, Neha Narkhede 
wrote:

> Not really. You need producers to send data to Kafka.
>
> On Mon, Oct 20, 2014 at 9:05 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Kakfa Team,
> >
> >
> > I would like to send a single message to multiple topics (two for now)
> > without re-transmitting the message from producer to brokers.  Is this
> > possible?
> >
> > Both Producers Scala and Java does not allow this.   I do not have to do
> > this all the time only based on application condition.
> >
> >
> > Thanks in advance of your help !!
> >
> >
> > Thanks,
> >
> >
> > Bhavesh
> >
>


Sending Same Message to Two Topics on Same Broker Cluster

2014-10-20 Thread Bhavesh Mistry
Hi Kafka Team,


I would like to send a single message to multiple topics (two for now)
without re-transmitting the message from the producer to the brokers.  Is this
possible?

Neither the Scala nor the Java producer allows this.  I do not have to do
this all the time, only under certain application conditions.
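
For reference, with the trunk producer API (byte[] keys and values, per the
ProducerRecord javadoc cited later in this thread), the only option today is
to hand the payload to the client once per topic; a minimal sketch with
placeholder broker address and topic names:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualTopicSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // assumed broker address

        // Assumes the trunk producer API with byte[] payloads.
        KafkaProducer producer = new KafkaProducer(props);

        byte[] key = null;
        byte[] value = "same payload".getBytes();

        // The client must be handed the payload once per topic; the
        // wire-level de-duplication asked about here does not exist today.
        producer.send(new ProducerRecord("topicA", key, value));
        producer.send(new ProducerRecord("topicB", key, value));

        producer.close();
    }
}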


Thanks in advance for your help !!


Thanks,


Bhavesh


Re: Auto Purging Consumer Group Configuration [Especially Kafka Console Group]

2014-10-09 Thread Bhavesh Mistry
We just want to clean up old configuration from ZK.  We can check via the
offset API and delete based on offsets... is that right?  There is no
last-used date associated with a consumer group in the ZK configuration, is
that right?

Thanks,

Bhavesh

On Thu, Oct 9, 2014 at 9:23 PM, Gwen Shapira  wrote:

> The problem with Kafka is that we never know when a consumer is
> "truly" inactive.
>
> But - if you decide to define inactive as consumer who's last offset
> is lower than anything available on the log (or perhaps lagging by
> over X messages?), its fairly easy to write a script to detect and
> clean them directly on ZK.
>
> BTW. Why do you need to clean them? What issue do you see with just
> letting them hang around?
>
> Gwen
>
> On Thu, Oct 9, 2014 at 9:18 PM, Bhavesh Mistry
>  wrote:
> > Hi Kafka,
> >
> > We have lots of lingering console consumer group people have created for
> > testing or debugging purpose for one time use via
> > bin/kafka-console-consumer.sh.  Is there auto purging that clean script
> > that Kafka provide ?  Is three any API to find out inactive Consumer
> group
> > and delete consumer group configuration.
> >
> > Thanks,
> >
> > Bhavesh
>


Auto Purging Consumer Group Configuration [Especially Kafka Console Group]

2014-10-09 Thread Bhavesh Mistry
Hi Kafka,

We have lots of lingering console consumer groups that people have created for
testing or debugging purposes, for one-time use via
bin/kafka-console-consumer.sh.  Is there an auto-purging/cleanup script that
Kafka provides?  Is there any API to find inactive consumer groups
and delete their configuration?
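
A rough sketch along the lines of the ZK-cleanup suggestion in the reply
above: once a group is judged truly inactive, its state can be removed
directly from ZooKeeper (ZK address and group name are placeholders; make
sure no live consumer still uses the group):

import org.I0Itec.zkclient.ZkClient;

public class PurgeConsumerGroupSketch {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("localhost:2181");   // assumed ZK address
        String group = "console-consumer-12345";              // assumed group name

        // Old-style high-level consumers keep all of their state under
        // /consumers/<group>.
        zkClient.deleteRecursive("/consumers/" + group);
        zkClient.close();
    }
}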

Thanks,

Bhavesh


Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost

2014-09-18 Thread Bhavesh Mistry
HI Jay,

I am running the trunk producer based on the following last commit, with
timestamp Mon Sep 15 20:34:14 2014 -0700.  Please let me know if this commit
contains the fix.  Otherwise, I will file a bug with logs as Neha suggested.

commit cf0f5750b39e675cf9a9c6d6394a665366db0f58
Author: Alexis Midon 
Date:   Mon Sep 15 20:34:14 2014 -0700
KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses;
reviewed by Neha Narkhede and Jun Rao


Thanks,

Bhavesh

On Wed, Sep 17, 2014 at 11:22 PM, Jay Kreps  wrote:

> Also do you know what version you are running we did fix several bugs
> similar to this against trunk.
>
> -Jay
>
> On Wed, Sep 17, 2014 at 2:14 PM, Bhavesh Mistry
>  wrote:
> > Hi Kafka Dev team,
> >
> > I see my CPU spike to 100% when network connection is lost for while.  It
> > seems network  IO thread are very busy logging following error message.
> Is
> > this expected behavior ?
> >
> > 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
> > org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
> > kafka producer I/O thread:
> >
> > java.lang.IllegalStateException: No entry found for node -2
> >
> > at org.apache.kafka.clients.ClusterConnectionStates.nodeState(
> > ClusterConnectionStates.java:110)
> >
> > at org.apache.kafka.clients.ClusterConnectionStates.disconnected(
> > ClusterConnectionStates.java:99)
> >
> > at org.apache.kafka.clients.NetworkClient.initiateConnect(
> > NetworkClient.java:394)
> >
> > at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(
> > NetworkClient.java:380)
> >
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> >
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> >
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> >
> > at java.lang.Thread.run(Thread.java:744)
> >
> > Thanks,
> >
> > Bhavesh
>


[Java New Producer] CPU Usage Spike to 100% when network connection is lost

2014-09-17 Thread Bhavesh Mistry
Hi Kafka Dev team,

I see my CPU spike to 100% when the network connection is lost for a while.  It
seems the network I/O threads are very busy logging the following error
message.  Is this expected behavior?

2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
kafka producer I/O thread:

java.lang.IllegalStateException: No entry found for node -2

at org.apache.kafka.clients.ClusterConnectionStates.nodeState(
ClusterConnectionStates.java:110)

at org.apache.kafka.clients.ClusterConnectionStates.disconnected(
ClusterConnectionStates.java:99)

at org.apache.kafka.clients.NetworkClient.initiateConnect(
NetworkClient.java:394)

at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(
NetworkClient.java:380)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)

at java.lang.Thread.run(Thread.java:744)

Thanks,

Bhavesh


Re: MBeans, dashes, underscores, and KAFKA-1481

2014-09-17 Thread Bhavesh Mistry
Sure, we can do option 2 for the JMX beans.  But the same solution should be
applied to the producer.metrics() method of the new producer.  Regardless of
how a metric is accessed (JMX or via the producer), it has to follow a
consistent naming convention.

For example, I get the following metric names when my topic is "topic.dot".  So
can we just use an escape character if the topic name or client.id contains
Kafka "reserved" chars (".", "-", "_", etc.)?

topic.*topic.dot*.record-error-rate
topic.*topic.dot*.record-retry-rate
topic.*topic.dot*.byte-rate
topic.*topic.dot*.record-send-rate
topic.*topic.dot*.compression-rate
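
For comparison, a tiny sketch of the key/value style of JMX naming ("option 2"
in the reply below), which keeps values containing dots parseable; the domain
and key names are illustrative, not final Kafka names:

import javax.management.ObjectName;

public class MBeanNameSketch {
    public static void main(String[] args) throws Exception {
        // topic and clientId become JMX key properties instead of being
        // embedded in a dotted string, so a "." inside either value no
        // longer breaks parsing.
        ObjectName name =
                new ObjectName("kafka.producer:clientId=my.client,topic=topic.dot");

        System.out.println(name.getKeyProperty("clientId")); // my.client
        System.out.println(name.getKeyProperty("topic"));    // topic.dot
    }
}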


Thanks,

Bhavesh

On Wed, Sep 17, 2014 at 9:35 AM, Jun Rao  wrote:

> Bhavesh,
>
> Yes, allowing dot in clientId and topic makes it a bit harder to define the
> JMX bean names. I see a couple of solutions here.
>
> 1. Disable dot in clientId and topic names. The issue is that dot may
> already be used in existing deployment.
>
> 2. We can represent the JMX bean name differently in the new producer.
> Instead of
>   kafka.producer.myclientid:type=mytopic
> we could change it to
>   kafka.producer:clientId=myclientid,topic=mytopic
>
> I felt that option 2 is probably better since it doesn't affect existing
> users.
>
> Otis,
>
> We probably can also use option 2 to address KAFKA-1481. For topic/clientid
> specific metrics, we could explicitly specify the metric name so that it
> contains "topic=mytopic,clientid=myclientid". That seems to be a much
> cleaner way than having all parts included in a single string separated by
> '|'.
>
> Thanks,
>
> Jun
>
>
>
>
> On Tue, Sep 16, 2014 at 5:15 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > HI Otis,
> >
> > What is migration path ?  If topic with special chars exists already(
> > ".","-","|" etc)  in previous version of producer/consumer of Kafka, what
> > happens after the upgrade new producer or consumer (kafka version) ?
> Also,
> > in new producer API (Kafka Trunk), does this enforce the rule about
> client
> > id as well ?
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Tue, Sep 16, 2014 at 2:09 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > So maybe I should I should have asked the Q explicitly:
> > > Could we commit the patch from
> > > https://issues.apache.org/jira/browse/KAFKA-1481 now that, I hope,
> it's
> > > clear what problems the current MBean names can cause?
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> > >
> > >
> > >
> > > On Mon, Sep 15, 2014 at 10:40 PM, Otis Gospodnetic <
> > > otis.gospodne...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > *Problem:*
> > > > Some Kafka 0.8.x MBeans have names composed of things like <consumer
> > > > group>-<topic>-<metric>.  Note how dashes are used as delimiters.
> > > > When <topic> and <consumer group> don't contain the delimiter character
> > > > all is good if you want to extract parts of this MBean name by simply
> > > > splitting on the delimiter character.  The problem is that dashes are
> > > > allowed in topic and group names, so this splitting doesn't work.
> > > > Moreover, underscores are also used as delimiters, and they can also
> be
> > > > used in things like topic names.
> > > >
> > > > *Example*:
> > > > This MBean's name is composed of <consumer group>-<topic>-BytesPerSec:
> > > >
> > > > kafka.consumer:type="ConsumerTopicMetrics",
> > name="*myGroup**-myTopic**-*
> > > > BytesPerSec"
> > > >
> > > > Here we can actually split on "-" and extract all 3 parts from the
> > MBean
> > > > name::
> > > > * consumer group ('*myGroup*')
> > > > * topic ('*myTopic*')
> > > > * metric (‘BytesPerSec’)
> > > >
> > > > All good!
> > > >
> > > > But imagine if I named the group: *my-Group*
> > > > And if I named the topic: *my-Topic*
> > > >
> > > > Then we'd have:
> > > > kafka.consumer:type="ConsumerTopicMetrics",
> > > name="*my-Group**-my-Topic**-*
> > > > BytesPerSec"
> > > >
> > > > Now splitting on "-" would no longer work!  To extract "my-Group" and
> > > > "my-Topic" and "BytesPerSec" parts I would have to know the specific
> > > group
> > > > name and topic name to look for and could not use generic approach of
> > > just
> > > > splitting the MBean name on the delimiter.
> > > >
> > > > *Solution*:
> > > > The patch in https://issues.apache.org/jira/browse/KAFKA-1481
> replaces
> > > > all _ and - characters where they are used as delimiters in MBean
> names
> > > > with a "|" character.  Because the "I" character is not allowed in
> > topic
> > > > names, consumer groups, host names, splitting on this new and unified
> > > > delimiter works.
> > > >
> > > > I hope this explains the problem, the solution, and that this can
> make
> > it
> > > > in the next 0.8.x.
> > > >
> > > > Otis
> > > > --
> > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> > > > Solr & Elasticsearch Support * http://sematext.com/
> > > >
> > > >
> > >
> >
>


Re: MBeans, dashes, underscores, and KAFKA-1481

2014-09-16 Thread Bhavesh Mistry
HI Otis,

What is the migration path?  If topics with special chars (".", "-", "|", etc.)
already exist in a previous version of the Kafka producer/consumer, what
happens after upgrading to the new producer or consumer (Kafka version)?  Also,
in the new producer API (Kafka trunk), does this enforce the rule for the
client id as well?

Thanks,

Bhavesh

On Tue, Sep 16, 2014 at 2:09 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> So maybe I should I should have asked the Q explicitly:
> Could we commit the patch from
> https://issues.apache.org/jira/browse/KAFKA-1481 now that, I hope, it's
> clear what problems the current MBean names can cause?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
>
> On Mon, Sep 15, 2014 at 10:40 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi,
> >
> > *Problem:*
> > Some Kafka 0.8.x MBeans have names composed of things like <consumer
> > group>-<topic>-<metric>.  Note how dashes are used as delimiters.
> > When <topic> and <consumer group> don't contain the delimiter character
> > all is good if you want to extract parts of this MBean name by simply
> > splitting on the delimiter character.  The problem is that dashes are
> > allowed in topic and group names, so this splitting doesn't work.
> > Moreover, underscores are also used as delimiters, and they can also be
> > used in things like topic names.
> >
> > *Example*:
> > This MBean's name is composed of <consumer group>-<topic>-BytesPerSec:
> >
> > kafka.consumer:type="ConsumerTopicMetrics", name="*myGroup**-myTopic**-*
> > BytesPerSec"
> >
> > Here we can actually split on "-" and extract all 3 parts from the MBean
> > name::
> > * consumer group ('*myGroup*')
> > * topic ('*myTopic*')
> > * metric (‘BytesPerSec’)
> >
> > All good!
> >
> > But imagine if I named the group: *my-Group*
> > And if I named the topic: *my-Topic*
> >
> > Then we'd have:
> > kafka.consumer:type="ConsumerTopicMetrics",
> name="*my-Group**-my-Topic**-*
> > BytesPerSec"
> >
> > Now splitting on "-" would no longer work!  To extract "my-Group" and
> > "my-Topic" and "BytesPerSec" parts I would have to know the specific
> group
> > name and topic name to look for and could not use generic approach of
> just
> > splitting the MBean name on the delimiter.
> >
> > *Solution*:
> > The patch in https://issues.apache.org/jira/browse/KAFKA-1481 replaces
> > all _ and - characters where they are used as delimiters in MBean names
> > with a "|" character.  Because the "I" character is not allowed in topic
> > names, consumer groups, host names, splitting on this new and unified
> > delimiter works.
> >
> > I hope this explains the problem, the solution, and that this can make it
> > in the next 0.8.x.
> >
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
>


Re: Need Document and Explanation Of New Metrics Name in New Java Producer on Kafka Trunk

2014-09-15 Thread Bhavesh Mistry
Hi Jun,

I also wanted to highlight the implications of having a dot "." as part of
the topic name and the client.id.  Both the topic name and the client id need
rule enforcement.

JMX Bean names are not correct:

eg:

I have  topics  called "topic.dot"  and client.id as "my.client".

JMX Bean name is:

kafka.producer.*topic*:type=*dot*
kafka.producer.*my.client*.topic.topic:type=dot


With this, developing a generic tool that flushes the JMX values to some
reporting tool for monitoring will be very hard.  Please let me know your
recommendation.  We had similar issues with the old Kafka producer.

At my company, we have a convention of using "." to separate out website
functionality, e.g. "search.log", "checkout.log", etc.  Please define/enforce
rules for topic names and client.id.  (Also, you might have to define a
process for migrating topic names and client.ids to any new rules that you
enforce with the new producer.)

Thanks,

Bhavesh

On Mon, Sep 15, 2014 at 5:11 PM, Jun Rao  wrote:

> Yes, that description is not precise. We do allow dots in general. However,
> a topic can't be just "." or "..".
>
> Thanks,
>
> Jun
>
> On Mon, Sep 15, 2014 at 9:31 AM, Michael G. Noll <
> michael+st...@michael-noll.com> wrote:
>
> > Ah -- I only tested with dots "."
> >
> > However, haven't you said earlier in this thread that only "-" and "_"
> are
> > allowed as special chars?  This would exclude dots, although in the error
> > message below they (dots) are said to be fine.  Is this a bug or a
> feature?
> > ;-)
> >
> > --Michael
> >
> >
> >
> > > On 14.09.2014, at 20:41, Jun Rao  wrote:
> > >
> > > Actually, we do give you an error if an invalid topic is created
> through
> > > CLI. When we add a create topic api, we can return the correct error
> code
> > > too.
> > >
> > > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic="te+dd"
> > > --partitions 1 --replication-factor 1
> > >
> > > Error while executing topic command topic name te+dd is illegal,
> > contains a
> > > character other than ASCII alphanumerics, '.', '_' and '-'
> > >
> > > kafka.common.InvalidTopicException: topic name te+dd is illegal,
> > contains a
> > > character other than ASCII alphanumerics, '.', '_' and '-'
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Sun, Sep 14, 2014 at 1:46 AM, Michael G. Noll <
> > mich...@michael-noll.com>
> > > wrote:
> > >
> > >> Wouldn't it be helpful to throw an error or a warning if the user
> > >> tries to create a topic with an invalid name?  Currently neither the
> > >> API nor the CLI tools inform you that you are naming a topic in a way
> > >> you shouldn't.
> > >>
> > >> And as Otis pointed out elsewhere in this thread this ties back into
> > >> the JMX/MBean issues related the usage of "-"/dashes and
> > >> "_"/underscores, which are unfortunately the only non-alphanumeric
> > >> characters that are at your disposal to add reasonable "structure" to
> > >> your Kafka topic names. (
> > https://issues.apache.org/jira/browse/KAFKA-1481)
> > >>
> > >> Do you mind sharing your topic naming convention at LinkedIn?
> > >>
> > >> --Michael
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>> On 11.09.2014 00:24, Jun Rao wrote:
> > >>> We actually don't allow "." in the topic name. Topic name can be
> > >>> alpha-numeric plus "-" and "_".
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jun
> > >>>
> > >>> On Tue, Sep 9, 2014 at 6:29 PM, Bhavesh Mistry
> > >>>  wrote:
> > >>>
> > >>>> Thanks, I was using without JMX.  I will go through doc.  But how
> > >>>> about Topic or Metric name Topic Name Convention or Metric Name
> > >>>> Convention ?  The dot notation with topic having a ".".  Any
> > >>>> future plan to enforce some stand rules.
> > >>>>
> > >>>> Thanks,
> > >>>>
> > >>>> Bhavesh
> > >>>>
> > >>>> On Tue, Sep 9, 2014 at 3:38 PM, Jay Kreps 

[Java New Producer Configuration] Maximum time spent in Queue in Async mode

2014-09-11 Thread Bhavesh Mistry
Hi Kafka team,

How do I configure the maximum amount of time a message spends in the queue?
In the old producer, there is a property called queue.buffering.max.ms, and it
is not present in the new one.  Basically, if I just send one message that is
smaller than the batch size, how long will that message sit in the local
producer queue?

How do I control, via configuration, the time a message spends in the queue
when the batch size never reaches the configured limit?
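
From what I can tell, the closest knob in the new producer is linger.ms
together with batch.size; a minimal sketch with illustrative values:

import java.util.Properties;

public class QueueTimeConfigSketch {
    public static void main(String[] args) {
        // linger.ms bounds how long the sender waits for a batch to fill
        // before sending, which looks like the closest analogue of the old
        // queue.buffering.max.ms.  A single small record should therefore sit
        // in the accumulator for at most roughly linger.ms before dispatch.
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // assumed broker address
        props.put("batch.size", "16384");               // bytes per batch
        props.put("linger.ms", "100");                  // max wait for a partial batch
        // pass props to KafkaProducer as usual
    }
}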

Thanks,

Bhavesh


Re: message size limit

2014-09-10 Thread Bhavesh Mistry
Hi Jun,

Thanks for highlighting this.  Please correct my understanding: if the max
request size <= message.max.bytes, then the batch size will be decided
optimally (the batch size will be determined by either the max.request.size
limit or batch.size, whichever is less).

Here are the configuration parameters I was referring to on the new producer
side and on the broker.  This effectively eliminates the data loss issue for a
large number of messages (except for the ones that really exceed the limit).
That is what I had asked for in a previous email.

*New Producer Config:*
max.request.size=2MB
batch.size=1000

*Broker Config:*
message.max.bytes=2MB
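
A sketch of the same settings in code (values as above, with 2MB written out
in bytes; the broker address is a placeholder):

import java.util.Properties;

public class SizeLimitConfigSketch {
    public static void main(String[] args) {
        // Producer side (new producer):
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "broker1:9092"); // assumed
        producerProps.put("max.request.size", "2097152");       // 2MB
        producerProps.put("batch.size", "1000");                 // bytes, as above

        // Broker side (server.properties):
        //   message.max.bytes=2097152
        // and consumers should set fetch.message.max.bytes at least as large.
    }
}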

Thanks,
Bhavesh

On Wed, Sep 10, 2014 at 10:10 AM, Jun Rao  wrote:

> Actually, with the new producer, you can configure the batch size in bytes.
> If you set the batch size to be smaller than the max message size, messages
> exceeding the max message limit will be in its own batch. Then, only one
> message will be rejected by the broker.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 9, 2014 at 5:51 PM, Bhavesh Mistry  >
> wrote:
>
> > Hi Jun,
> >
> > Is there any plug-ability that Developer can customize batching logic or
> > inject custom code for this ? Shall I file Jira for this issues.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao  wrote:
> >
> > > No, the new producer doesn't address that problem.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > HI Jun,
> > > >
> > > > Thanks for clarification.  Follow up questions, does new producer
> solve
> > > the
> > > > issues highlight.  In event of compression and async mode in new
> > > producer,
> > > > will it break down messages to this UPPER limit and submit or new
> > > producer
> > > > strictly honor batch size.  I am just asking if compression batch
> size
> > > > message reaches this configured limit, does batch broken down to sub
> > > batch
> > > > that is within limit.  I would like to minimize the data loss due to
> > this
> > > > limit.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao  wrote:
> > > >
> > > > > Are you using compression in the producer? If so, message.max.bytes
> > > > applies
> > > > > to the compressed size of a batch of messages. Otherwise,
> > > > message.max.bytes
> > > > > applies to the size of each individual message.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> > > > mistry.p.bhav...@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > I am referring to wiki
> > http://kafka.apache.org/08/configuration.html
> > > > and
> > > > > > following parameter control max batch message bytes as far as I
> > know.
> > > > > > Kafka Community, please correct me if I am wrong.  I do not want
> to
> > > > > create
> > > > > > confusion for Kafka User Community here.   Also, if you increase
> > this
> > > > > limit
> > > > > > than you have to set the corresponding limit increase on consumer
> > > side
> > > > as
> > > > > > well (fetch.message.max.bytes).
> > > > > >
> > > > > > Since we are using batch async mode, our messages are getting
> drop
> > > > > sometime
> > > > > > if the entire batch bytes  exceed this limit so I was asking
> Kafka
> > > > > > Developers if any optimal way to determine the batch size based
> on
> > > this
> > > > > > limit to minimize the data loss. Because, entire batch is
> rejected
> > by
> > > > > > brokers.
> > > > > >
> > > > > > message.max.bytes 100 The maximum size of a message that the
> > > server
> > > > > can
> > > > > > receive. It is important that this property be in sync with the
> > > maximum
> > > > > > fetch size your consumers use or else an unruly producer will be
> > able

Re: Need Document and Explanation Of New Metrics Name in New Java Producer on Kafka Trunk

2014-09-09 Thread Bhavesh Mistry
Thanks, I was using it without JMX.  I will go through the docs.  But how
about a topic name convention or metric name convention?  The dot notation is
ambiguous when a topic contains a ".".  Any future plan to enforce some
standard rules?

Thanks,

Bhavesh

On Tue, Sep 9, 2014 at 3:38 PM, Jay Kreps  wrote:

> Hi Bhavesh,
>
> Each of those JMX attributes comes with documentation. If you open up
> jconsole and attach to a jvm running the consumer you should be able
> to read the descriptions for each attribute.
>
> -Jay
>
> On Tue, Sep 9, 2014 at 2:07 PM, Bhavesh Mistry
>  wrote:
> > Kafka Team,
> >
> > Can you please let me know what each of following Metrics means ?  Some
> of
> > them are obvious, but some are hard to understand. My Topic name is
> > *TOPIC_NAME*.
> >
> >
> >
> > can we enforce a Topic Name Convention or Metric Name Convention.
> Because
> > in previous version of Kafka, we have similar issue of parsing Kafka
> > Metrics name with host name issue (codahale lib) .  I have topic name
> with
> > “.”  So, it is hard to distinguish metric name and topic.   Also,  when
> > guys get chance I would appreciate if you guys can explain metric
> > description on wiki so community would know what to monitor.  Please see
> > below for full list of metrics from new producer.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > record-queue-time-avg NaN
> > *node-1.*request-latency-max -Infinity
> > record-size-max -Infinity
> > *node-1.*incoming-byte-rate NaN
> > request-size-avg NaN
> > *node-1.*request-latency-avg NaN
> > *node-2.*request-size-avg NaN
> > requests-in-flight 0.0
> > bufferpool-wait-ratio NaN
> > network-io-rate NaN
> > metadata-age 239.828
> > records-per-request-avg NaN
> > record-retry-rate NaN
> > buffer-total-bytes 6.7108864E7
> > buffer-available-bytes 6.7108864E7
> > topic.*TOPIC_NAME*.record-error-rate NaN
> > record-send-rate NaN
> > select-rate NaN
> > node-2.outgoing-byte-rate NaN
> > topic.*TOPIC_NAME*.record-retry-rate NaN
> > batch-size-max -Infinity
> > connection-creation-rate NaN
> > node-1.outgoing-byte-rate NaN
> > topic.*TOPIC_NAME*.byte-rate NaN
> > waiting-threads 0.0
> > batch-size-avg NaN
> > io-wait-ratio NaN
> > io-wait-time-ns-avg NaN
> > io-ratio NaN
> > topic.TOPIC_NAME.record-send-rate NaN
> > request-size-max -Infinity
> > record-size-avg NaN
> > request-latency-max -Infinity
> > node-2.request-latency-max -Infinity
> > record-queue-time-max -Infinity
> > node-2.response-rate NaN
> > node-1.request-rate NaN
> > node-1.request-size-max -Infinity
> > connection-count 3.0
> > incoming-byte-rate NaN
> > compression-rate-avg NaN
> > request-rate NaN
> > node-1.response-rate NaN
> > node-2.request-latency-avg NaN
> > request-latency-avg NaN
> > record-error-rate NaN
> > connection-close-rate NaN
> > *node-2.*request-size-max -Infinity
> > topic.TOPIC_NAME.compression-rate NaN
> > node-2.incoming-byte-rate NaN
> > node-1.request-size-avg NaN
> > io-time-ns-avg NaN
> > outgoing-byte-rate NaN
> > *node-2*.request-rate NaN
> > response-rate NaN
>


Re: message size limit

2014-09-09 Thread Bhavesh Mistry
Hi Jun,

Is there any pluggability so that a developer can customize the batching logic
or inject custom code for this?  Shall I file a Jira for this issue?

Thanks,

Bhavesh

On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao  wrote:

> No, the new producer doesn't address that problem.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > HI Jun,
> >
> > Thanks for clarification.  Follow up questions, does new producer solve
> the
> > issues highlight.  In event of compression and async mode in new
> producer,
> > will it break down messages to this UPPER limit and submit or new
> producer
> > strictly honor batch size.  I am just asking if compression batch size
> > message reaches this configured limit, does batch broken down to sub
> batch
> > that is within limit.  I would like to minimize the data loss due to this
> > limit.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao  wrote:
> >
> > > Are you using compression in the producer? If so, message.max.bytes
> > applies
> > > to the compressed size of a batch of messages. Otherwise,
> > message.max.bytes
> > > applies to the size of each individual message.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com
> > > >
> > > wrote:
> > >
> > > > I am referring to wiki http://kafka.apache.org/08/configuration.html
> > and
> > > > following parameter control max batch message bytes as far as I know.
> > > > Kafka Community, please correct me if I am wrong.  I do not want to
> > > create
> > > > confusion for Kafka User Community here.   Also, if you increase this
> > > limit
> > > > than you have to set the corresponding limit increase on consumer
> side
> > as
> > > > well (fetch.message.max.bytes).
> > > >
> > > > Since we are using batch async mode, our messages are getting drop
> > > sometime
> > > > if the entire batch bytes  exceed this limit so I was asking Kafka
> > > > Developers if any optimal way to determine the batch size based on
> this
> > > > limit to minimize the data loss. Because, entire batch is rejected by
> > > > brokers.
> > > >
> > > > message.max.bytes 100 The maximum size of a message that the
> server
> > > can
> > > > receive. It is important that this property be in sync with the
> maximum
> > > > fetch size your consumers use or else an unruly producer will be able
> > to
> > > > publish messages too large for consumers to consume.
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > >
> > > > > Hi Bhavesh
> > > > >
> > > > > can you explain what limit you're referring to?
> > > > > I'm asking because `message.max.bytes` is applied per message not
> per
> > > > > batch.
> > > > > is there another limit I should be aware of?
> > > > >
> > > > > thanks
> > > > >
> > > > >
> > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > > mistry.p.bhav...@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > We have similar problem.  We have variable length of messages.
> So
> > > when
> > > > > we
> > > > > > have fixed size of Batch sometime the batch exceed the limit set
> on
> > > the
> > > > > > brokers (2MB).
> > > > > >
> > > > > > So can Producer have some extra logic to determine the optimal
> > batch
> > > > size
> > > > > > by looking at configured message.max.bytes  value.
> > > > > >
> > > > > > During the metadata update, Producer will get this value from the
> > > > Broker
> > > > > > for each topic and Producer will check if current batch size
> reach
> > > this
> > > > > > limit than 

Need Document and Explanation Of New Metrics Name in New Java Producer on Kafka Trunk

2014-09-09 Thread Bhavesh Mistry
Kafka Team,

Can you please let me know what each of the following metrics means?  Some of
them are obvious, but some are hard to understand.  My topic name is
*TOPIC_NAME*.



Can we enforce a topic name convention or metric name convention?  In a
previous version of Kafka, we had a similar issue parsing Kafka metric names
that contained host names (codahale lib).  I have a topic name with “.”, so it
is hard to distinguish the metric name from the topic.  Also, when you guys
get a chance, I would appreciate it if you could explain the metric
descriptions on the wiki so the community knows what to monitor.  Please see
below for the full list of metrics from the new producer.


Thanks,

Bhavesh


record-queue-time-avg NaN
*node-1.*request-latency-max -Infinity
record-size-max -Infinity
*node-1.*incoming-byte-rate NaN
request-size-avg NaN
*node-1.*request-latency-avg NaN
*node-2.*request-size-avg NaN
requests-in-flight 0.0
bufferpool-wait-ratio NaN
network-io-rate NaN
metadata-age 239.828
records-per-request-avg NaN
record-retry-rate NaN
buffer-total-bytes 6.7108864E7
buffer-available-bytes 6.7108864E7
topic.*TOPIC_NAME*.record-error-rate NaN
record-send-rate NaN
select-rate NaN
node-2.outgoing-byte-rate NaN
topic.*TOPIC_NAME*.record-retry-rate NaN
batch-size-max -Infinity
connection-creation-rate NaN
node-1.outgoing-byte-rate NaN
topic.*TOPIC_NAME*.byte-rate NaN
waiting-threads 0.0
batch-size-avg NaN
io-wait-ratio NaN
io-wait-time-ns-avg NaN
io-ratio NaN
topic.TOPIC_NAME.record-send-rate NaN
request-size-max -Infinity
record-size-avg NaN
request-latency-max -Infinity
node-2.request-latency-max -Infinity
record-queue-time-max -Infinity
node-2.response-rate NaN
node-1.request-rate NaN
node-1.request-size-max -Infinity
connection-count 3.0
incoming-byte-rate NaN
compression-rate-avg NaN
request-rate NaN
node-1.response-rate NaN
node-2.request-latency-avg NaN
request-latency-avg NaN
record-error-rate NaN
connection-close-rate NaN
*node-2.*request-size-max -Infinity
topic.TOPIC_NAME.compression-rate NaN
node-2.incoming-byte-rate NaN
node-1.request-size-avg NaN
io-time-ns-avg NaN
outgoing-byte-rate NaN
*node-2*.request-rate NaN
response-rate NaN


Re: message size limit

2014-09-09 Thread Bhavesh Mistry
HI Jun,

Thanks for the clarification.  A follow-up question: does the new producer
solve the issues highlighted?  With compression and async mode in the new
producer, will it break messages down to this UPPER limit before submitting,
or does the new producer strictly honor the batch size?  I am just asking
whether, if a compressed batch reaches this configured limit, the batch is
broken down into sub-batches that are within the limit.  I would like to
minimize the data loss due to this limit.


Thanks,

Bhavesh

On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao  wrote:

> Are you using compression in the producer? If so, message.max.bytes applies
> to the compressed size of a batch of messages. Otherwise, message.max.bytes
> applies to the size of each individual message.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry  >
> wrote:
>
> > I am referring to wiki http://kafka.apache.org/08/configuration.html and
> > following parameter control max batch message bytes as far as I know.
> > Kafka Community, please correct me if I am wrong.  I do not want to
> create
> > confusion for Kafka User Community here.   Also, if you increase this
> limit
> > than you have to set the corresponding limit increase on consumer side as
> > well (fetch.message.max.bytes).
> >
> > Since we are using batch async mode, our messages are getting drop
> sometime
> > if the entire batch bytes  exceed this limit so I was asking Kafka
> > Developers if any optimal way to determine the batch size based on this
> > limit to minimize the data loss. Because, entire batch is rejected by
> > brokers.
> >
> > message.max.bytes 100 The maximum size of a message that the server
> can
> > receive. It is important that this property be in sync with the maximum
> > fetch size your consumers use or else an unruly producer will be able to
> > publish messages too large for consumers to consume.
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com> wrote:
> >
> > > Hi Bhavesh
> > >
> > > can you explain what limit you're referring to?
> > > I'm asking because `message.max.bytes` is applied per message not per
> > > batch.
> > > is there another limit I should be aware of?
> > >
> > > thanks
> > >
> > >
> > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > We have similar problem.  We have variable length of messages.  So
> when
> > > we
> > > > have fixed size of Batch sometime the batch exceed the limit set on
> the
> > > > brokers (2MB).
> > > >
> > > > So can Producer have some extra logic to determine the optimal batch
> > size
> > > > by looking at configured message.max.bytes  value.
> > > >
> > > > During the metadata update, Producer will get this value from the
> > Broker
> > > > for each topic and Producer will check if current batch size reach
> this
> > > > limit than break batch into smaller chunk such way that It would not
> > > exceed
> > > > limit (unless single message exceed the limit). Basically try to
> avoid
> > > data
> > > > loss as much as possible.
> > > >
> > > > Please let me know what is your opinion on this...
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > >
> > > > > Thanks Jun.
> > > > >
> > > > > I'll create a jira and try to provide a patch. I think this is
> pretty
> > > > > serious.
> > > > >
> > > > > On Friday, August 29, 2014, Jun Rao  wrote:
> > > > >
> > > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > > broker.
> > > > > If
> > > > > > compression is enabled, a larger batch typically implies better
> > > > > compression
> > > > > > ratio.
> > > > > >
> > > > > > The reason that we have to fail the whole batch is that the error
> > > code
> > > > in
> > > > > > the produce response is per partition, instead of per message.

Re: High Level Consumer and Commit

2014-09-03 Thread Bhavesh Mistry
Thanks, got the idea !!  But it will create fragments, for example:

Main thread reads messages 0-50, gives them to Thread 1 for bulk indexing, and
commits offsets 0 to 50...
Main thread reads messages 51-100, gives them to Thread 2 for bulk indexing,
and commits offsets 51 to 100...

So ZooKeeper might end up with an offset that gets overridden by whichever
thread finishes indexing first (if Thread 2 finishes first and Thread 1 then
commits its offset, ZK will have the older offset).  I guess it does not
matter in our case, since indexing the same document will simply overwrite it.
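
A rough sketch of the one-connector-per-thread approach Gwen describes below;
the ZK address, group/topic names, and the batch size of 50 are placeholders:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class PerThreadConnectorSketch implements Runnable {
    private final ConsumerConnector connector;
    private final KafkaStream<byte[], byte[]> stream;

    PerThreadConnectorSketch(String zk, String group, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zk);
        props.put("group.id", group);
        props.put("auto.commit.enable", "false");   // commit manually below
        connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> counts = new HashMap<String, Integer>();
        counts.put(topic, 1);                        // one stream per connector
        stream = connector.createMessageStreams(counts).get(topic).get(0);
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        List<byte[]> batch = new ArrayList<byte[]>();
        while (it.hasNext()) {
            batch.add(it.next().message());
            if (batch.size() == 50) {
                bulkIndex(batch);                    // application-specific indexer (assumed)
                connector.commitOffsets();           // commits only this connector's partitions
                batch.clear();
            }
        }
    }

    private void bulkIndex(List<byte[]> docs) { /* send to the search indexer */ }
}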

Thanks,

Bhavesh



On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira  wrote:

> Thanks, Balaji!
>
> It looks like your approach depends on specific implementation
> details, such as the directory structure in ZK.
> In this case it doesn't matter much since the APIs are not stable yet,
> but in general, wouldn't you prefer to use public APIs, even if it
> means multiple consumers without threads?
>
> Gwen
>
> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
>  wrote:
> > We can still do with single ConsumerConnector with multiple threads.
> >
> > Each thread updates its own data in zookeeper.The below one is our own
> implementation of commitOffset.
> >
> > public void commitOffset(DESMetadata metaData) {
> > log.debug("Update offsets only for ->"+
> metaData.toString());
> > String key =
> metaData.getTopic()+"/"+metaData.getPartitionNumber();
> > Long nextOffset = metaData.getOffSet()+1;
> > if(nextOffset!=checkPointedOffset.get(key)){
> > ZKGroupTopicDirs topicDirs = new
> ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic());
> > ZkUtils.updatePersistentPath(zkClient,
> topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),nextOffset+"");
> > checkPointedOffset.put(key,nextOffset);
> > }
> > }
> >
> > -Original Message-
> > From: Gwen Shapira [mailto:gshap...@cloudera.com]
> > Sent: Tuesday, September 02, 2014 11:38 PM
> > To: users@kafka.apache.org; Philip O'Toole
> > Subject: Re: High Level Consumer and Commit
> >
> > I believe a simpler solution would be to create multiple
> ConsumerConnector, each with 1 thread (single ConsumerStream) and use
> commitOffset API to commit all partitions managed by each ConsumerConnector
> after the thread finished processing the messages.
> >
> > Does that solve the problem, Bhavesh?
> >
> > Gwen
> >
> > On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
>  wrote:
> >> Yeah, from reading that I suspect you need the SimpleConsumer. Try it
> out and see.
> >>
> >> Philip
> >>
> >>
> >> -
> >> http://www.philipotoole.com
> >>
> >>
> >> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com> wrote:
> >>
> >>
> >>
> >> Hi Philip,
> >>
> >> Yes, We have disabled auto commit but, we need to be able to read from
> >> particular offset if we manage the offset ourself in some storage(DB).
> >> High Level consumer does not allow per partition management
> plug-ability.
> >>
> >> I like to have the High Level consumers Failover and auto rebalancing.
> >> We just need plug ability of offset management.
> >>
> >> Thanks,
> >>
> >> Bhavesh
> >>
> >>
> >>
> >> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
> >> philip.oto...@yahoo.com.invalid> wrote:
> >>
> >>> No, you'll need to write your own failover.
> >>>
> >>> I'm not sure I follow your second question, but the high-level
> >>> Consumer should be able to do what you want if you disable
> >>> auto-commit. I'm not sure what else you're asking.
> >>>
> >>>
> >>> Philip
> >>>
> >>>
> >>> -
> >>> http://www.philipotoole.com
> >>>
> >>>
> >>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
> >>> mistry.p.bhav...@gmail.com> wrote:
> >>>
> >>>
> >>>
> >>> Hi Philip,
> >>>
> >>> Thanks for the update.  With Simple Consumer I will not get failover
> >>> and rebalance that is provided out of box.  what is other option not
> >>> to block read

Re: message size limit

2014-09-03 Thread Bhavesh Mistry
I am referring to the wiki http://kafka.apache.org/08/configuration.html, and
the following parameter controls the max batch message bytes as far as I know.
Kafka community, please correct me if I am wrong; I do not want to create
confusion for the Kafka user community here.  Also, if you increase this
limit, then you have to make the corresponding limit increase on the consumer
side as well (fetch.message.max.bytes).

Since we are using batch async mode, our messages sometimes get dropped if the
entire batch in bytes exceeds this limit, so I was asking the Kafka developers
whether there is an optimal way to determine the batch size based on this
limit to minimize the data loss, because the entire batch is rejected by the
brokers.

message.max.bytes 100 The maximum size of a message that the server can
receive. It is important that this property be in sync with the maximum
fetch size your consumers use or else an unruly producer will be able to
publish messages too large for consumers to consume.

Thanks,

Bhavesh


On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
alexis.mi...@airbedandbreakfast.com> wrote:

> Hi Bhavesh
>
> can you explain what limit you're referring to?
> I'm asking because `message.max.bytes` is applied per message not per
> batch.
> is there another limit I should be aware of?
>
> thanks
>
>
> On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry  >
> wrote:
>
> > Hi Jun,
> >
> > We have similar problem.  We have variable length of messages.  So when
> we
> > have fixed size of Batch sometime the batch exceed the limit set on the
> > brokers (2MB).
> >
> > So can Producer have some extra logic to determine the optimal batch size
> > by looking at configured message.max.bytes  value.
> >
> > During the metadata update, Producer will get this value from the Broker
> > for each topic and Producer will check if current batch size reach this
> > limit than break batch into smaller chunk such way that It would not
> exceed
> > limit (unless single message exceed the limit). Basically try to avoid
> data
> > loss as much as possible.
> >
> > Please let me know what is your opinion on this...
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com> wrote:
> >
> > > Thanks Jun.
> > >
> > > I'll create a jira and try to provide a patch. I think this is pretty
> > > serious.
> > >
> > > On Friday, August 29, 2014, Jun Rao  wrote:
> > >
> > > > The goal of batching is mostly to reduce the # RPC calls to the
> broker.
> > > If
> > > > compression is enabled, a larger batch typically implies better
> > > compression
> > > > ratio.
> > > >
> > > > The reason that we have to fail the whole batch is that the error
> code
> > in
> > > > the produce response is per partition, instead of per message.
> > > >
> > > > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com > wrote:
> > > >
> > > > > Could you explain the goals of batches? I was assuming this was
> > simply
> > > a
> > > > > performance optimization, but this behavior makes me think I'm
> > missing
> > > > > something.
> > > > > is a batch more than a list of *independent* messages?
> > > > >
> > > > > Why would you reject the whole batch? One invalid message causes
> the
> > > loss
> > > > > of batch.num.messages-1 messages :(
> > > > > It seems pretty critical to me.
> > > > >
> > > > > If ack=0, the producer will never know about it.
> > > > > If ack !=0, the producer will retry the whole batch. If the issue
> was
> > > > > related to data corruption (etc), retries might work. But in the
> case
> > > of
> > > > > "big message", the batch will always be rejected and the producer
> > will
> > > > give
> > > > > up.
> > > > >
> > > > > If the messages are indeed considered independent, I think this is
> a
> > > > pretty
> > > > > serious issue.
> > > > >
> > > > > I see 2 possible fix approaches:

Re: message size limit

2014-09-03 Thread Bhavesh Mistry
Hi Jun,

We have a similar problem.  We have variable-length messages, so with a
fixed batch size the batch sometimes exceeds the limit set on the brokers
(2 MB).

So could the producer have some extra logic to determine the optimal batch
size by looking at the configured message.max.bytes value?

During the metadata update, the producer would get this value from the
broker for each topic, and it would check whether the current batch has
reached this limit; if so, it would break the batch into smaller chunks
such that no chunk exceeds the limit (unless a single message exceeds the
limit).  Basically, try to avoid data loss as much as possible.
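
For illustration, a rough sketch of the kind of client-side chunking I
mean (the size estimate uses the raw payload length and the limit is
passed in; none of this exists in the producer today):

```
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // Split messages into sub-batches whose total payload stays under
    // maxBatchBytes, so one oversized batch is not rejected wholesale.
    public static List<List<byte[]>> chunk(List<byte[]> messages, long maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<List<byte[]>>();
        List<byte[]> current = new ArrayList<byte[]>();
        long currentBytes = 0;
        for (byte[] msg : messages) {
            if (!current.isEmpty() && currentBytes + msg.length > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<byte[]>();
                currentBytes = 0;
            }
            current.add(msg);            // a single message larger than the limit
            currentBytes += msg.length;  // still ends up alone in its own batch
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```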

Please let me know what your opinion is on this...

Thanks,

Bhavesh


On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
alexis.mi...@airbedandbreakfast.com> wrote:

> Thanks Jun.
>
> I'll create a jira and try to provide a patch. I think this is pretty
> serious.
>
> On Friday, August 29, 2014, Jun Rao  wrote:
>
> > The goal of batching is mostly to reduce the # RPC calls to the broker.
> If
> > compression is enabled, a larger batch typically implies better
> compression
> > ratio.
> >
> > The reason that we have to fail the whole batch is that the error code in
> > the produce response is per partition, instead of per message.
> >
> > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com > wrote:
> >
> > > Could you explain the goals of batches? I was assuming this was simply
> a
> > > performance optimization, but this behavior makes me think I'm missing
> > > something.
> > > is a batch more than a list of *independent* messages?
> > >
> > > Why would you reject the whole batch? One invalid message causes the
> loss
> > > of batch.num.messages-1 messages :(
> > > It seems pretty critical to me.
> > >
> > > If ack=0, the producer will never know about it.
> > > If ack !=0, the producer will retry the whole batch. If the issue was
> > > related to data corruption (etc), retries might work. But in the case
> of
> > > "big message", the batch will always be rejected and the producer will
> > give
> > > up.
> > >
> > > If the messages are indeed considered independent, I think this is a
> > pretty
> > > serious issue.
> > >
> > > I see 2 possible fix approaches:
> > > - the broker could reject only the invalid messages
> > > - the broker could reject the whole batch (like today) but the producer
> > (if
> > > ack!=0) could retry messages one at a time on exception like
> > > "MessageSizeTooLarge".
> > >
> > > opinions?
> > >
> > > Alexis
> > >
> > > ```
> > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > > with correlation ids in [43,62]
> > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > (kafka.producer.async.ProducerSendThread)
> > > kafka.common.FailedToSendMessageException: Failed to send messages
> after
> > 3
> > > tries.
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > >  at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > >  at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > ```
> > >
> > >
> > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao  > > wrote:
> > >
> > > > That's right. If one message in a batch exceeds the size limit, the
> > whole
> > > > batch is rejected.
> > > >
> > > > When determining message.max.bytes, the most important thing to
> > consider
> > > is
> > > > probably memory since currently we need to allocate memory for a full
> > > > message in the broker and the producer and the consumer client.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com > wrote:
> > > >
> > > > > am I miss reading this loop:
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: High Level Consumer and Commit

2014-09-02 Thread Bhavesh Mistry
Hi Philip,

Yes, we have disabled auto-commit, but we need to be able to read from a
particular offset if we manage the offsets ourselves in some storage (DB).
The high-level consumer does not allow per-partition offset management
pluggability.

I would like to keep the high-level consumer's failover and
auto-rebalancing; we just need pluggable offset management.
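
Something like the following (purely hypothetical) hook is what I mean by
pluggable offset management; the consumer would call it instead of writing
offsets to ZooKeeper:

```
public interface OffsetStore {
    // Persist the last fully processed offset for a topic partition (e.g. in a DB).
    void save(String topic, int partition, long offset);

    // Return the offset to resume from, or -1 if nothing has been stored yet.
    long load(String topic, int partition);
}
```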

Thanks,

Bhavesh


On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
philip.oto...@yahoo.com.invalid> wrote:

> No, you'll need to write your own failover.
>
> I'm not sure I follow your second question, but the high-level Consumer
> should be able to do what you want if you disable auto-commit. I'm not sure
> what else you're asking.
>
>
> Philip
>
>
> -
> http://www.philipotoole.com
>
>
> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com> wrote:
>
>
>
> Hi Philip,
>
> Thanks for the update.  With Simple Consumer I will not get failover and
> rebalance that is provided out of box.  what is other option not to block
> reading and keep processing and commit only when batch is done.
>
> Thanks,
>
> Bhavesh
>
>
>
> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
> philip.oto...@yahoo.com.invalid> wrote:
>
> > Either use the SimpleConsumer which gives you much finer-grained control,
> > or (this worked with 0.7) spin up a ConsumerConnection (this is a
> HighLevel
> > consumer concept) per partition, turn off auto-commit.
> >
> > Philip
> >
> >
> > -
> > http://www.philipotoole.com
> >
> >
> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com> wrote:
> >
> >
> >
> > Hi Kafka Group,
> >
> > I have to pull the data from the Topic and index into Elastic Search with
> > Bulk API and wanted to commit only batch that has been committed and
> still
> > continue to read from topic further on same topic.  I have auto commit to
> > be off.
> >
> >
> > List  batch .
> >
> > while (iterator.hasNext()) {
> > batch.add(iterator.next().message());
> > if(batch size is 50 ){
> >   //===>>>>  Once the bulk API is successful it will commit the
> offset
> > to zookeeper...
> >   executor.submit(new Thread() process batch and commit batch,
> > cconsumerConnector)
> >   batch = new batch buffer
> >}
> > }
> >
> > This commitOffset API commits all messages that have been read so far.
> > What is best way to continue reading and only commit another thread
> finish
> > batch process is successful.  This will lead to fragmentation of the
> > Consumer offset so what is best way to implement continuous reading
> stream
> > and commit the rage offset.
> >
> > Is Simple Consumer a better approach for this.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> >
> >
> >
> >
> >
> > Thanks,
> > Bhavesh
> >
>


Re: High Level Consumer and Commit

2014-09-02 Thread Bhavesh Mistry
Hi Philip,

Thanks for the update.  With the SimpleConsumer I will not get the failover
and rebalancing that are provided out of the box.  What other option is
there to avoid blocking reads, keep processing, and commit only when a
batch is done?

Thanks,

Bhavesh


On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
philip.oto...@yahoo.com.invalid> wrote:

> Either use the SimpleConsumer which gives you much finer-grained control,
> or (this worked with 0.7) spin up a ConsumerConnection (this is a HighLevel
> consumer concept) per partition, turn off auto-commit.
>
> Philip
>
>
> -
> http://www.philipotoole.com
>
>
> On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com> wrote:
>
>
>
> Hi Kafka Group,
>
> I have to pull the data from the Topic and index into Elastic Search with
> Bulk API and wanted to commit only batch that has been committed and still
> continue to read from topic further on same topic.  I have auto commit to
> be off.
>
>
> List  batch .
>
> while (iterator.hasNext()) {
> batch.add(iterator.next().message());
> if(batch size is 50 ){
>   //===>>>>  Once the bulk API is successful it will commit the offset
> to zookeeper...
>   executor.submit(new Thread() process batch and commit batch,
> cconsumerConnector)
>   batch = new batch buffer
>}
> }
>
> This commitOffset API commits all messages that have been read so far.
> What is best way to continue reading and only commit another thread finish
> batch process is successful.  This will lead to fragmentation of the
> Consumer offset so what is best way to implement continuous reading stream
> and commit the rage offset.
>
> Is Simple Consumer a better approach for this.
>
>
> Thanks,
>
> Bhavesh
>
>
>
>
>
>
>
> Thanks,
> Bhavesh
>


High Level Consumer and Commit

2014-09-02 Thread Bhavesh Mistry
Hi Kafka Group,

I have to pull data from a topic and index it into Elasticsearch with the
Bulk API, and I want to commit only the batch that has been successfully
indexed while continuing to read further from the same topic.  I have
auto-commit turned off.


List batch = ...;

while (iterator.hasNext()) {
    batch.add(iterator.next().message());
    if (batch size is 50) {
        // ===> Once the bulk API call is successful, commit the offset to ZooKeeper...
        executor.submit(new Thread() /* process batch and commit batch, consumerConnector */);
        batch = new batch buffer;
    }
}
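
For concreteness, here is a rough sketch of the loop above against the 0.8
high-level consumer API (the topic, group, and the indexBatch() helper are
made up; this version processes the batch synchronously before committing,
which sidesteps the offset-fragmentation issue described below at the cost
of blocking the read loop):

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");   // placeholder
        props.put("group.id", "es-indexer");          // placeholder
        props.put("auto.commit.enable", "false");     // commit manually per batch
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("logs", 1);                    // placeholder topic
        KafkaStream<byte[], byte[]> stream =
                connector.createMessageStreams(topicCount).get("logs").get(0);

        List<byte[]> batch = new ArrayList<byte[]>();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            batch.add(it.next().message());
            if (batch.size() == 50) {
                indexBatch(batch);           // hypothetical Elasticsearch bulk call
                connector.commitOffsets();   // commits everything read so far,
                batch.clear();               // so the batch is processed before reading on
            }
        }
    }

    private static void indexBatch(List<byte[]> batch) { /* bulk-index the batch */ }
}
```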

This commitOffsets API commits all messages that have been read so far.
What is the best way to continue reading and only commit once another
thread has finished processing a batch successfully?  This will lead to
fragmentation of the consumer offsets, so what is the best way to
implement a continuous reading stream and commit a range of offsets?

Is the SimpleConsumer a better approach for this?


Thanks,

Bhavesh







Thanks,
Bhavesh


[Kafka MirrorMaker] Message with Custom Partition Logic

2014-08-11 Thread Bhavesh Mistry
HI Kafka Dev Team,



We have to aggregate events (counts) per DC and across DCs for one of our
topics.  We have the standard LinkedIn data pipeline: producers --> local
brokers --> MM --> center brokers.



So I would like to know how MM handles messages when custom partitioning
logic is used as below and the number of partitions in the target DC is
the SAME vs. different from the source DC.



If we have key-based messages and custom partitioning logic (hash(key) %
number of partitions of the source topic), we want to count similar events
by hashing them to the same partition.  But when the same event is
mirrored to the target DC, will it go to the same partition even though
the number of partitions is different in the target DC (i.e., will MM use
hash(key) % number of partitions of the target topic)?
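
For illustration, a tiny sketch of why the mapping cannot be preserved
when the partition counts differ, even if MM re-partitions with the same
key and hash (the key, hash, and partition counts are made-up examples):

```
public class PartitionMappingSketch {
    // The same kind of hashing a custom partitioner would do: hash(key) % numPartitions.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "event-type-42";   // hypothetical aggregation key
        int sourcePartitions = 8;       // example partition count in the source DC
        int targetPartitions = 12;      // example partition count in the target DC
        // Whatever produces into the target cluster partitions against the target's
        // partition count, so the same key can land on a different partition number.
        System.out.println("source partition: " + partitionFor(key, sourcePartitions));
        System.out.println("target partition: " + partitionFor(key, targetPartitions));
    }
}
```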



According to this reference, I do not have a way to configure this or to
control which partitioning logic MM uses when mirroring data, correct?

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330


Thanks,



Bhavesh


Re: [New Feature Request] Ability to Inject Queue Implementation Async Mode

2014-08-11 Thread Bhavesh Mistry
Thank you all for the comments.  Yes, I understand the community's concern
about the extra burden and complexity of deciding which messages to drop,
but the ability to inject an implementation of the queue would make this
completely transparent to Kafka.

I just need fine-grained control over the application and the queue; Kafka
can do its magic of transferring the data.  By default, Kafka could keep
the behavior it has today.  The injection gives the flexibility to control
the buffer and how data is enqueued/dequeued and prioritized, and in
addition it lets us tune the queue size at runtime without having to
manage the complete life cycle of the producer.

I am just presenting use cases where a developer needs fine-grained
control of the queue.  Creating another buffer for the same data is
pointless, in my opinion.

Thanks for your support and suggestions.

Thanks.

Bhavesh




On Thu, Aug 7, 2014 at 3:00 PM, Philip O'Toole <
philip.oto...@yahoo.com.invalid> wrote:

> Policies for which messages to drop, retain, etc seem like something you
> should code in your application. I personally would not like to see this
> extra complexity added to Kafka.
>
> Philip
>
> --
> http://www.philipotoole.com
>
> > On Aug 7, 2014, at 2:44 PM, Bhavesh Mistry 
> wrote:
> >
> > Basically, requirement is to support message dropping policy in event
> when
> > queue is full.  When you get storm of data (in our case logging due to
> > buggy application code), we would like to retain current message instead
> of
> > first one in queue.   We will mitigate this by rate limiting on producer
> > side. Only thing is if Kafka allows the flexibility to inject
> > implementation, then developer have control over what to drop and retain
> > and also what to priorities.
> >
> > We would like to change tunable parameters (such as batch size and queue
> > size etc or non intrusive parameter that does not impact the life cycle
> of
> > the producer instance )   at runtime after producer instance is created.
> >
> > Thanks,
> >
> > Bhavesh
> >
> >> On Mon, Aug 4, 2014 at 7:05 PM, Joe Stein  wrote:
> >>
> >> Is it possible there is another solution to the problem? I think if you
> >> could better describe the problem(s) you are facing and how you are
> >> architected some then you may get responses from others that perhaps
> have
> >> faced the same problem with similar architectures ... or maybe folks can
> >> chime in with solution(s) to the problem(s).  When only being presented
> >> with solutions it is hard to say much about if it is problem that folks
> >> will have and if this solution will work for them.
> >>
> >> /***
> >> Joe Stein
> >> Founder, Principal Consultant
> >> Big Data Open Source Security LLC
> >> http://www.stealth.ly
> >> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >> /
> >>
> >>
> >> On Mon, Aug 4, 2014 at 8:52 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com
> >> wrote:
> >>
> >>> Kafka Version:  0.8.x
> >>>
> >>> 1) Ability to define which messages get drooped (least recently instead
> >> of
> >>> most recent in queue)
> >>> 2) Try Unbounded Queue to find out the Upper Limit without drooping any
> >>> messages for application (use case Stress test)
> >>> 3) Priority Blocking Queue ( meaning a single Producer can send
> messages
> >> to
> >>> multiple topic and I would like to give Delivery Priority to message
> for
> >>> particular message for topic)
> >>>
> >>> We have use case to support #3 and #1 since we would like to deliver
> the
> >>> Application Heartbeat first then any other event within the queue for
> any
> >>> topics. To lower TCP connections, we only use one producer for 4 topics
> >> but
> >>> one of the topics has priority for delivery.
> >>>
> >>> Please let me know if this is useful feature to have or not.
> >>>
> >>> Thanks in advance for great support !!
> >>>
> >>> Thanks,
> >>>
> >>> Bhavesh
> >>>
> >>> P.S.  Sorry for asking this question again, but last time there was no
> >>> conclusion.
> >>
>


Re: [New Feature Request] Ability to Inject Queue Implementation Async Mode

2014-08-07 Thread Bhavesh Mistry
Basically, the requirement is to support a message-dropping policy for
when the queue is full.  When we get a storm of data (in our case, logging
due to buggy application code), we would like to retain the current
message instead of the first one in the queue.  We will mitigate this with
rate limiting on the producer side.  The point is that if Kafka allowed
the flexibility to inject the implementation, then developers would have
control over what to drop, what to retain, and what to prioritize.
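
Since the queue is not injectable today, here is a sketch of the
drop-oldest policy at the application layer, in front of the producer
(the class name and capacity are made up):

```
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DropOldestBuffer {
    private final BlockingQueue<String> queue;

    public DropOldestBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<String>(capacity);
    }

    // Keep the newest message: when full, evict from the head (oldest) until
    // the current message fits, instead of dropping the message being offered.
    public void add(String message) {
        while (!queue.offer(message)) {
            queue.poll();   // drop the oldest entry
        }
    }

    // A separate sender thread drains this buffer and calls producer.send(...).
    public String take() throws InterruptedException {
        return queue.take();
    }
}
```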

We would also like to change tunable parameters (such as batch size, queue
size, and other non-intrusive parameters that do not impact the life cycle
of the producer instance) at runtime, after the producer instance is
created.

Thanks,

Bhavesh

On Mon, Aug 4, 2014 at 7:05 PM, Joe Stein  wrote:

> Is it possible there is another solution to the problem? I think if you
> could better describe the problem(s) you are facing and how you are
> architected some then you may get responses from others that perhaps have
> faced the same problem with similar architectures ... or maybe folks can
> chime in with solution(s) to the problem(s).  When only being presented
> with solutions it is hard to say much about if it is problem that folks
> will have and if this solution will work for them.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********/
>
>
> On Mon, Aug 4, 2014 at 8:52 PM, Bhavesh Mistry  >
> wrote:
>
> > Kafka Version:  0.8.x
> >
> > 1) Ability to define which messages get drooped (least recently instead
> of
> > most recent in queue)
> > 2) Try Unbounded Queue to find out the Upper Limit without drooping any
> > messages for application (use case Stress test)
> > 3) Priority Blocking Queue ( meaning a single Producer can send messages
> to
> > multiple topic and I would like to give Delivery Priority to message for
> > particular message for topic)
> >
> > We have use case to support #3 and #1 since we would like to deliver the
> > Application Heartbeat first then any other event within the queue for any
> > topics. To lower TCP connections, we only use one producer for 4 topics
> but
> > one of the topics has priority for delivery.
> >
> > Please let me know if this is useful feature to have or not.
> >
> > Thanks in advance for great support !!
> >
> > Thanks,
> >
> > Bhavesh
> >
> > P.S.  Sorry for asking this question again, but last time there was no
> > conclusion.
> >
>


Re: Uniform Distribution of Messages for Topic Across Partitions Without Effecting Performance

2014-08-07 Thread Bhavesh Mistry
The root of the problem is consumer lag on one or two partitions, even
with a no-op consumer (read the log and discard it).  Our use case is very
simple: send all the log lines to the brokers.  But under a storm of data
(due to an exception or application error, etc.), one or two partitions
lag behind while the other consumers are at 0 lag.  We have tuned the GC
using the recommended GC settings (according to the tuning section of
http://www.slideshare.net/ToddPalino/enterprise-kafka-kafka-as-a-service).
In normal situations this is OK.

Hashing based on a key, and sticking to murmur_hash(key) % number of
partitions, did not give better throughput compared to random
partitioning.  It would be good to build intelligence into producer
partition selection based on the data rate for the topic and/or the lag.
Is there any way to customize the stickiness interval for the random
partitioning strategy?
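
If I understand the FAQ Joe linked correctly, the stickiness window for
non-keyed messages in the 0.8 producer follows the metadata refresh
interval, so it can be shortened with topic.metadata.refresh.interval.ms.
A sketch (the broker list and the 60-second value are just examples):

```
import java.util.Properties;
import kafka.producer.ProducerConfig;

public class StickinessIntervalSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");          // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // With no key, the 0.8 producer sticks to one random partition until the
        // next metadata refresh; lowering this spreads load across partitions sooner.
        props.put("topic.metadata.refresh.interval.ms", "60000");   // example: 1 minute
        ProducerConfig config = new ProducerConfig(props);
        System.out.println("refresh interval = "
                + props.getProperty("topic.metadata.refresh.interval.ms"));
    }
}
```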

Sorry for the late response.

Thanks,

Bhavesh


On Mon, Aug 4, 2014 at 6:50 PM, Joe Stein  wrote:

> Bhavesh, take a look at
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified
> ?
>
> Maybe the root cause issue is something else? Even if producers produce
> more or less than what they are producing you should be able to make it
> random enough with a partitioner and a key.  I don't think you should need
> more than what is in the FAQ but incase so maybe look into
> http://en.wikipedia.org/wiki/MurmurHash as another hash option.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ************/
>
>
> On Mon, Aug 4, 2014 at 9:12 PM, Bhavesh Mistry  >
> wrote:
>
> > How to achieve uniform distribution of non-keyed messages per topic
> across
> > all partitions?
> >
> > We have tried to do this uniform distribution across partition using
> custom
> > partitioning from each producer instance using round robing (
> > count(messages) % number of partition for topic). This strategy results
> in
> > very poor performance.  So we have switched back to random stickiness
> that
> > Kafka provide out of box per some interval ( 10 minutes not sure exactly
> )
> > per topic.
> >
> > The above strategy results in consumer side lags sometime for some
> > partitions because we have some applications/producers  producing more
> > messages for same topic than other servers.
> >
> > Can Kafka provide out of box uniform distribution by using coordination
> > among all producers and rely on measure rate such as  # messages per
> minute
> > or # of bytes produce per minute to achieve uniform distribution and
> > coordinate stickiness of partition among hundreds of producers for same
> > topic ?
> >
> > Thanks,
> >
> > Bhavesh
> >
>


Uniform Distribution of Messages for Topic Across Partitions Without Effecting Performance

2014-08-04 Thread Bhavesh Mistry
How can we achieve a uniform distribution of non-keyed messages per topic
across all partitions?

We have tried to achieve this uniform distribution across partitions with
custom partitioning from each producer instance using round-robin
(count(messages) % number of partitions for the topic).  This strategy
results in very poor performance, so we have switched back to the random
stickiness that Kafka provides out of the box per some interval (10
minutes, not sure exactly) per topic.

The above strategy sometimes results in consumer-side lag for some
partitions, because some applications/producers produce more messages for
the same topic than other servers do.

Could Kafka provide out-of-the-box uniform distribution by coordinating
among all producers, relying on a measured rate (such as messages per
minute or bytes produced per minute) to achieve uniform distribution and
coordinate partition stickiness among hundreds of producers for the same
topic?

Thanks,

Bhavesh


[New Feature Request] Ability to Inject Queue Implementation Async Mode

2014-08-04 Thread Bhavesh Mistry
Kafka Version:  0.8.x

1) Ability to define which messages get dropped (least recent instead of
most recent in the queue)
2) Try an unbounded queue to find the upper limit without dropping any
messages for the application (use case: stress test)
3) Priority blocking queue (meaning a single producer can send messages to
multiple topics, and I would like to give delivery priority to messages
for a particular topic)

We have use cases for #3 and #1, since we would like to deliver the
application heartbeat first, before any other event in the queue for any
topic.  To lower the number of TCP connections, we only use one producer
for 4 topics, but one of the topics has priority for delivery.
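
Absent such a feature, a sketch of how an application could give the
heartbeat topic delivery priority in front of a single shared producer
(the topic names, priorities, and the Pending type are made up):

```
import java.util.concurrent.PriorityBlockingQueue;

public class PrioritySendBuffer {
    // Lower priority value = delivered first; heartbeats jump ahead of other topics.
    static class Pending implements Comparable<Pending> {
        final int priority;
        final String topic;
        final String payload;
        Pending(int priority, String topic, String payload) {
            this.priority = priority;
            this.topic = topic;
            this.payload = payload;
        }
        public int compareTo(Pending other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    private final PriorityBlockingQueue<Pending> queue =
            new PriorityBlockingQueue<Pending>();

    public void enqueueHeartbeat(String payload) { queue.put(new Pending(0, "heartbeat", payload)); }
    public void enqueueEvent(String topic, String payload) { queue.put(new Pending(1, topic, payload)); }

    // A single sender thread takes from here and calls producer.send(...).
    public Pending next() throws InterruptedException { return queue.take(); }
}
```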

Please let me know whether this would be a useful feature to have.

Thanks in advance for great support !!

Thanks,

Bhavesh

P.S.  Sorry for asking this question again, but last time there was no
conclusion.


Re: Monitoring Producers at Large Scale

2014-07-08 Thread Bhavesh Mistry
HI Otis,

You are right: if Kafka itself has a problem (the queue is full, auto
rebalance, dropped events, etc.), how can it transmit the logs?  So we
have tried to avoid agent-based solutions such as an Apache Flume agent or
a syslog configuration.

You are right that we have to build redundant transportation for the
monitoring transport layer.

Thank you very much for the suggestion.  I will look into Logsene
<https://sematext.atlassian.net/wiki/display/PUBLOGSENE/Sending+Events+to+Logsene>.
The problem is that we have 4 data centers and 24,000 or more producers,
so when an application team comes to us saying their data is lost or they
do not see their log lines, we have to pinpoint what exactly happened.  So
it is very important to monitor, transmit, and set alarms for the Kafka
producers.

We replaced Apache Flume with Apache Kafka as the log transportation
layer; no separate agent is required.
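
For the log4j interception approach discussed in the quoted thread below,
here is a sketch of guarding against the recursive-logging deadlock by
never routing Kafka's own log events back through the producer (log4j 1.x;
the producer wiring is omitted and the logger-name check is just one
possible guard, not what our appender currently does):

```
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;

public class KafkaLog4jAppenderSketch extends AppenderSkeleton {
    // A producer would be created here (omitted); it must never be used for
    // events emitted by Kafka's own client code, or the appender can deadlock
    // when the producer itself starts logging warnings/errors.
    @Override
    protected void append(LoggingEvent event) {
        if (event.getLoggerName().startsWith("kafka")) {
            return; // let a file/syslog appender handle Kafka's own logs instead
        }
        if (event.getLevel().isGreaterOrEqual(Level.WARN)) {
            // producer.send(new KeyedMessage<String, String>("app-logs", event.getRenderedMessage()));
        }
    }

    @Override
    public void close() { /* close the producer here */ }

    @Override
    public boolean requiresLayout() { return false; }
}
```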


Thanks,
Bhavesh



On Mon, Jul 7, 2014 at 1:41 PM, Otis Gospodnetic  wrote:

> Hi,
>
> I'm late to the thread... but that "...we intercept log4j..." caught my
> attention.  Why intercept, especially if it's causing trouble?
>
> Could you use log4j syslog appender and get logs routed to wherever you
> want them via syslog, for example?
> Or you can have syslog tail log4j log files (e.g. rsyslog has "imfile" you
> can use for tailing).
>
> We use our own Logsene <http://sematext.com/logsene/> for Kafka and all
> other logs and SPM <http://sematext.com/spm/> for Kafka and all other
> metrics we monitor.
>
> Oh, actually, this may help you:
>
> https://sematext.atlassian.net/wiki/display/PUBLOGSENE/Sending+Events+to+Logsene
> (ignore the Logsene-specific parts --- there is plenty of general info,
> configs, etc. for log handling)
>
> Otis
> --
> Performance Monitoring * Log Analytics * Search Analytics
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Thu, Jun 26, 2014 at 3:09 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > Thanks for all your responses.
> >
> >
> >
> > JMX metrics are there and we do pull the metrics, but I would like to
> > capture the logs from Kafka lib as well especially WARN, FATAL and ERROR
> > etc to debug the issue.
> >
> >
> >
> > To do this, we intercept Log4j logging and send it to Kafka Log Topics,
> but
> > I realize that under heavy Kafka Lib error/warn/  it will create a
> deadlock
> > between Producer Send thread  (Logging Kafka log topic queue...)
> >
> >
> >
> > *public* *class* KafkaLog4jAppender *extends* AppenderSkeleton {
> >
> >
> >
> > Producer  producer..
> >
> > *protected* *void* append(LoggingEvent event) {
> >
> >
> > if(event.getLoggerName().startsWith("kafka")){
> >
> >  if(event is WARN, FATAL and ERROR){
> >
> > producer.send(event.getRenderedMessage())
> >
> > }
> >
> > }
> >
> >
> >
> > }
> >
> >
> > Other option is to log Kafka Logs into disk and transport logs via
> > separate process
> > to Kafka Topic and transport via https://github.com/harelba/tail2kafka
> to
> > topic...
> >
> >
> > We use Kafka for Log transportation and we want to debug/trouble shoot
> > issue via logs or create alerts/etc
> >
> >
> > Thanks,
> >
> >
> > Bhavesh
> >
> >
> >
> >
> > On Wed, Jun 25, 2014 at 10:49 AM, Neha Narkhede  >
> > wrote:
> >
> > > We monitor producers or for that matter any process/service using JMX
> > > metrics. Every server and service in LinkedIn sends metrics in a Kafka
> > > message to a metrics Kafka cluster. We have subscribers that connect to
> > the
> > > metrics cluster to index that data in RRDs.
> > >
> > > Our aim is to expose all important metrics through JMX. We are doing
> that
> > > for the new producer under org.apache.kafka.clients.producer. Feel free
> > to
> > > take a look at that and give feedback.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Tue, Jun 24, 2014 at 7:59 PM, Darion Yaphet <
> darion.yap...@gmail.com>
> > > wrote:
> > >
> > > > Sorry I want to  know  you want to monitor kafka producers or kafka
> > > brokers
> > > > and zookeepers ?
> > > > It's seems you will want to monitor monitor Exceptions eg Leader Not
> > > Found,
> > > > Queue is full, resend fail  etc  are kafka cluster
> > > 
