Re: Measure latency from Source to Sink

2018-07-20 Thread Thakrar, Jayesh
Below are some examples.
They might be old, but the concept is the same - essentially you are 
implementing an interface.
Be careful not to do heavy or high-latency processing in the interceptor, so 
that it does not impact the data flow.

https://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/

https://medium.com/@bkvarda/building-a-custom-flume-interceptor-8c7a55070038
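For the Kafka-producer side, the analogous hook is the ProducerInterceptor 
interface. Below is a minimal, illustrative sketch (class name, header key and 
package are made up; it assumes Kafka 0.11+ record headers) that stamps the 
send time into a header so the consumer/sink end can compute latency:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendTimestampInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // keep this cheap - it runs on the producer's send path
        record.headers().add("send-ts-ms",
                Long.toString(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // no-op; broker ack latency could be recorded here as well
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

// enable it on the producer:
// props.put("interceptor.classes", "com.example.SendTimestampInterceptor");

The consumer (or the Flink sink) can then subtract the header value from its 
own clock, subject to the clock-skew caveat discussed in the message below.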



From: antonio saldivar 
Date: Friday, July 20, 2018 at 9:57 AM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Measure latency from Source to Sink

Hi

Actually I am running the app in a single node; this is for a POC. I was not 
aware of the custom interceptors - do you have an example of this?

Best Regards

On Fri., Jul 20, 2018 at 9:20, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
See if you can use a custom interceptor for this.
The only fuzzy thing is that the clocks (producer vs. sink host) would be 
different, so I would be a little skeptical of its accuracy.
I have heard of some companies who have a special topic in which they insert 
test messages and then read them back - using the same machine for producer and 
consumer, thereby doing away with this dilemma.
The assumption is that any delay in the cluster would be felt everywhere.
Another option is to create topics with one replica on all brokers (i.e. # of 
topics = # of brokers) - so you can be sensitive to a single broker having an 
issue.
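A rough sketch of that round-trip probe, with the producer and consumer on the 
same host so a single clock is used (broker address, topic and group names are 
illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LatencyProbe {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(p);

        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "latency-probe");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
        consumer.subscribe(Collections.singletonList("latency-probe"));
        while (consumer.assignment().isEmpty()) {
            consumer.poll(100);                     // wait until partitions are assigned
        }

        // the probe message carries its own send time as the payload
        producer.send(new ProducerRecord<>("latency-probe",
                Long.toString(System.currentTimeMillis())));
        producer.flush();

        boolean done = false;
        while (!done) {
            for (ConsumerRecord<String, String> r : consumer.poll(1000)) {
                System.out.println("round trip ms: "
                        + (System.currentTimeMillis() - Long.parseLong(r.value())));
                done = true;
            }
        }
        producer.close();
        consumer.close();
    }
}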

On 7/19/18, 3:09 PM, "antonio saldivar" <ansal...@gmail.com> wrote:

Hello

I am developing an application using Kafka and Flink. I need to be able to
measure the latency from the producer to when the record comes out at the sink.

I can append the timestamp in milliseconds when I send the transaction from the
producer, but how do I append the timestamp when it comes out of the sink?

Can someone help me with an example?

Thank you
Best Regards



Re: Measure latency from Source to Sink

2018-07-20 Thread Thakrar, Jayesh
See if you can use a custom interceptor for this.
The only fuzzy thing is that the clocks (producer vs. sink host) would be 
different, so I would be a little skeptical of its accuracy.
I have heard of some companies who have a special topic in which they insert 
test messages and then read them back - using the same machine for producer and 
consumer, thereby doing away with this dilemma.
The assumption is that any delay in the cluster would be felt everywhere.
Another option is to create topics with one replica on all brokers (i.e. # of 
topics = # of brokers) - so you can be sensitive to a single broker having an 
issue.

On 7/19/18, 3:09 PM, "antonio saldivar"  wrote:

Hello

I am developing an application using Kafka and Flink. I need to be able to
measure the latency from the producer to when the record comes out at the sink.

I can append the timestamp in milliseconds when I send the transaction from the
producer, but how do I append the timestamp when it comes out of the sink?

Can someone help me with an example?

Thank you
Best Regards




Re: If timeout is short, the first poll does not return records

2018-07-18 Thread Thakrar, Jayesh
While this does not answer your question, I believe a lot of things happen 
during the first call - e.g. fetching admin and metadata info about the 
cluster, etc.
That takes "some time", and hence a poll timeout that is acceptable/normal for 
regular processing may not be sufficient during initialization AND, I believe, 
also during periodic metadata updates/refreshes.
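One way to handle this in client code is to treat the first empty poll() as 
part of start-up rather than "no data" - a small, hedged sketch (the consumer 
is assumed to be already configured; topic name and timeouts are illustrative):

// 'consumer' is an already-configured KafkaConsumer<String, String>
consumer.subscribe(Collections.singletonList("my-topic"));

ConsumerRecords<String, String> records = ConsumerRecords.empty();
long deadline = System.currentTimeMillis() + 30000;   // allow time for the initial metadata fetch
while (records.isEmpty() && System.currentTimeMillis() < deadline) {
    // a short timeout (e.g. 100 ms) is fine once cluster metadata has been fetched,
    // but the very first poll may return nothing even though the topic has data
    records = consumer.poll(100);
}
records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));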

On 7/18/18, 3:41 AM, "jingguo yao"  wrote:

If the timeout is short, say 100, the first poll does not return
records in my case. Jay Kreps gave an explanation in [1]. I think
this behaviour of poll is counterintuitive; it would make Kafka
users' lives much easier if this behaviour were documented in [2].

[1] 
http://grokbase.com/p/kafka/users/155mqpwf3n/consumer-poll-returns-no-records-unless-called-more-than-once-why
[2] 
http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-





Re: Facing Duplication Issue in kakfa

2018-05-29 Thread Thakrar, Jayesh
For more details, see https://www.slideshare.net/JayeshThakrar/kafka-68540012

While this is based on Kafka 0.9, the fundamental concepts and reasons are 
still valid.


On 5/28/18, 12:20 PM, "Hans Jespersen"  wrote:

Are you seeing 1) duplicate messages stored in a Kafka topic partition or 
2) duplicate consumption and processing of a single message stored in a Kafka 
topic?

If it’s #1 then you can turn on the idempotent producer feature to get 
Exactly Once Semantics (EOS) while publishing.

If it’s #2 then you can examine more closely how your consumer is doing 
offset commits.
If you are committing offsets automatically by time then there is always a 
possibility that the last time window of messages your consumer did not yet 
commit will be received again when the consumer restarts. 

You can instead manually commit, possibly even after each message which 
will shrink the window of possible duplicate messages to 1, but at the cost of 
some performance. 

What many of the Kafka Sink Connectors do for exactly once processing is to 
store their offsets atomically with the data they write external to Kafka. For 
example a database connector would write the message data and the offsets to a 
database in one atomic write operation. Upon restart of the app it then rereads 
the offset from the database and resumes consumption from Kafka from the last 
offset point using seek() to reposition the Kafka offset for the consumer 
before the first call to poll()

These are the techniques most people use to get end to end exactly once 
processing with no duplicates even in the event of a failure.


-hans
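A rough sketch of that store-offsets-with-the-data pattern (the two database 
helpers are hypothetical placeholders; the consumer is assumed to run with 
enable.auto.commit=false):

// 'consumer' is a KafkaConsumer<String, String> with enable.auto.commit=false
TopicPartition tp = new TopicPartition("orders", 0);         // illustrative topic/partition
consumer.assign(Collections.singletonList(tp));

long lastStored = readLastOffsetFromDb(tp);                  // hypothetical: offset saved with the data
consumer.seek(tp, lastStored + 1);                           // resume at the next unprocessed record

while (true) {
    for (ConsumerRecord<String, String> r : consumer.poll(1000)) {
        // hypothetical: write the row AND its offset in one atomic DB transaction
        writeRowAndOffsetAtomically(r.value(), r.topic(), r.partition(), r.offset());
    }
}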

> On May 28, 2018, at 12:17 AM, Karthick Kumar  wrote:
> 
> Hi,
> 
> Facing duplication inconsistently while bouncing the Kafka producer and
> consumer in a Tomcat node. Any help to find the root cause will be
> appreciated.
> 
> -- 
> With Regards,
> Karthick.K





Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Thakrar, Jayesh
Sorry Matt, I don’t have much idea about Kafka streaming (or any streaming for 
that matter).
As for saving counts from your application servers to Aerospike directly, that 
is certainly simpler, requiring less hardware, fewer resources and less 
development effort.

One reason some people use Kafka as part of their pipeline is to decouple 
systems and protect either end from issues in the other.
It usually makes maintenance on either end simpler. Furthermore, it acts as a 
dampening buffer and, because of Kafka's low latency and high throughput (well, 
that's a relative term), allows the producers and consumers to run at their 
full potential (kind of, but not exactly async push and pull of data).

It might even be worthwhile to start off without Kafka and, once you understand 
things better, introduce Kafka later on.

From: Matt Daum 
Date: Monday, March 5, 2018 at 4:33 PM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

And not to overthink this, but as I'm new to Kafka and streams I want to make 
sure that it makes the most sense for my use case.  With the streams and 
grouping, it looks like I'd be getting 1 internal topic created per grouped 
stream, which would then be written and reread, then totaled in the count; that 
would then produce a final stream which is consumed/sinked to an external db.   
Is that correct?

Overall I'm not using the streaming counts as they grow throughout the day; I 
just want a final end-of-day count.  We already have an Aerospike cluster set 
up for the applications themselves.  If each application server itself made the 
writes to the Aerospike DB cluster to simply increase the counts for each 
attribute, and then at end of day we read them out there, it appears fewer 
computing resources would be used, as we'd effectively be doing inbound request 
-> DB write per counted attribute.

I am not saying that is the better route, as I don't fully know or understand 
the full capabilities of Kafka.  Since we aren't streaming the data, enriching 
it, etc., would the direct-to-DB counts be a better approach?  I just want to 
make sure I use the best tool for the job.  Let me know what other factors I 
may be underestimating/misunderstanding on the Kafka approach, please.  I want 
to be as informed as possible before going down either path too far.

Thank you again for your time,
Matt

On Mon, Mar 5, 2018 at 3:14 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Yep, exactly.

So there is some buffering that you need to do in your client and also deal 
with edge cases.
E.g. how long should you hold on to a batch before you send a smaller batch to 
producer since you want a balance between batch optimization and expedience.

You may need to do some experiments to balance between system throughput, 
record size, batch size and potential batching delay for a given rate of 
incoming requests.


From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 1:59 PM

To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an Avro wrapper around your single 
class, right?  I.e. an array of records, correct?  Then when you hit a size you 
are happy with, you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - 
especially the fact that you will be bundling the Avro schema for each request 
in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.

From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM

To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example 
would be assume on each requests there is a parameters "X" which at the end of 
each day I want to know the counts per unique value, it could have 100's of 
millions of possi

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Thakrar, Jayesh
Yep, exactly.

So there is some buffering that you need to do in your client and also deal 
with edge cases.
E.g. how long should you hold on to a batch before you send a smaller batch to 
producer since you want a balance between batch optimization and expedience.

You may need to do some experiments to balance between system throughput, 
record size, batch size and potential batching delay for a given rate of 
incoming requests.


From: Matt Daum 
Date: Monday, March 5, 2018 at 1:59 PM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an Avro wrapper around your single 
class, right?  I.e. an array of records, correct?  Then when you hit a size you 
are happy with, you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - 
especially the fact that you will be bundling the Avro schema for each request 
in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.

From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM

To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example 
would be assume on each requests there is a parameters "X" which at the end of 
each day I want to know the counts per unique value, it could have 100's of 
millions of possible values.

I'll start to hopefully work this week on an initial test of everything and 
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the Producer 
batching?
- Any other things you'd suggest looking out for as gotcha's or configurations 
that probably will be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
BTW - I did not mean to rule out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good 
candidate to store the raw and/or aggregated data, given that it also has a 
Kafka Connect module.

From: "Thakrar, Jayesh" 
mailto:jthak...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum mailto:m...@setfive.com>>

Cc: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but 
believe that for some portions of streaming, Kafka uses (used?) RocksDB to 
locally store some state info in the stream processing application instances.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, 
ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore 
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour
by using periodic (minute to hourly) extract jobs and store data in 
hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of 
course, with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by them.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum <m...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "us

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Thakrar, Jayesh
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - 
especially the fact that you will be bundling the Avro schema for each request 
in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.
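A hedged sketch of that batching idea using an Avro container written into a 
byte buffer (the topic name, schema and the buffered 'pendingRequests' batch 
are placeholders); the container stores the schema once for the whole batch, 
which is what amortizes the per-request overhead:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RequestBatcher {

    // serialize N request records into one Avro container and send it as one Kafka message;
    // the producer is assumed to be configured with a ByteArraySerializer for values
    static void sendBatch(KafkaProducer<String, byte[]> producer,
                          Schema schema,
                          List<GenericRecord> pendingRequests) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, out);               // schema is written once per container
        for (GenericRecord request : pendingRequests) {
            writer.append(request);
        }
        writer.close();                           // flushes the container into 'out'

        producer.send(new ProducerRecord<>("requests", out.toByteArray()));
    }
}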

From: Matt Daum 
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example 
would be assume on each requests there is a parameters "X" which at the end of 
each day I want to know the counts per unique value, it could have 100's of 
millions of possible values.

I'll start to hopefully work this week on an initial test of everything and 
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the Producer 
batching?
- Any other things you'd suggest looking out for as gotcha's or configurations 
that probably will be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
BTW - I did not mean to rule out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good 
candidate to store the raw and/or aggregated data, given that it also has a 
Kafka Connect module.

From: "Thakrar, Jayesh" 
mailto:jthak...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum mailto:m...@setfive.com>>

Cc: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but 
believe that for some portions of streaming, Kafka uses (used?) RocksDB to 
locally store some state info in the stream processing application instances.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, 
ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore 
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour
by using periodic (minute to hourly) extract jobs and store data in 
hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of 
course, with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by them.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum <m...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across 
all the data, right?  Also, will having millions of different values per 
grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" 
<jthak...@conversantmedia.com> wrote:
Yes, that's the general design pattern. Another thing to look into is to 
compress the data. Now, the Kafka consumer/producer can already do it for you, 
but we chose to compress in the applications due to a historic issue that 
degraded performance, although it has been resolved now.
Also, just keep in mind that while you do your batching, the Kafka producer 
also tries to batch messages to Kafka, and you will need to ensure you have 
enough buffer memory. However, that's all configurable.
Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.
Jayesh

_

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-04 Thread Thakrar, Jayesh
BTW - I did not mean to rule out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good 
candidate to store the raw and/or aggregated data, given that it also has a 
Kafka Connect module.

From: "Thakrar, Jayesh" 
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but 
believe that for some portions of streaming, Kafka uses (used?) RocksDB to 
locally store some state info in the stream processing application instances.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, 
ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore 
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour
by using periodic (minute to hourly) extract jobs and store data in 
hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of 
course, with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by them.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum 
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across 
all the data, right?  Also, will having millions of different values per 
grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" 
<jthak...@conversantmedia.com> wrote:
Yes, that's the general design pattern. Another thing to look into is to 
compress the data. Now, the Kafka consumer/producer can already do it for you, 
but we chose to compress in the applications due to a historic issue that 
degraded performance, although it has been resolved now.
Also, just keep in mind that while you do your batching, the Kafka producer 
also tries to batch messages to Kafka, and you will need to ensure you have 
enough buffer memory. However, that's all configurable.
Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.
Jayesh

________
From: Matt Daum <m...@setfive.com>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a kafka cluster setup yet at all.  Right now just have 8 
of our application servers.  We currently sample some impressions and then 
dedupe/count outside at a different DC, but are looking to try to analyze all 
impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to 
network jitter etc. it would be fine; we're just trying to get an overall rough 
count of each attribute.  Creating batched messages definitely makes sense and 
will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking 
to do; as these are physical servers, we'll most likely need to buy new 
hardware.  For the first run I think we'll try it out on one of our application 
clusters that gets a smaller amount of traffic (300-400k req/sec) and run the 
Kafka cluster on the same machines as the applications.

So would the best route here be something like: each application server batches 
requests and sends them to Kafka, a stream consumer then tallies up the totals 
per attribute that we want to track and outputs them to a new topic, which then 
goes to a sink to either a DB or something like S3, which we then read into our 
external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support 
 about 1 million requests/sec into the cluster from source servers and expect 
to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to 
achieve desired throughput.

So e.g. on the requ

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-04 Thread Thakrar, Jayesh
I don't have any experience/knowledge of the Kafka inbuilt datastore, but 
believe that for some portions of streaming, Kafka uses (used?) RocksDB to 
locally store some state info in the stream processing application instances.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, 
ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore 
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour
by using periodic (minute to hourly) extract jobs and store data in 
hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of 
course, with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by them.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum 
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across 
all the data, right?  Also, will having millions of different values per 
grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" 
<jthak...@conversantmedia.com> wrote:
Yes, that's the general design pattern. Another thing to look into is to 
compress the data. Now, the Kafka consumer/producer can already do it for you, 
but we chose to compress in the applications due to a historic issue that 
degraded performance, although it has been resolved now.
Also, just keep in mind that while you do your batching, the Kafka producer 
also tries to batch messages to Kafka, and you will need to ensure you have 
enough buffer memory. However, that's all configurable.
Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.
Jayesh


From: Matt Daum <m...@setfive.com>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a kafka cluster setup yet at all.  Right now just have 8 
of our application servers.  We currently sample some impressions and then 
dedupe/count outside at a different DC, but are looking to try to analyze all 
impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to 
network jitter etc. it would be fine; we're just trying to get an overall rough 
count of each attribute.  Creating batched messages definitely makes sense and 
will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking 
to do; as these are physical servers, we'll most likely need to buy new 
hardware.  For the first run I think we'll try it out on one of our application 
clusters that gets a smaller amount of traffic (300-400k req/sec) and run the 
Kafka cluster on the same machines as the applications.

So would the best route here be something like: each application server batches 
requests and sends them to Kafka, a stream consumer then tallies up the totals 
per attribute that we want to track and outputs them to a new topic, which then 
goes to a sink to either a DB or something like S3, which we then read into our 
external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support 
 about 1 million requests/sec into the cluster from source servers and expect 
to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to 
achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical 
avro file (byte buffer) of say N requests and then making that into one Kafka 
msg payload.

We have a similar situation 
(https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found 
anything from 4x to 10x better throughput with batching as compared to one 
request per msg.
We have different kinds of msgs/topics and the individual "request" si

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-04 Thread Thakrar, Jayesh
Yes, that's the general design pattern. Another thing to look into is to 
compress the data. Now, the Kafka consumer/producer can already do it for you, 
but we chose to compress in the applications due to a historic issue that 
degraded performance, although it has been resolved now.

Also, just keep in mind that while you do your batching, the Kafka producer 
also tries to batch messages to Kafka, and you will need to ensure you have 
enough buffer memory. However, that's all configurable.

Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.

Jayesh
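For reference, a sketch of the producer-side knobs being referred to; the 
values below are purely illustrative and need to be tuned against your own 
throughput tests:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("compression.type", "lz4");        // or gzip/snappy; alternatively compress in the app
props.put("batch.size", 262144);             // per-partition batch size in bytes
props.put("linger.ms", 50);                  // wait a little so batches can fill
props.put("buffer.memory", 134217728L);      // total memory for buffering unsent records
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);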


From: Matt Daum 
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a kafka cluster setup yet at all.  Right now just have 8 
of our application servers.  We currently sample some impressions and then 
dedupe/count outside at a different DC, but are looking to try to analyze all 
impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to 
network jitter etc. it would be fine; we're just trying to get an overall rough 
count of each attribute.  Creating batched messages definitely makes sense and 
will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking 
to do; as these are physical servers, we'll most likely need to buy new 
hardware.  For the first run I think we'll try it out on one of our application 
clusters that gets a smaller amount of traffic (300-400k req/sec) and run the 
Kafka cluster on the same machines as the applications.

So would the best route here be something like: each application server batches 
requests and sends them to Kafka, a stream consumer then tallies up the totals 
per attribute that we want to track and outputs them to a new topic, which then 
goes to a sink to either a DB or something like S3, which we then read into our 
external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support 
 about 1 million requests/sec into the cluster from source servers and expect 
to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to 
achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical 
avro file (byte buffer) of say N requests and then making that into one Kafka 
msg payload.

We have a similar situation 
(https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found 
anything from 4x to 10x better throughput with batching as compared to one 
request per msg.
We have different kinds of msgs/topics and the individual "request" size varies 
from  about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" <m...@setfive.com> 
wrote:

I am new to Kafka but I think I have a good use case for it.  I am trying
to build daily counts of requests based on a number of different attributes
in a high throughput system (~1 million requests/sec. across all  8
servers).  The different attributes are unbounded in terms of values, and
some will spread across 100's of millions of values.  This is my current
thought process; let me know where I could be more efficient or if there is
a better way to do it.

I'll create an AVRO object "Impression" which has all the attributes of the
inbound request.  My application servers then will on each request create
and send this to a single kafka topic.

I'll then have a consumer which creates a stream from the topic.  From
there I'll use the windowed timeframes and groupBy to group by the
attributes on each given day.  At the end of the day I'd need to read out
the data store to an external system for storage.  Since I won't know all
the values I'd need something similar to the KVStore.all() but for
WindowedKV stores.  It appears that this would be possible in 1.1 with this
commit:

https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
.

Is this the best approach to doing this?  Or would I be better off using the
stream to listen, and then an external DB like Aerospike to store the counts
and read out of it directly at end of day?

Thanks for the help!
Daum
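For what it's worth, a rough Kafka Streams sketch of the windowed daily count 
described above (assuming Kafka 1.0+, String keys/values where the value is the 
attribute being counted; topic and application names are made up):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class DailyCounts {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // value = the attribute being counted (e.g. parameter "X"), for simplicity
        KStream<String, String> impressions = builder.stream("impressions");

        KTable<Windowed<String>, Long> dailyCounts = impressions
                .groupBy((key, attributeValue) -> attributeValue)         // re-key by attribute value
                .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))    // one window per day
                .count();

        // drop the window from the key and emit the running counts to an output topic
        dailyCounts.toStream((windowedKey, count) -> windowedKey.key())
                .to("daily-counts", Produced.with(Serdes.String(), Serdes.Long()));

        Properties cfg = new Properties();
        cfg.put(StreamsConfig.APPLICATION_ID_CONFIG, "daily-counts-app");
        cfg.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        cfg.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        cfg.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), cfg).start();
    }
}

The windowed state lives in local RocksDB stores backed by a changelog topic; 
the end-of-day totals can be read either from the output topic via a sink 
connector or from the store itself via the interactive-query APIs referenced in 
the commit above.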





Re: Kafka Setup for Daily counts on wide array of keys

2018-03-03 Thread Thakrar, Jayesh
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support 
 about 1 million requests/sec into the cluster from source servers and expect 
to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to 
achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical 
avro file (byte buffer) of say N requests and then making that into one Kafka 
msg payload.

We have a similar situation 
(https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found 
anything from 4x to 10x better throughput with batching as compared to one 
request per msg.
We have different kinds of msgs/topics and the individual "request" size varies 
from  about 100 bytes to 1+ KB. 

On 3/2/18, 8:24 AM, "Matt Daum"  wrote:

I am new to Kafka but I think I have a good use case for it.  I am trying
to build daily counts of requests based on a number of different attributes
in a high throughput system (~1 million requests/sec. across all  8
servers).  The different attributes are unbounded in terms of values, and
some will spread across 100's of millions of values.  This is my current
thought process; let me know where I could be more efficient or if there is
a better way to do it.

I'll create an AVRO object "Impression" which has all the attributes of the
inbound request.  My application servers then will on each request create
and send this to a single kafka topic.

I'll then have a consumer which creates a stream from the topic.  From
there I'll use the windowed timeframes and groupBy to group by the
attributes on each given day.  At the end of the day I'd need to read out
the data store to an external system for storage.  Since I won't know all
the values I'd need something similar to the KVStore.all() but for
WindowedKV stores.  It appears that this would be possible in 1.1 with this
commit:

https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
.

Is this the best approach to doing this?  Or would I be better off using the
stream to listen, and then an external DB like Aerospike to store the counts
and read out of it directly at end of day?

Thanks for the help!
Daum




Re: Lost messages and messed up offsets

2017-11-30 Thread Thakrar, Jayesh
Can you also check if you have partition leaders flapping or changing rapidly?
Also, look at the following settings on your client configs:

max.partition.fetch.bytes
fetch.max.bytes
receive.buffer.bytes

We had a similar situation in our environment when the brokers were flooded 
with data.
The symptoms were apparent huge spikes in offset ids - much more than the data 
we were sending.
We traced that to the fact that the brokers were not able to keep up with the 
incoming producer + consumer + replication traffic due to the NIC bandwidth.
(A bit of a lengthy story as to why the offset ids appeared to be high/spiky 
because of the flapping).

And then the consumer would have issues - and the problem there was that the 
producer had a very large buffer and batch size - so the data was coming in 
large batches.
However, the client was not configured to receive data in such large batches, 
so it would give errors and would not be able to get past a certain offset.


On 11/30/17, 3:03 AM, "Tom van den Berge"  wrote:

The consumers are using default settings, which means that
enable.auto.commit=true and auto.commit.interval.ms=5000. I'm not
committing manually; just consuming messages.

On Thu, Nov 30, 2017 at 1:09 AM, Frank Lyaruu  wrote:

> Do you commit the received messages? Either by doing it manually or 
setting
> enable.auto.commit and auto.commit.interval.ms?
>
> On Wed, Nov 29, 2017 at 11:15 PM, Tom van den Berge <
> tom.vandenbe...@gmail.com> wrote:
>
> > I'm using Kafka 0.10.0.
> >
> > I'm reading messages from a single topic (20 partitions), using 4
> consumers
> > (one group), using a standard java consumer with default configuration,
> > except for the key and value deserializer, and a group id; no other
> > settings.
> >
> > We've been experiencing a serious problem a few times now, after a large
> > burst of messages (75000) have been posted to the topic. The consumer 
lag
> > (as reported by Kafka's kafka-consumer-groups.sh) immediately shows a
> huge
> > lag, which is expected. The consumers start processing the messages,
> which
> > is expected to take them at least 30 minutes. In the mean time, more
> > messages are posted to the topic, but at a "normal" rate, which the
> > consumers normally handle easily. The problem is that the reported
> consumer
> > lag is not decreasing at all. After some 30 minutes, it has even
> increased
> > slightly. This would mean that the consumers are not able to process the
> > backlog at all, which is extremely unlikely.
> >
> > After a restart of all consumer applications, something really 
surprising
> > happens: the lag immediately drops to nearly 0! It is technically
> > impossible that the consumers really processed all messages in a matter
> of
> > seconds. Manual verification showed that many messages were not 
processed
> > at all; they seem to have disappeared somehow. So it seems that
> restarting
> > the consumers somehow messed up the offset (I think).
> >
> > On top of that, I noticed that the reported lag shows seemingly
> impossible
> > figures. During the time that the lag was not decreasing, before the
> > restart of the consumers, the "current offset" that was reported for 
some
> > partitions decreased. To my knowledge, that is impossible.
> >
> > Does anyone have an idea on how this could have happened?
> >
>




Re: Offsets in Kafka producer start with -1 for new topic

2017-10-30 Thread Thakrar, Jayesh
No exceptions or dropped messages.
I injected/sent 100 messages and could read back all 100 of them.
It's just that the offset numbers are not as expected on the producer side.
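As a side note, one well-known way to see -1 offsets in RecordMetadata (not 
necessarily what happened here) is running the producer with acks=0, where the 
broker never reports the record's offset back. A small sketch to illustrate the 
callback (topic/key/value are made up):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");   // with "0" the broker never returns an offset, so metadata.offset() is -1

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "key-1", "value-1"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("partition=" + metadata.partition() + " offset=" + metadata.offset());
    }
});
producer.flush();
producer.close();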


On 10/30/17, 2:28 AM, "Manikumar"  wrote:

Any exception in the callback exception field?
Maybe you can enable client debug logs to check for any errors.

On Mon, Oct 30, 2017 at 7:25 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> I created a new Kafka topic with 1 partition and then sent 10 messages
> using the KafkaProducer API using the async callback.
> What I saw was that the offsets in the RecordMetadata for the first record
> was -1.
> Shouldn't it be 0 as offsets start with 0?
>
> Is it a bug or works as expected?
>
> Thanks,
> Jayesh
>
>




Offsets in Kafka producer start with -1 for new topic

2017-10-29 Thread Thakrar, Jayesh
I created a new Kafka topic with 1 partition and then sent 10 messages using 
the KafkaProducer API using the async callback.
What I saw was that the offset in the RecordMetadata for the first record was 
-1.
Shouldn't it be 0 as offsets start with 0?

Is it a bug or works as expected?

Thanks,
Jayesh



Re: do i need to restart the brokers if I changed the retention time for a specific topic

2017-08-07 Thread Thakrar, Jayesh
Just to make it clear, Haitao - in your case you do not have to restart the 
brokers (since you are changing the config at the topic level).
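As a side note, on newer versions the preferred way to set a topic-level 
override is kafka-configs.sh rather than kafka-topics.sh --alter --config; a 
sketch (the retention value here, 1 day = 86400000 ms, is only an example):

# set the override - takes effect without any broker restart
bin/kafka-configs.sh --zookeeper zk_address --alter --entity-type topics \
  --entity-name test-topic --add-config retention.ms=86400000

# verify it
bin/kafka-configs.sh --zookeeper zk_address --describe --entity-type topics \
  --entity-name test-topic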

On 8/6/17, 11:37 PM, "Kaufman Ng"  wrote:

Hi Haitao,

The retention time (retention.ms) configuration can exist as a broker-level
and/or topic-level config. For the latter, changing it does NOT require
broker restarts.


On Sun, Aug 6, 2017 at 10:33 PM, haitao .yao  wrote:

> Hi, I want to change the retention time for a specific topic with the
> following command:
>
> $ bin/kafka-topics.sh --zookeeper zk_address --alter --topic
> test-topic --config retention.ms=8640
>
> Is that correct? Do I have to restart the brokers?
> I read the source code and thought the configuration values about 
retention
> time in LogManager are static:
> https://github.com/apache/kafka/blob/0.10.0/core/src/
> main/scala/kafka/server/KafkaServer.scala#L597-L620
>
> Kafka version: kafka_2.11-0.10.0.1
>
> Thanks
> --
> haitao.yao
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io




Re: Limit of simultaneous consumers/clients?

2017-07-31 Thread Thakrar, Jayesh
You may want to look at the Kafka REST Proxy instead of having so many direct 
client connections.

https://github.com/confluentinc/kafka-rest



On 7/31/17, 1:29 AM, "Dr. Sven Abels"  wrote:

Hi guys,

does anyone have an idea about the possible limits of concurrent users?

-Original Message-
From: Dr. Sven Abels [mailto:ab...@ascora.de] 
Sent: Friday, July 28, 2017 12:11
To: users@kafka.apache.org
Subject: Limit of simultaneous consumers/clients?

Hello,

 

we would like to use Kafka as a way to inform users about events of certain
topics. For this purpose, we want to develop Windows and Mac clients which
users would install on their desktop PCs.

 

We got a broad number of users, so it's likely that there will be >10.000
clients running in parallel.

 

If I understand it correctly, then Kafka uses Sockets and the user clients
would maintain an active connection to Kafka. If this is correct, I
wondered:

 

-What is the limit of clients that may run in parallel? Do 10.000 clients
mean 10.000 server connections? Would that be a problem for a typical
server? 

 

-Can we solve this problem by simply running kafka on several servers and
using something like a round-robin for the DNS so that the clients connect
to different servers?

 

-We expect to only send a few messages each day. Messages should arrive
quickly (<30 seconds delay) but we don't need realtime. Considering this: Is
kafka still a good solution or should we better switch to e.g. polling of
clients to the server without Kafka?

 

 

 

Best regards,

 

Sven







Re: Causes for Kafka messages being unexpectedly delivered more than once? The 'exactly once' semantic

2017-04-13 Thread Thakrar, Jayesh
Hi Dmitri,

This presentation might help you understand and take appropriate actions to 
deal with data duplication (and data loss)

https://www.slideshare.net/JayeshThakrar/kafka-68540012

Regards,
Jayesh

On 4/13/17, 10:05 AM, "Vincent Dautremont" 
 wrote:

One of the cases where you would get a message more than once is if you get
disconnected / kicked out of the consumer group / etc. after failing to commit
offsets for messages you have already read.

What I do is insert the message into an in-memory cache (Redis) database.
If it fails to insert because of primary key duplication, well, that means
I've already received that message in the past.

You could even insert just the topic+partition+offset of the message
(instead of the full message) if you know for sure that your message payload
would not be duplicated in the Kafka topic.

Vincent.

On Thu, Apr 13, 2017 at 4:52 PM, Dmitry Goldenberg  wrote:

> Hi all,
>
> I was wondering if someone could list some of the causes which may lead to
> Kafka delivering the same messages more than once.
>
> We've looked around and we see no errors to notice, yet intermittently, we
> see messages being delivered more than once.
>
> Kafka documentation talks about the below delivery modes:
>
>- *At most once*—Messages may be lost but are never redelivered.
>- *At least once*—Messages are never lost but may be redelivered.
>- *Exactly once*—this is what people actually want, each message is
>delivered once and only once.
>
> So the default is 'at least once' and that is what we're running with (we
> don't want to do "at most once" as that appears to yield some potential 
for
> message loss).
>
> We had not seen duplicated deliveries for a while previously but just
> started seeing them quite frequently in our test cluster.
>
> What are some of the possible causes for this?  What are some of the
> available tools for troubleshooting this issue? What are some of the
> possible fixes folks have developed or instrumented for this issue?
>
> Also, is there an effort underway on Kafka side to provide support for the
> "exactly once" semantic?  That is exactly the semantic we want and we're
> wondering how that may be achieved.
>
> Thanks,
> - Dmitry
>

-- 
The information transmitted is intended only for the person or entity to 
which it is addressed and may contain confidential and/or privileged 
material. Any review, retransmission, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or 
entities other than the intended recipient is prohibited. If you received 
this in error, please contact the sender and delete the material from any 
computer.




Re: programmatic way to check for topic existence?

2016-10-25 Thread Thakrar, Jayesh
Have a look at the Cluster class, which has a "topics" method to get a set of 
all the topics.

https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/Cluster.html

In versions 0.8/0.9, there was also ZkUtils, but the desire is to have clients 
not interrogate ZK directly.
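A related, purely client-side check is the consumer's listTopics() call, which 
fetches metadata for all topics and therefore does not trigger auto-creation; a 
small sketch (broker address and topic name are made up):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicExists {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // metadata for *all* topics is requested, so nothing gets auto-created
            boolean exists = consumer.listTopics().containsKey("my_table_topic");
            System.out.println("topic exists: " + exists);
        }
    }
}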

On 10/24/16, 4:32 PM, "Ben Osheroff"  wrote:

Hiya!

I've been trying to merge https://github.com/zendesk/maxwell/pull/457,
which adds a much-requested feature of Maxwell, that of being able to
have a topic-per-mysql-table.  When we receive a row we programmatically
generate the topic name, and the first thing we do is call
`KafkaProducer#partitionsFor(topic)`, so that we know how to partition
the data.

The problem I'm running into is in trying to detect the case where a
topic doesn't exist.  If auto-creation is on, `partitionsFor()` seems to
correctly auto-create the topic, but if auto-creation is off the
behavior is kinda wonky; kafka goes into a metadata-fetch loop, logging

"Error while fetching metadata with correlation id 573 
:{topic=UNKNOWN_TOPIC_OR_PARTITION}"

but then ultimately throwing me back a `TimeoutException` after 60 tries
or so.

I can rescue/rethrow the TimeoutException, but it seems like there might
be a better way that I'm missing.  Any ideas?  I'd ideally just like a
way to fail fast and clean when the topic doesn't exist (and
auto-creation is off).

Thanks,
Ben Osheroff
zendesk.com







Re: Too Many Open Files

2016-08-01 Thread Thakrar, Jayesh
What are the producers/consumers for the Kafka cluster?
Remember that it's not just files but also sockets that add to the count.

I had seen issues when we had a network switch problem and had Storm consumers.
The switch would cause issues in connectivity between Kafka brokers, zookeepers 
and clients, causing a flood of connections from everyone to each other.

On 8/1/16, 7:14 AM, "Scott Thibault"  wrote:

Did you verify that the process has the correct limit applied?
cat /proc//limits

--Scott Thibault


On Sun, Jul 31, 2016 at 4:14 PM, Kessiler Rodrigues 
wrote:

> I’m still experiencing this issue…
>
> Here are the kafka logs.
>
> [2016-07-31 20:10:35,658] ERROR Error while accepting connection
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at
> 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at
> 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:323)
> at kafka.network.Acceptor.run(SocketServer.scala:268)
> at java.lang.Thread.run(Thread.java:745)
> [2016-07-31 20:10:35,658] ERROR Error while accepting connection
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at
> 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at
> 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:323)
> at kafka.network.Acceptor.run(SocketServer.scala:268)
> at java.lang.Thread.run(Thread.java:745)
> [2016-07-31 20:10:35,658] ERROR Error while accepting connection
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at
> 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at
> 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:323)
> at kafka.network.Acceptor.run(SocketServer.scala:268)
> at java.lang.Thread.run(Thread.java:745)
>
> My ulimit is 1 million, how is that possible?
>
> Can someone help with this?
>
>
> > On Jul 30, 2016, at 5:05 AM, Kessiler Rodrigues 
> wrote:
> >
> > I have changed it a bit.
> >
> > I have 10 brokers and 20k topics with 1 partition each.
> >
> > I looked at the Kafka logs dir and I only have 3318 files.
> >
> > I’m doing some tests to see how many topics/partitions I can have, but
> it is throwing too many files once it hits 15k topics..
> >
> > Any thoughts?
> >
> >
> >
> >> On Jul 29, 2016, at 10:33 PM, Gwen Shapira  wrote:
> >>
> >> woah, it looks like you have 15,000 replicas per broker?
> >>
> >> You can go into the directory you configured for kafka's log.dir and
> >> see how many files you have there. Depending on your segment size and
> >> retention policy, you could have hundreds of files per partition
> >> there...
> >>
> >> Make sure you have at least that many file handles and then also add
> >> handles for the client connections.
> >>
> >> 1 million file handles sound like a lot, but you are running lots of
> >> partitions per broker...
> >>
> >> We normally don't see more than maybe 4000 per broker and most
> >> clusters have a lot fewer, so consider adding brokers and spreading
> >> partitions around a bit.
> >>
> >> Gwen
> >>
> >> On Fri, Jul 29, 2016 at 12:00 PM, Kessiler Rodrigues
> >>  wrote:
> >>> Hi guys,
> >>>
> >>> I have been experiencing some issues on kafka, where its throwing too
> many open files.
> >>>
> >>> I have around of 6k topics and 5 partitions each.
> >>>
> >>> My cluster was made with 6 brokers. All of them are running Ubuntu 16
> and the file limits settings are:
> >>>
> >>> `cat  /proc/sys/fs/file-max`
> >>> 200
> >>>
> >>> `ulimit -n`
> >>> 100
> >>>
> >>> Anyone has experienced it before?
> >
>
>


-- 
*This e-mail is not encrypted.  Due to the unsecured nature of unencrypted
e-mail, there may be some level of risk that the information in this e-mail
could be read by a third party.  Accordingly, the recipient(s) named above
are hereby advised to not communicate protected health information using
this e-mail address.  If you desire to send protected health informatio

RE: Last offset in all partitions

2016-07-06 Thread Thakrar, Jayesh
Check out the Consumer API 

http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

and search for the method "seekToEnd"

Here's the "text" from the API Doc -

seekToEnd
public void seekToEnd(Collection<TopicPartition> partitions)
Seek to the last offset for each of the given partitions. This function 
evaluates lazily, seeking to the final offset in all partitions only when 
poll(long) or position(TopicPartition) are called. If no partition is provided, 
seek to the final offset for all of the currently assigned partitions.
Specified by:
seekToEnd in interface Consumer<K,V>
See Also:
seekToEnd(Collection)
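Putting that together, a short sketch that prints the end offset of every 
partition of a topic (0.10.0-era consumer API; broker address and topic name 
are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class EndOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo pi : consumer.partitionsFor("my-topic")) {
                partitions.add(new TopicPartition(pi.topic(), pi.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);          // lazy - resolved by position() below
            for (TopicPartition tp : partitions) {
                System.out.println(tp + " end offset = " + consumer.position(tp));
            }
        }
    }
}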


-Original Message-
From: Todd Palino [mailto:tpal...@gmail.com] 
Sent: Wednesday, July 06, 2016 10:36 AM
To: users@kafka.apache.org
Subject: Re: Last offset in all partitions

We do this through our monitoring agents by pulling it as a metric from the 
LogEndOffset beans. By putting it into our metrics system we get a mapping of 
timestamp to offset for every partition with (currently) 60 second granularity. 
Useful for offset resets and other tasks.

-Todd

On Wednesday, July 6, 2016, Kristoffer Sjögren  wrote:

> Hi
>
> Is there a way to get the last offset written by all partitions of a 
> topic programmatically using the 0.10.0.0 API?
>
> At the moment I use KafkaConsumer.seekToEnd as seen in this gist[1] 
> but maybe there is a better, more efficient, way to do it?
>
> Cheers,
> -Kristoffer
>
> [1] 
> https://gist.github.com/krisskross/a49e462bedb89505e372672cd81129ab
>


--
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino




This email and any files included with it may contain privileged,
proprietary and/or confidential information that is for the sole use
of the intended recipient(s).  Any disclosure, copying, distribution,
posting, or use of the information contained in or attached to this
email is prohibited unless permitted by the sender.  If you have
received this email in error, please immediately notify the sender
via return email, telephone, or fax and destroy this original transmission
and its included files without reading or saving it in any manner.
Thank you.


RE: Intermittent runtime exception: broker already registered

2016-06-17 Thread Thakrar, Jayesh
My guess is that, say, your broker went down and you restarted it.
The time interval between the shutdown/crash and the restart was shorter than 
the ZK node's ephemeral session timeout.

Once that timeout expires, your old znode disappears from ZooKeeper, the broker 
is able to recreate it, and hence the success.

In short, I would wait about 3-30 seconds (you will need to check your 
settings) before doing a restart.

-Original Message-
From: Nomar Morado [mailto:nomar.mor...@gmail.com] 
Sent: Friday, June 17, 2016 6:15 AM
To: users@kafka.apache.org
Subject: Intermittent runtime exception: broker already registered

I am using Kafka 0.9.0.1 with ZK 3.5.0-alpha

I am seeing this error intermittently which goes away after several reboots.

Any ideas?

Sent from my iPad



RE: Problematic messages in Kafka

2016-06-02 Thread Thakrar, Jayesh
Thanks for the quick reply Danny.

The message size as per the DumpLogSegments is around 59KB

I used a very high message.max.size and a high fetchsize of 1 MB (that's the 
message.max.size in the broker) and still the same hang behavior.
Also tried a max-wait-ms so that the consumer does not "hang" - but still the 
same result.

Here's what I used -

kafka-simple-consumer-shell.sh --broker-list $HOSTNAME:9092 --fetchsize 100 
--max-messages 10 --max-wait-ms 1 --offset 7207844650 --partition 0 
--print-offsets --topic RtbBid --property message.max.size=100


-Original Message-
From: Danny Bahir [mailto:dannyba...@gmail.com] 
Sent: Thursday, June 02, 2016 10:06 PM
To: users@kafka.apache.org
Subject: Re: Problematic messages in Kafka

quoting from https://cwiki.apache.org/confluence/display/KAFKA/FAQ

The high-level consumer will block if
the next message available is larger than the maximum fetch size you have 
specified

   - One possibility of a stalled consumer is that the fetch size in the
   consumer is smaller than the largest message in the broker. You can use the
   DumpLogSegments tool to figure out the largest message size and set
   fetch.size in the consumer config accordingly.
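
As a concrete illustration (the property names below are for the 0.8.x high-level 
consumer and broker - treat them as assumptions to double-check for your version), 
the settings that need to stay aligned are:

# Consumer side: must be at least as large as the biggest message,
# e.g. the 59041-byte payload reported by DumpLogSegments.
fetch.message.max.bytes=1048576

# Broker side (server.properties): largest message the broker will accept/replicate.
message.max.bytes=1000000
replica.fetch.max.bytes=1048576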


On Thu, Jun 2, 2016 at 3:41 PM, Thakrar, Jayesh < jthak...@conversantmedia.com> 
wrote:

> Wondering if anyone has encountered similar issues.
>
> Using Kafka 0.8.2.1.
>
> Occasionally, we encounter a situation in which a consumer (including
> kafka-console-consumer.sh) just hangs.
> If I increment the offset to skip the offending message, things work 
> fine again.
>
> I have been able to identify the message offset and the data file (log
> file) containing the message.
>
> However, using kafka.tools.DumpLogSegments, I can dump the message 
> using commands like this -
>
> /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh
> kafka.tools.DumpLogSegments --files 007207840027.log 
> --deep-iteration
>
> /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh
> kafka.tools.DumpLogSegments --files 007207840027.log 
> --print-data-log --deep-iteration
>
> From the DumpLogSegments program, here's the checksum info that I get -
> offset: 7207844652 position: 398291668 isvalid: true payloadsize: 
> 59041
> magic: 0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12
>
> So it looks like the message is ok, since there's also a CRC checksum.
> Has anyone encountered such an issue?
> Is there any explanation or reason for the broker behavior?
> I have the data/log file saved if there is any troubleshooting that 
> can be done.
>
> When the broker reads the message and it seems to hang forever, I have 
> to kill the console-consumer or our application consumer.
>
> When I do that, here's what I see in the broker's log file
>
> [2016-06-02 15:50:45,117] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:45,139] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 
> because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:724)
> [2016-06-02 15:50:49,936] INFO Closing socket connection to / 
> 10.110.105.134. (kafka.network.Processor)
> [2016-06-02 15:50:51,591] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:51,699] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
>
>
>
>
>
>

Problematic messages in Kafka

2016-06-02 Thread Thakrar, Jayesh
Wondering if anyone has encountered similar issues.

Using Kafka 0.8.2.1.

Occasionally, we encounter a situation in which a consumer (including 
kafka-console-consumer.sh) just hangs.
If I increment the offset to skip the offending message, things work fine again.

I have been able to identify the message offset and the data file (log file) 
containing the message.

However, using kafka.tools.DumpLogSegments, I can dump the message using 
commands like this -

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 007207840027.log --deep-iteration

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 007207840027.log --print-data-log 
--deep-iteration

From the DumpLogSegments program, here's the checksum info that I get -
offset: 7207844652 position: 398291668 isvalid: true payloadsize: 59041 magic: 
0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12

So it looks like the message is ok, since there's also a CRC checksum.
Has anyone encountered such an issue?
Is there any explanation or reason for the broker behavior?
I have the data/log file saved if there is any troubleshooting that can be done.

When the broker reads the message and it seems to hang forever, I have to kill 
the console-consumer or our application consumer.

When I do that, here's what I see in the broker's log file

[2016-06-02 15:50:45,117] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)
[2016-06-02 15:50:45,139] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)
[2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 because of 
error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:724)
[2016-06-02 15:50:49,936] INFO Closing socket connection to /10.110.105.134. 
(kafka.network.Processor)
[2016-06-02 15:50:51,591] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)
[2016-06-02 15:50:51,699] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)








RE: Rebalancing issue while Kafka scaling

2016-06-01 Thread Thakrar, Jayesh
Hi Hafsa,

Not sure what you mean by "most tolerant cluster".
If you mean that you want the cluster to be able to tolerate 9 of 10 servers being 
down, then yes.
But I would question - is your traffic, system load and storage requirement so low 
that it can be served by a single server?
If so, then having a 10-server cluster is overkill.
Note that the more replicas a partition has, the higher the network traffic and load 
on the broker hosting the leader partition, because 
1) all producers send data for that partition to that broker,
2) all consumers read data for that partition from that broker, and
3) all brokers hosting non-leader replicas fetch data for that partition from the 
leader.
So choosing the replica count is a balance, and "traditionally", both in HDFS and Kafka, 
3 seems to be the magic number.

As for the number of partitions (not replicas), it's good to have it be a multiple 
of the number of brokers/servers - but again there are several things to 
consider - e.g. storage requirements and expected load (network, I/O, etc.) across 
each topic and across the cluster as a whole.
One rule of thumb (again a magic number) that I have heard is to pick the number of 
partitions such that each partition ends up holding around 20-50 GB.
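
For reference, a hedged example of inspecting and growing a topic's partition count 
with the stock tooling (the topic name, ZooKeeper address and partition count are 
placeholders; note that adding partitions changes the key-to-partition mapping for 
keyed topics):

# Show the current partition count, replication factor and replica placement
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic1

# Grow to 12 partitions, e.g. a multiple of the broker count
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic1 --partitions 12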

There are a couple of good articles out there on Kafka cluster design - e.g. 
http://morebigdata.blogspot.com/2015/10/tips-for-successful-kafka-deployments.html

Hope that helps.
Jayesh


-Original Message-
From: Hafsa Asif [mailto:hafsa.a...@matchinguu.com] 
Sent: Wednesday, June 01, 2016 7:05 AM
To: users@kafka.apache.org
Cc: Spico Florin 
Subject: Re: Rebalancing issue while Kafka scaling

Just for more info:
If I have 10 servers in a cluster, then for the most tolerant cluster, do we need 
replication-factor = 10?
Is it also an issue when rebalancing a scaling Kafka cluster that, when we need to 
add a server to the cluster, we also need to increase the partitions in the topics 
as well?

Best,
Hafsa

2016-06-01 13:55 GMT+02:00 Ben Stopford :

> Pretty much. It’s not actually related to zookeeper.
>
> Generalising a bit, replication factor 2 means Kafka can lose 1 
> machine and be ok.
>
> B
> > On 1 Jun 2016, at 12:46, Hafsa Asif  wrote:
> >
> > So, it means that I should create topics with at least replication-factor = 2
> > regardless of how many servers are in the Kafka cluster. If any server goes down
> > or slows down then Zookeeper will not go out-of-sync.
> > Currently, all my topics have replication-factor = 1 and I got an 
> > issue that Zookeeper goes out of sync. So, will increasing 
> > replication-factor solve the issue?
> >
> > Hafsa
> >
> > 2016-06-01 12:57 GMT+02:00 Ben Stopford :
> >
> >> Hi Hafa
> >>
> >> If you create a topic with replication-factor = 2, you can lose one 
> >> of them without losing data, so long as they were "in sync". 
> >> Replicas can
> fall
> >> out of sync if one of the machines runs slow. The system tracks in 
> >> sync replicas. These are exposed by JMX too. Check out the docs on
> replication
> >> for more details:
> >>
> >> http://kafka.apache.org/090/documentation.html#replication < 
> >> http://kafka.apache.org/090/documentation.html#replication>
> >>
> >> B
> >>
> >>> On 1 Jun 2016, at 10:45, Hafsa Asif  wrote:
> >>>
> >>> Hello Jayesh,
> >>>
> >>> Thank you very much for such a good description. My further 
> >>> questions
> are
> >>> (just to be my self clear about the concept).
> >>>
> >>> 1. If I have only one partition in a 'Topic' in a Kafka with 
> >>> following configuration, bin/kafka-topics.sh --create --zookeeper 
> >>> localhost:2181 --replication-factor 1 --partitions 1 --topic 
> >>> mytopic1 Then still I need to rebalance topic partitions while 
> >>> node
> >> adding/removing
> >>> in Kafka cluster?
> >>>
> >>> 2. What is the actual meaning of this line 'if all your topics 
> >>> have
> >> atleast
> >>> 2 insync replicas'. My understanding is that, I need to create 
> >>> replica
> of
> >>> each topic in each server. e.g: I have two servers in a Kafka 
> >>> cluster
> >> then
> >>> I need to create topic 'mytopic1' in both servers. It helps to get 
> >>> rid
> of
> >>> any problem while removing any of the server.
> >>>
> >>> I will look in detail into your provided link. Many thanks for this.
> >>>
> >>> Looking forward for the a

RE: Rebalancing issue while Kafka scaling

2016-05-31 Thread Thakrar, Jayesh
Hafsa, Florin

First things first, it is possible to scale a Kafka cluster up or down (i.e. 
add/remove servers).
And as has been noted in this thread, after you add a server to a cluster, you 
need to rebalance the topic partitions in order to put the newly added server 
into use.
And similarly, before you remove a server, it is advised that you drain off the 
data from the server to be removed (it's not a hard requirement if all your 
topics have at least 2 in-sync replicas, including on the server being removed, and 
you intend to rebalance after the server removal).
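
As an illustration of that (manual) rebalance step, here is a sketch of the stock 
partition-reassignment workflow - the topic name, broker ids, file names and 
ZooKeeper address are placeholders:

# topics-to-move.json: {"version":1,"topics":[{"topic":"mytopic1"}]}
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "0,1,2,3" --generate

# Save the proposed assignment to expand-cluster.json, then apply and verify it:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file expand-cluster.json --execute

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file expand-cluster.json --verify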

However, "automating" the rebalancing of topic partitions is not trivial.

There is a KIP out there to help with the rebalancing, but it lacks details - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+rebalancing
My guess is that this is due to its non-trivial nature AND the number of cases one 
needs to take care of - e.g. scaling up by 5% vs. scaling up by 50% in, say, a 
20-node cluster.
Furthermore, to be really effective, one needs to be cognizant of the partition 
sizes, and with rack-awareness, the task becomes even more involved.

Regards,
Jayesh

-Original Message-
From: Spico Florin [mailto:spicoflo...@gmail.com] 
Sent: Tuesday, May 31, 2016 9:44 AM
To: users@kafka.apache.org
Subject: Re: Rebalancing issue while Kafka scaling

Hi!
  What version of Kafka are you using? What do you mean by "Kafka needs 
rebalancing"? Rebalancing of what? Can you please be more specific.

Regards,
 Florin



On Tue, May 31, 2016 at 4:58 PM, Hafsa Asif 
wrote:

> Hello Folks,
>
> Today, my team members raised a concern that whenever we add a node to 
> the Kafka cluster, Kafka needs rebalancing. The rebalancing is a manual 
> step, and not a good one, whenever scaling happens. Second, if Kafka 
> scales up then it cannot be scaled down. Please provide us proper 
> guidance on this issue; maybe we do not have enough configuration properties.
>
> Hafsa
>






RE: [DISCUSS] KIP-59 - Proposal for a kafka broker command - kafka-brokers.sh

2016-05-11 Thread Thakrar, Jayesh
Thanks Gwen - yes, I agree - let me work on it, make it available on github and 
then I guess we can go from there.

Thanks,
Jayesh
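
For context, much of what such a command would surface can today only be pieced 
together by hand from ZooKeeper, e.g. via the bundled shell (the paths below are 
the standard ones - verify them against any chroot you use):

bin/zookeeper-shell.sh localhost:2181
  ls /brokers/ids
  get /brokers/ids/0

Here, ls /brokers/ids lists the registered broker ids, and get /brokers/ids/<id> 
returns a small JSON document with the broker's host, port(s) and JMX port.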


-Original Message-
From: Gwen Shapira [mailto:g...@confluent.io] 
Sent: Wednesday, May 11, 2016 12:26 PM
To: d...@kafka.apache.org; Jayesh Thakrar 
Cc: Users 
Subject: Re: [DISCUSS] KIP-59 - Proposal for a kafka broker command - 
kafka-brokers.sh

Hello Jayesh,

Thank you for the suggestion. I like the proposal and the new tool seems useful.

Do you already have the tool available in a github repository?

If you don't, then this would be a good place to start - there are many Kafka 
utilities in github repositories (Yahoo's Kafka Manager as a famous example). 
This way users can evaluate the usefulness of the new tool before we include it 
in core-kafka.

Gwen

On Mon, May 9, 2016 at 8:50 PM, Jayesh Thakrar  
wrote:
> Hi All,
>
>
>
> This is to start off a discussion on the above KIP at 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-59%3A+Proposal+for+a+kafka+broker+command
> The proposal is to fill the void of a command line tool/utility that can provide 
> information on the cluster and brokers in a Kafka cluster.
> Thank you, Jayesh Thakrar
>
>




