Re: Measure latency from Source to Sink
Below are some examples. They might be old, but the concept is the same - essentially you are implementing an interface. You have to be careful not to do heavy or high-latency processing in the interceptor, so that it does not impact the data flow. https://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/ https://medium.com/@bkvarda/building-a-custom-flume-interceptor-8c7a55070038

From: antonio saldivar Date: Friday, July 20, 2018 at 9:57 AM To: "Thakrar, Jayesh" Cc: "users@kafka.apache.org" Subject: Re: Measure latency from Source to Sink

Hi, actually I am running the app in a single node; this is for a POC. I was not aware of the custom interceptors - do you have an example of this? Best Regards

On Fri, Jul 20, 2018 at 9:20 AM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: See if you can use a custom interceptor for this. The only fuzzy thing is that the clocks would be different, so I would be a little skeptical of its accuracy. I have heard of some companies who have a special topic in which they insert test msgs and then read them back - using the same machine for producer and consumer, thereby doing away with this dilemma. The assumption is that any delay in the cluster would be felt everywhere. Another option is to create topics with one replica on all brokers (i.e. # of topics = # of brokers) - so you can be sensitive to a single broker having an issue.

On 7/19/18, 3:09 PM, "antonio saldivar" <ansal...@gmail.com> wrote: Hello, I am developing an application using Kafka and Flink, and I need to be able to measure the latency from the producer to when a record comes out at the sink. I can append the timestamp in milliseconds when I send the trxn from the producer, but how do I append the timestamp when it comes out of the sink? Can someone help me with an example? Thank you, Best Regards
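As a rough illustration of the interceptor idea on the Kafka producer side, here is a minimal sketch (the class name, header key and config value are made up for the example, and it assumes a client/broker version with record headers, i.e. 0.11+): it stamps the send-side wall-clock time into a header so the sink can compare it against its own clock, subject to the clock-skew caveat discussed below.

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor: adds the producer-side timestamp as a record header.
// Keep the work here cheap, since it runs on the send path of every record.
public class SendTimestampInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        record.headers().add("send-ts-ms",
                Long.toString(System.currentTimeMillis()).getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // no-op: avoid heavy or blocking work here
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be wired in through the producer config, e.g. props.put("interceptor.classes", "com.example.SendTimestampInterceptor"); the sink-side latency calculation then just subtracts the header value from the sink's current time.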
Re: Measure latency from Source to Sink
See if you can use a custom interceptor for this. The only fuzzy thing is that the clocks would be different, so I would be a little skeptical of its accuracy. I have heard of some companies who have a special topic in which they insert test msgs and then read them back - using the same machine for producer and consumer, thereby doing away with this dilemma. The assumption is that any delay in the cluster would be felt everywhere. Another option is to create topics with one replica on all brokers (i.e. # of topics = # of brokers) - so you can be sensitive to a single broker having an issue.

On 7/19/18, 3:09 PM, "antonio saldivar" wrote: Hello, I am developing an application using Kafka and Flink, and I need to be able to measure the latency from the producer to when a record comes out at the sink. I can append the timestamp in milliseconds when I send the trxn from the producer, but how do I append the timestamp when it comes out of the sink? Can someone help me with an example? Thank you, Best Regards
Re: If timeout is short, the first poll does not return records
While this does not answer your question, I believe a lot of things happen during the first call - e.g. getting admin and metadata info about the cluster, etc. That takes "some time", and hence a poll timeout that is acceptable/normal for regular processing may not be sufficient for initialization, and I believe also for periodic metadata updates/refreshes.

On 7/18/18, 3:41 AM, "jingguo yao" wrote: If the timeout is short, say 100 ms, the first poll does not return records in my case. Jay Kreps gave an explanation in [1]. I think that this behaviour of poll is counterintuitive; it would make a Kafka user's life much easier if this behaviour were documented in [2]. [1] http://grokbase.com/p/kafka/users/155mqpwf3n/consumer-poll-returns-no-records-unless-called-more-than-once-why [2] http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-
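A common workaround, sketched below with made-up connection details (0.10/0.11-era poll(long) API), is simply not to treat an empty first poll as "no data" and to keep polling for a little while during startup, since the first call is mostly spent joining the group and fetching metadata.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FirstPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic

        // The first poll() often returns nothing because it triggers the group join
        // and metadata fetch, so retry a few times before concluding the topic is empty.
        ConsumerRecords<String, String> records = consumer.poll(100);
        int attempts = 0;
        while (records.isEmpty() && attempts++ < 10) {
            records = consumer.poll(100);
        }
        System.out.println("records fetched: " + records.count());
        consumer.close();
    }
}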
Re: Facing Duplication Issue in Kafka
For more details, see https://www.slideshare.net/JayeshThakrar/kafka-68540012 While this is based on Kafka 0.9, the fundamental concepts and reasons are still valid.

On 5/28/18, 12:20 PM, "Hans Jespersen" wrote: Are you seeing 1) duplicate messages stored in a Kafka topic partition or 2) duplicate consumption and processing of a single message stored in a Kafka topic? If it's #1 then you can turn on the idempotent producer feature to get Exactly Once Semantics (EOS) while publishing. If it's #2 then you can examine more closely how your consumer is doing offset commits. If you are committing offsets automatically by time then there is always a possibility that the last time window of messages your consumer did not yet commit will be received again when the consumer restarts. You can instead commit manually, possibly even after each message, which will shrink the window of possible duplicate messages to 1, but at the cost of some performance. What many of the Kafka sink connectors do for exactly-once processing is to store their offsets atomically with the data they write external to Kafka. For example, a database connector would write the message data and the offsets to a database in one atomic write operation. Upon restart of the app it then rereads the offset from the database and resumes consumption from Kafka from the last offset point, using seek() to reposition the consumer before the first call to poll(). These are the techniques most people use to get end-to-end exactly-once processing with no duplicates, even in the event of a failure. -hans

> On May 28, 2018, at 12:17 AM, Karthick Kumar wrote: > > Hi, > > Facing duplication inconsistently while bouncing Kafka producer and > consumer in a tomcat node. Any help will be appreciated to find out the root > cause. > > -- > With Regards, > Karthick.K
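A rough sketch of that last pattern (the topic, partition and the database lookup are placeholders, not a real implementation): store the offset atomically with the data, and on restart read it back and seek() before the first poll().

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ResumeFromExternalOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("enable.auto.commit", "false");            // offsets live in the external store
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("my-topic", 0);   // placeholder
        consumer.assign(Arrays.asList(tp));

        long lastWrittenOffset = readOffsetFromDatabase(tp);      // hypothetical helper
        consumer.seek(tp, lastWrittenOffset + 1);                 // resume right after it

        // From here, poll() continues exactly where the external store says we left off;
        // each batch of results is then written to the database together with its offsets.
    }

    private static long readOffsetFromDatabase(TopicPartition tp) {
        return 0L;   // placeholder for the atomic "data + offset" lookup described above
    }
}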
Re: Kafka Setup for Daily counts on wide array of keys
Sorry Matt, I don't have much idea about Kafka streaming (or any streaming for that matter). As for saving counts from your application servers to Aerospike directly, that is certainly simpler, requiring less hardware, resources and development effort. One reason some people use Kafka as part of their pipeline is to decouple systems and protect either end from issues in the other. It usually makes maintenance on either end simple. Furthermore, it acts as a dampening buffer and, because of Kafka's low latency and high throughput (well, that's a relative term), allows the producers and consumers to run at their full potential (kind of, but not exactly async push and pull of data). It might even be worthwhile to start off without Kafka and, once you understand things better, introduce Kafka later on.

From: Matt Daum Date: Monday, March 5, 2018 at 4:33 PM To: "Thakrar, Jayesh" Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

And not to overthink this, but as I'm new to Kafka and streams I want to make sure that it makes the most sense for my use case. With the streams and grouping, it looks like I'd be getting 1 internal topic created per grouped stream, which would then be written and reread, then totaled in the count, and that would produce a final stream which is then consumed/sinked to an external DB. Is that correct? Overall I'm not using the streaming counts as they grow throughout the day, but just want a final end-of-day count. We already have an Aerospike cluster setup for the applications themselves. If each application server itself made the writes to the Aerospike DB cluster to simply increase the counts for each attribute, and then at end of day we read it out there, it appears it'd use less computing resources, as we'd effectively be doing inbound request -> DB write per counted attribute. I am not saying that is the better route, as I don't fully know or understand the full capabilities of Kafka. Since we aren't streaming the data, enriching it, etc., would the direct-to-DB counts be a better approach? I just want to make sure I use the best tool for the job. Let me know what other factors I may be underestimating/misunderstanding on the Kafka approach please. I want to be as informed as possible before going down either path too far. Thank you again for your time, Matt

On Mon, Mar 5, 2018 at 3:14 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: Yep, exactly. So there is some buffering that you need to do in your client and also deal with edge cases. E.g. how long should you hold on to a batch before you send a smaller batch to the producer, since you want a balance between batch optimization and expedience. You may need to do some experiments to balance between system throughput, record size, batch size and potential batching delay for a given rate of incoming requests.

From: Matt Daum <m...@setfive.com> Date: Monday, March 5, 2018 at 1:59 PM To: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an AVRO wrapper around your single class, right? IE an array of records, correct? Then when you hit a size you are happy with, you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: Good luck on your test!
As for the batching within Avro and by the Kafka producer, here are my thoughts without any empirical proof. There is a certain amount of overhead in terms of execution AND bytes in converting a request record into Avro and producing (generating) a Kafka message out of it. For requests of size 100-200 bytes, that can be a substantial amount - especially the fact that you will be bundling the Avro schema for each request in its Kafka message. By batching the requests, you are significantly amortizing that overhead across many rows.

From: Matt Daum <m...@setfive.com> Date: Monday, March 5, 2018 at 5:54 AM To: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions! It does look like it's using local RocksDB stores for the state info by default. Will look into using an external one. As for the "millions of different values per grouped attribute", an example would be: assume on each request there is a parameter "X" for which, at the end of each day, I want to know the counts per unique value; it could have 100's of millions of possible values.
Re: Kafka Setup for Daily counts on wide array of keys
Yep, exactly. So there is some buffering that you need to do in your client and also deal with edge cases. E.g. how long should you hold on to a batch before you send a smaller batch to the producer, since you want a balance between batch optimization and expedience. You may need to do some experiments to balance between system throughput, record size, batch size and potential batching delay for a given rate of incoming requests.

From: Matt Daum Date: Monday, March 5, 2018 at 1:59 PM To: "Thakrar, Jayesh" Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an AVRO wrapper around your single class, right? IE an array of records, correct? Then when you hit a size you are happy with, you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: Good luck on your test! As for the batching within Avro and by the Kafka producer, here are my thoughts without any empirical proof. There is a certain amount of overhead in terms of execution AND bytes in converting a request record into Avro and producing (generating) a Kafka message out of it. For requests of size 100-200 bytes, that can be a substantial amount - especially the fact that you will be bundling the Avro schema for each request in its Kafka message. By batching the requests, you are significantly amortizing that overhead across many rows.

From: Matt Daum <m...@setfive.com> Date: Monday, March 5, 2018 at 5:54 AM To: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions! It does look like it's using local RocksDB stores for the state info by default. Will look into using an external one. As for the "millions of different values per grouped attribute", an example would be: assume on each request there is a parameter "X" for which, at the end of each day, I want to know the counts per unique value; it could have 100's of millions of possible values. I'll start to hopefully work this week on an initial test of everything and will report back. A few last questions if you have the time: - For the batching of the AVRO files, would this be different than the Producer batching? - Any other things you'd suggest looking out for as gotchas or configurations that probably will be good to tweak further? Thanks! Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: BTW - I did not mean to rule out Aerospike as a possible datastore. It's just that I am not familiar with it, but it surely looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Date: Sunday, March 4, 2018 at 9:25 PM To: Matt Daum <m...@setfive.com> Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but believe that for some portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers. Personally I would use an external datastore. There's a wide choice out there - from regular key-value stores like Cassandra, ScyllaDB and RocksDB, and timeseries stores like InfluxDB, to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour, by using periodic (minute to hourly) extract jobs and storing the data in a Hive-compatible directory structure using ORC or Parquet. The reason for shying away from NoSQL datastores is their tendency to do compaction on data, which leads to unnecessary reads and writes (referred to as write amplification). With periodic jobs in Hadoop, you (usually) write your data once only. Of course, with that approach you lose the "random/keyed access" to the data, but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore. As for "having millions of different values per grouped attribute" - not sure what you mean by that. Is it that each record has some fields that represent different kinds of attributes, and that their domain can have millions to hundreds of millions of values? I don't think that should matter.

From: Matt Daum <m...@setfive.com> Date: Sunday, March 4, 2018 at 2:39 PM To: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Cc: "us
Re: Kafka Setup for Daily counts on wide array of keys
Good luck on your test! As for the batching within Avro and by the Kafka producer, here are my thoughts without any empirical proof. There is a certain amount of overhead in terms of execution AND bytes in converting a request record into Avro and producing (generating) a Kafka message out of it. For requests of size 100-200 bytes, that can be a substantial amount - especially the fact that you will be bundling the Avro schema for each request in its Kafka message. By batching the requests, you are significantly amortizing that overhead across many rows.

From: Matt Daum Date: Monday, March 5, 2018 at 5:54 AM To: "Thakrar, Jayesh" Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions! It does look like it's using local RocksDB stores for the state info by default. Will look into using an external one. As for the "millions of different values per grouped attribute", an example would be: assume on each request there is a parameter "X" for which, at the end of each day, I want to know the counts per unique value; it could have 100's of millions of possible values. I'll start to hopefully work this week on an initial test of everything and will report back. A few last questions if you have the time: - For the batching of the AVRO files, would this be different than the Producer batching? - Any other things you'd suggest looking out for as gotchas or configurations that probably will be good to tweak further? Thanks! Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: BTW - I did not mean to rule out Aerospike as a possible datastore. It's just that I am not familiar with it, but it surely looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Date: Sunday, March 4, 2018 at 9:25 PM To: Matt Daum <m...@setfive.com> Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but believe that for some portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers. Personally I would use an external datastore. There's a wide choice out there - from regular key-value stores like Cassandra, ScyllaDB and RocksDB, and timeseries stores like InfluxDB, to regular RDBMSes. If you have Hadoop in the picture, it's even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour, by using periodic (minute to hourly) extract jobs and storing the data in a Hive-compatible directory structure using ORC or Parquet. The reason for shying away from NoSQL datastores is their tendency to do compaction on data, which leads to unnecessary reads and writes (referred to as write amplification). With periodic jobs in Hadoop, you (usually) write your data once only. Of course, with that approach you lose the "random/keyed access" to the data, but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore. As for "having millions of different values per grouped attribute" - not sure what you mean by that. Is it that each record has some fields that represent different kinds of attributes, and that their domain can have millions to hundreds of millions of values? I don't think that should matter.
From: Matt Daum <m...@setfive.com> Date: Sunday, March 4, 2018 at 2:39 PM To: "Thakrar, Jayesh" <jthak...@conversantmedia.com> Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across all the data, right? Also, will having millions of different values per grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jthak...@conversantmedia.com> wrote: Yes, that's the general design pattern. Another thing to look into is to compress the data. Now the Kafka consumer/producer can already do it for you, but we chose to compress in the applications due to a historic issue that degraded performance, although it has been resolved now. Also, just keep in mind that while you do your batching, the Kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable. Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher. Jayesh
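For reference, the producer-side batching, buffer memory and compression mentioned above are all plain client configs. A hedged sketch of where they go (the values are placeholders to show the knobs, not tuning recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class BatchingProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("compression.type", "gzip");        // or snappy/lz4; compression at the producer
        props.put("batch.size", "131072");            // per-partition batch size in bytes
        props.put("linger.ms", "20");                 // small wait so batches can fill up
        props.put("buffer.memory", "67108864");       // total memory for not-yet-sent records

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        // ... application-level batches (e.g. the Avro byte buffers discussed in this
        // thread) are then sent as individual records through this producer ...
        producer.close();
    }
}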
Re: Kafka Setup for Daily counts on wide array of keys
BTW - I did not mean to rule out Aerospike as a possible datastore. It's just that I am not familiar with it, but it surely looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" Date: Sunday, March 4, 2018 at 9:25 PM To: Matt Daum Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but believe that for some portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers. Personally I would use an external datastore. There's a wide choice out there - from regular key-value stores like Cassandra, ScyllaDB and RocksDB, and timeseries stores like InfluxDB, to regular RDBMSes. If you have Hadoop in the picture, it's even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour, by using periodic (minute to hourly) extract jobs and storing the data in a Hive-compatible directory structure using ORC or Parquet. The reason for shying away from NoSQL datastores is their tendency to do compaction on data, which leads to unnecessary reads and writes (referred to as write amplification). With periodic jobs in Hadoop, you (usually) write your data once only. Of course, with that approach you lose the "random/keyed access" to the data, but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore. As for "having millions of different values per grouped attribute" - not sure what you mean by that. Is it that each record has some fields that represent different kinds of attributes, and that their domain can have millions to hundreds of millions of values? I don't think that should matter.

From: Matt Daum Date: Sunday, March 4, 2018 at 2:39 PM To: "Thakrar, Jayesh" Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across all the data, right? Also, will having millions of different values per grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jthak...@conversantmedia.com> wrote: Yes, that's the general design pattern. Another thing to look into is to compress the data. Now the Kafka consumer/producer can already do it for you, but we chose to compress in the applications due to a historic issue that degraded performance, although it has been resolved now. Also, just keep in mind that while you do your batching, the Kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable. Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher. Jayesh

________ From: Matt Daum <m...@setfive.com> Sent: Sunday, March 4, 2018 7:06:19 AM To: Thakrar, Jayesh Cc: users@kafka.apache.org Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a Kafka cluster setup yet at all. Right now we just have 8 of our application servers. We currently sample some impressions and then dedupe/count outside at a different DC, but are looking to try to analyze all impressions for some overall analytics. Our requests are around 100-200 bytes each. If we lost some of them due to network jitter etc. it would be fine; we're just trying to get an overall rough count of each attribute.
Creating batched messages definitely makes sense and will also cut down on the network IO. We're trying to determine the required setup for Kafka to do what we're looking to do, as these are physical servers so we'll most likely need to buy new hardware. For the first run I think we'll try it out on one of our application clusters that gets a smaller amount of traffic (300-400k req/sec) and run the Kafka cluster on the same machines as the applications. So would the best route here be something like: each application server batches requests, sends them to Kafka, a stream consumer then tallies up the totals per attribute that we want to track and outputs that to a new topic, which then goes to a sink to either a DB or something like S3, which we then read into our external DBs? Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: Matt, If I understand correctly, you have an 8-node Kafka cluster and need to support about 1 million requests/sec into the cluster from source servers, and expect to consume that for aggregation. How big are your msgs? I would suggest looking into batching multiple requests per single Kafka msg to achieve the desired throughput. So e.g. on the requ
Re: Kafka Setup for Daily counts on wide array of keys
I don't have any experience/knowledge of the Kafka inbuilt datastore, but believe that for some portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers. Personally I would use an external datastore. There's a wide choice out there - from regular key-value stores like Cassandra, ScyllaDB and RocksDB, and timeseries stores like InfluxDB, to regular RDBMSes. If you have Hadoop in the picture, it's even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour, by using periodic (minute to hourly) extract jobs and storing the data in a Hive-compatible directory structure using ORC or Parquet. The reason for shying away from NoSQL datastores is their tendency to do compaction on data, which leads to unnecessary reads and writes (referred to as write amplification). With periodic jobs in Hadoop, you (usually) write your data once only. Of course, with that approach you lose the "random/keyed access" to the data, but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore. As for "having millions of different values per grouped attribute" - not sure what you mean by that. Is it that each record has some fields that represent different kinds of attributes, and that their domain can have millions to hundreds of millions of values? I don't think that should matter.

From: Matt Daum Date: Sunday, March 4, 2018 at 2:39 PM To: "Thakrar, Jayesh" Cc: "users@kafka.apache.org" Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across all the data, right? Also, will having millions of different values per grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jthak...@conversantmedia.com> wrote: Yes, that's the general design pattern. Another thing to look into is to compress the data. Now the Kafka consumer/producer can already do it for you, but we chose to compress in the applications due to a historic issue that degraded performance, although it has been resolved now. Also, just keep in mind that while you do your batching, the Kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable. Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher. Jayesh

From: Matt Daum <m...@setfive.com> Sent: Sunday, March 4, 2018 7:06:19 AM To: Thakrar, Jayesh Cc: users@kafka.apache.org Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a Kafka cluster setup yet at all. Right now we just have 8 of our application servers. We currently sample some impressions and then dedupe/count outside at a different DC, but are looking to try to analyze all impressions for some overall analytics. Our requests are around 100-200 bytes each. If we lost some of them due to network jitter etc. it would be fine; we're just trying to get an overall rough count of each attribute. Creating batched messages definitely makes sense and will also cut down on the network IO. We're trying to determine the required setup for Kafka to do what we're looking to do, as these are physical servers so we'll most likely need to buy new hardware. For the first run I think we'll try it out on one of our application clusters that gets a smaller amount of traffic (300-400k req/sec) and run the Kafka cluster on the same machines as the applications.
So would the best route here be something like: each application server batches requests, sends them to Kafka, a stream consumer then tallies up the totals per attribute that we want to track and outputs that to a new topic, which then goes to a sink to either a DB or something like S3, which we then read into our external DBs? Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: Matt, If I understand correctly, you have an 8-node Kafka cluster and need to support about 1 million requests/sec into the cluster from source servers, and expect to consume that for aggregation. How big are your msgs? I would suggest looking into batching multiple requests per single Kafka msg to achieve the desired throughput. So e.g. on the request receiving systems, I would suggest creating a logical avro file (byte buffer) of say N requests and then making that into one Kafka msg payload. We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg. We have different kinds of msgs/topics and the individual "request" si
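As a rough sketch of that "logical avro file (byte buffer) of say N requests" idea (schema handling and record construction are simplified placeholders): a batch of request records is serialized into one in-memory Avro container, whose bytes become the payload of a single Kafka message. Because the container carries the schema once per batch rather than once per request, the per-record overhead discussed elsewhere in this thread is amortized.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroBatchSketch {
    // Serialize N request records into one Avro container held in memory; the
    // resulting byte[] is the value of a single ProducerRecord<byte[], byte[]>.
    public static byte[] toAvroBytes(Schema schema, List<GenericRecord> batch) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, out);          // writes the schema/header once per batch
        for (GenericRecord rec : batch) {
            writer.append(rec);              // appends each request record
        }
        writer.close();                      // flushes the container into the buffer
        return out.toByteArray();
    }
}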
Re: Kafka Setup for Daily counts on wide array of keys
Yes, that's the general design pattern. Another thing to look into is to compress the data. Now the Kafka consumer/producer can already do it for you, but we chose to compress in the applications due to a historic issue that degraded performance, although it has been resolved now. Also, just keep in mind that while you do your batching, the Kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable. Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher. Jayesh

From: Matt Daum Sent: Sunday, March 4, 2018 7:06:19 AM To: Thakrar, Jayesh Cc: users@kafka.apache.org Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a Kafka cluster setup yet at all. Right now we just have 8 of our application servers. We currently sample some impressions and then dedupe/count outside at a different DC, but are looking to try to analyze all impressions for some overall analytics. Our requests are around 100-200 bytes each. If we lost some of them due to network jitter etc. it would be fine; we're just trying to get an overall rough count of each attribute. Creating batched messages definitely makes sense and will also cut down on the network IO. We're trying to determine the required setup for Kafka to do what we're looking to do, as these are physical servers so we'll most likely need to buy new hardware. For the first run I think we'll try it out on one of our application clusters that gets a smaller amount of traffic (300-400k req/sec) and run the Kafka cluster on the same machines as the applications. So would the best route here be something like: each application server batches requests, sends them to Kafka, a stream consumer then tallies up the totals per attribute that we want to track and outputs that to a new topic, which then goes to a sink to either a DB or something like S3, which we then read into our external DBs? Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote: Matt, If I understand correctly, you have an 8-node Kafka cluster and need to support about 1 million requests/sec into the cluster from source servers, and expect to consume that for aggregation. How big are your msgs? I would suggest looking into batching multiple requests per single Kafka msg to achieve the desired throughput. So e.g. on the request receiving systems, I would suggest creating a logical avro file (byte buffer) of say N requests and then making that into one Kafka msg payload. We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg. We have different kinds of msgs/topics and the individual "request" size varies from about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" <m...@setfive.com> wrote: I am new to Kafka but I think I have a good use case for it. I am trying to build daily counts of requests based on a number of different attributes in a high-throughput system (~1 million requests/sec. across all 8 servers). The different attributes are unbounded in terms of values, and some will spread across 100's of millions of values. This is my current thought process; let me know where I could be more efficient or if there is a better way to do it. I'll create an AVRO object "Impression" which has all the attributes of the inbound request.
On each request, my application servers will then create and send this to a single Kafka topic. I'll then have a consumer which creates a stream from the topic. From there I'll use the windowed timeframes and groupBy to group by the attributes on each given day. At the end of the day I'd need to read out the data store to an external system for storage. Since I won't know all the values, I'd need something similar to KVStore.all() but for windowed KV stores. It appears that this would be possible in 1.1 with this commit: https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5 . Is this the best approach to doing this? Or would I be better off using the stream to listen and then an external DB like Aerospike to store the counts and read out of it directly at end of day? Thanks for the help! Daum
Re: Kafka Setup for Daily counts on wide array of keys
Matt, If I understand correctly, you have an 8-node Kafka cluster and need to support about 1 million requests/sec into the cluster from source servers, and expect to consume that for aggregation. How big are your msgs? I would suggest looking into batching multiple requests per single Kafka msg to achieve the desired throughput. So e.g. on the request receiving systems, I would suggest creating a logical avro file (byte buffer) of say N requests and then making that into one Kafka msg payload. We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg. We have different kinds of msgs/topics and the individual "request" size varies from about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" wrote: I am new to Kafka but I think I have a good use case for it. I am trying to build daily counts of requests based on a number of different attributes in a high-throughput system (~1 million requests/sec. across all 8 servers). The different attributes are unbounded in terms of values, and some will spread across 100's of millions of values. This is my current thought process; let me know where I could be more efficient or if there is a better way to do it. I'll create an AVRO object "Impression" which has all the attributes of the inbound request. On each request, my application servers will then create and send this to a single Kafka topic. I'll then have a consumer which creates a stream from the topic. From there I'll use the windowed timeframes and groupBy to group by the attributes on each given day. At the end of the day I'd need to read out the data store to an external system for storage. Since I won't know all the values, I'd need something similar to KVStore.all() but for windowed KV stores. It appears that this would be possible in 1.1 with this commit: https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5 . Is this the best approach to doing this? Or would I be better off using the stream to listen and then an external DB like Aerospike to store the counts and read out of it directly at end of day? Thanks for the help! Daum
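For reference, the Streams side of the windowed daily-count idea discussed in this thread could look roughly like the following sketch (Kafka 1.0/1.1-era API; the topic names, the attribute extraction and the use of string values instead of the Avro "Impression" object are all placeholders, not a working implementation):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;

public class DailyCountsSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Impressions are treated as plain strings here; a real topology would use
        // an Avro serde for the "Impression" record.
        KStream<String, String> impressions = builder.stream("impressions");

        impressions
            // re-key by the attribute whose per-day counts we want (placeholder logic)
            .groupBy((key, value) -> extractAttributeX(value),
                     Serialized.with(Serdes.String(), Serdes.String()))
            // one-day tumbling windows -> one running count per attribute value per day
            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))
            .count()
            .toStream()
            // flatten the windowed key into a plain string key for the output topic
            .map((windowedKey, count) -> new KeyValue<>(
                    windowedKey.key() + "@" + windowedKey.window().start(), count))
            .to("daily-counts", Produced.with(Serdes.String(), Serdes.Long()));

        // builder.build() would then be passed to a KafkaStreams instance along with
        // the usual StreamsConfig (application.id, bootstrap.servers, default serdes).
    }

    private static String extractAttributeX(String value) {
        return value;   // placeholder for pulling attribute "X" out of the record
    }
}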
Re: Lost messages and messed up offsets
Can you also check if you have partition leaders flapping or changing rapidly? Also, look at the following settings in your client configs: max.partition.fetch.bytes, fetch.max.bytes, receive.buffer.bytes. We had a similar situation in our environment when the brokers were flooded with data. The symptoms were apparent huge spikes in offset ids - much more than the data we were sending. That we traced to the fact that the brokers were not able to keep up with the incoming producer + consumer + replication traffic due to the NIC bandwidth. (A bit of a lengthy story as to why the offset ids appeared to be high/spiky because of the flapping.) And then the consumer would have issues - and the problem there was that the producer had a very large buffer and batch size, so the data was coming in large batches. However, the client was not configured to receive data in such large batches, so it would give errors and would not be able to go past a certain offset.

On 11/30/17, 3:03 AM, "Tom van den Berge" wrote: The consumers are using default settings, which means that enable.auto.commit=true and auto.commit.interval.ms=5000. I'm not committing manually; just consuming messages.

On Thu, Nov 30, 2017 at 1:09 AM, Frank Lyaruu wrote: > Do you commit the received messages? Either by doing it manually or setting > enable.auto.commit and auto.commit.interval.ms? > > On Wed, Nov 29, 2017 at 11:15 PM, Tom van den Berge < > tom.vandenbe...@gmail.com> wrote: > > > I'm using Kafka 0.10.0. > > > > I'm reading messages from a single topic (20 partitions), using 4 > consumers > > (one group), using a standard java consumer with default configuration, > > except for the key and value deserializer, and a group id; no other > > settings. > > > > We've been experiencing a serious problem a few times now, after a large > > burst of messages (75000) have been posted to the topic. The consumer lag > > (as reported by Kafka's kafka-consumer-groups.sh) immediately shows a > huge > > lag, which is expected. The consumers start processing the messages, > which > > is expected to take them at least 30 minutes. In the mean time, more > > messages are posted to the topic, but at a "normal" rate, which the > > consumers normally handle easily. The problem is that the reported > consumer > > lag is not decreasing at all. After some 30 minutes, it has even > increased > > slightly. This would mean that the consumers are not able to process the > > backlog at all, which is extremely unlikely. > > > > After a restart of all consumer applications, something really surprising > > happens: the lag immediately drops to nearly 0! It is technically > > impossible that the consumers really processed all messages in a matter > of > > seconds. Manual verification showed that many messages were not processed > > at all; they seem to have disappeared somehow. So it seems that > restarting > > the consumers somehow messed up the offset (I think). > > > > On top of that, I noticed that the reported lag shows seemingly > impossible > > figures. During the time that the lag was not decreasing, before the > > restart of the consumers, the "current offset" that was reported for some > > partitions decreased. To my knowledge, that is impossible. > > > > Does anyone have an idea on how this could have happened? > > >
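For reference, the three settings named above are plain consumer configs; a hedged sketch of where they would go (values are illustrative only) - the point being that the consumer must be able to accept fetches at least as large as the batches the producer/broker can hand it:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchSizeConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("max.partition.fetch.bytes", "10485760");  // per-partition fetch ceiling
        props.put("fetch.max.bytes", "52428800");            // total bytes per fetch request (0.10.1+)
        props.put("receive.buffer.bytes", "1048576");        // TCP receive buffer size

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // ... poll loop ...
        consumer.close();
    }
}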
Re: Offsets in Kafka producer start with -1 for new topic
No exceptions or dropped messages. I injected/sent 100 messages and could read back all 100 of them. It's just that the offset numbers are not as expected for the producer.

On 10/30/17, 2:28 AM, "Manikumar" wrote: Any exception in the callback exception field? Maybe you can enable client debug logs to check for any errors. On Mon, Oct 30, 2017 at 7:25 AM, Thakrar, Jayesh < jthak...@conversantmedia.com> wrote: > I created a new Kafka topic with 1 partition and then sent 10 messages > using the KafkaProducer API using the async callback. > What I saw was that the offset in the RecordMetadata for the first record > was -1. > Shouldn't it be 0, as offsets start with 0? > > Is it a bug or does it work as expected? > > Thanks, > Jayesh > >
Offsets in Kafka producer start with -1 for new topic
I created a new Kafka topic with 1 partition and then sent 10 messages using the KafkaProducer API with the async callback. What I saw was that the offset in the RecordMetadata for the first record was -1. Shouldn't it be 0, as offsets start with 0? Is it a bug or does it work as expected? Thanks, Jayesh
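For reference, the offsets in question come back in the send callback, along the lines of this sketch (topic name and connection details are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OffsetCallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("new-topic", "key-" + i, "value-" + i),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            // This is where the surprising -1 offset for the first
                            // record of a brand-new topic was observed.
                            System.out.println("partition=" + metadata.partition()
                                    + " offset=" + metadata.offset());
                        }
                    });
        }
        producer.close();
    }
}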
Re: do i need to restart the brokers if I changed the retention time for a specific topic
Just to make it clear Haitao, in your case you do not have to restart brokers (since you are changing at the topic level). On 8/6/17, 11:37 PM, "Kaufman Ng" wrote: Hi Haitao, The retention time (retention.ms) configuration can exist as a broker-level and/or topic-level config. For the latter, changing it does NOT require broker restarts. On Sun, Aug 6, 2017 at 10:33 PM, haitao .yao wrote: > Hi, I want to change the retention time for a specific topic with the > following command: > > $ bin/kafka-topics.sh --zookeeper zk_address --alter --topic > test-topic --config retention.ms=8640 > > Is that correct? Do I have to restart the brokers? > I read the source code and thought the configuration values about retention > time in LogManager are static: > https://github.com/apache/kafka/blob/0.10.0/core/src/ > main/scala/kafka/server/KafkaServer.scala#L597-L620 > > Kafka version: kafka_2.11-0.10.0.1 > > Thanks > -- > haitao.yao > -- Kaufman Ng +1 646 961 8063 Solutions Architect | Confluent | www.confluent.io
Re: Limit of simultaneous consumers/clients?
You may want to look at the Kafka REST API instead of having so many direct client connections. https://github.com/confluentinc/kafka-rest

On 7/31/17, 1:29 AM, "Dr. Sven Abels" wrote: Hi guys, does anyone have an idea about the possible limits of concurrent users?

-----Original Message----- From: Dr. Sven Abels [mailto:ab...@ascora.de] Sent: Friday, July 28, 2017 12:11 To: users@kafka.apache.org Subject: Limit of simultaneous consumers/clients?

Hello, we would like to use Kafka as a way to inform users about events of certain topics. For this purpose, we want to develop Windows and Mac clients which users would install on their desktop PCs. We have a broad number of users, so it's likely that there will be >10,000 clients running in parallel. If I understand it correctly, Kafka uses sockets and the user clients would maintain an active connection to Kafka. If this is correct, I wondered:
-What is the limit of clients that may run in parallel? Do 10,000 clients mean 10,000 server connections? Would that be a problem for a typical server?
-Can we solve this problem by simply running Kafka on several servers and using something like round-robin DNS so that the clients connect to different servers?
-We expect to only send a few messages each day. Messages should arrive quickly (<30 seconds delay) but we don't need realtime. Considering this: Is Kafka still a good solution, or should we better switch to e.g. polling from the clients to the server without Kafka?
Best regards, Sven
Re: Causes for Kafka messages being unexpectedly delivered more than once? The 'exactly once' semantic
Hi Dmitri, This presentation might help you understand and take appropriate actions to deal with data duplication (and data loss): https://www.slideshare.net/JayeshThakrar/kafka-68540012 Regards, Jayesh

On 4/13/17, 10:05 AM, "Vincent Dautremont" wrote: One of the cases where you would get a message more than once is if you get disconnected / kicked off the consumer group / etc., when you fail to commit offsets for messages you have already read. What I do is that I insert the message in an in-memory cache redis database. If it fails to insert because of primary key duplication, well, that means I've already received that message in the past. You could even insert just the topic+partition+offset of the message (instead of the full message payload) if you know for sure that your message payload would not be duplicated in the Kafka topic. Vincent.

On Thu, Apr 13, 2017 at 4:52 PM, Dmitry Goldenberg wrote: > Hi all, > > I was wondering if someone could list some of the causes which may lead to > Kafka delivering the same messages more than once. > > We've looked around and we see no errors to notice, yet intermittently, we > see messages being delivered more than once. > > Kafka documentation talks about the below delivery modes: > >- *At most once*—Messages may be lost but are never redelivered. >- *At least once*—Messages are never lost but may be redelivered. >- *Exactly once*—this is what people actually want, each message is >delivered once and only once. > > So the default is 'at least once' and that is what we're running with (we > don't want to do "at most once" as that appears to yield some potential for > message loss). > > We had not seen duplicated deliveries for a while previously but just > started seeing them quite frequently in our test cluster. > > What are some of the possible causes for this? What are some of the > available tools for troubleshooting this issue? What are some of the > possible fixes folks have developed or instrumented for this issue? > > Also, is there an effort underway on Kafka side to provide support for the > "exactly once" semantic? That is exactly the semantic we want and we're > wondering how that may be achieved. > > Thanks, > - Dmitry >
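A sketch of that dedup idea using just the record coordinates as the key (a local in-memory set here purely for illustration; the approach described above would use a shared store such as Redis with a primary-key insert instead):

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Treat topic+partition+offset as a unique id and skip anything already seen.
public class DedupSketch {
    private final Set<String> seen = new HashSet<>();

    public boolean isDuplicate(ConsumerRecord<?, ?> record) {
        String id = record.topic() + "-" + record.partition() + "-" + record.offset();
        return !seen.add(id);   // add() returns false if the id was already present
    }
}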
Re: programmatic way to check for topic existence?
Have a look at the Cluster class, which has a "topics" method to get a set of all the topics: https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/Cluster.html In version 0.8/0.9, there was also ZKUtils, but the desire is to have clients not interrogate ZK directly.

On 10/24/16, 4:32 PM, "Ben Osheroff" wrote: Hiya! I've been trying to merge https://github.com/zendesk/maxwell/pull/457, which adds a much-requested feature of Maxwell, that of being able to have a topic-per-mysql-table. When we receive a row we programmatically generate the topic name, and the first thing we do is call `KafkaProducer#partitionsFor(topic)`, so that we know how to partition the data. The problem I'm running into is in trying to detect the case where a topic doesn't exist. If auto-creation is on, `partitionsFor()` seems to correctly auto-create the topic, but if auto-creation is off the behavior is kinda wonky; kafka goes into a metadata-fetch loop, logging "Error while fetching metadata with correlation id 573 :{topic=UNKNOWN_TOPIC_OR_PARTITION}" but then ultimately throwing me back a `TimeoutException` after 60 tries or so. I can rescue/rethrow the TimeoutException, but it seems like there might be a better way that I'm missing. Any ideas? I'd ideally just like a way to fail fast and clean when the topic doesn't exist (and auto-creation is off). Thanks, Ben Osheroff zendesk.com
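One way to fail fast without going through partitionsFor()'s retry loop is to ask for the full topic metadata and check membership; a hedged sketch (0.9+ consumer API, bootstrap servers and topic name are placeholders):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class TopicExistsSketch {
    public static boolean topicExists(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // listTopics() fetches cluster metadata for all topics and does not auto-create
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            return topics.containsKey(topic);
        }
    }

    public static void main(String[] args) {
        System.out.println(topicExists("localhost:9092", "maybe-missing-topic"));
    }
}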
Re: Too Many Open Files
What are the producers/consumers for the Kafka cluster? Remember that it's not just files but also sockets that add to the count. I had seen issues when we had a network switch problem and had Storm consumers. The switch would cause issues in connectivity between Kafka brokers, zookeepers and clients, causing a flood of connections from everyone to each other.

On 8/1/16, 7:14 AM, "Scott Thibault" wrote: Did you verify that the process has the correct limit applied? cat /proc/<pid>/limits --Scott Thibault

On Sun, Jul 31, 2016 at 4:14 PM, Kessiler Rodrigues wrote: > I'm still experiencing this issue… > > Here are the kafka logs. > > [2016-07-31 20:10:35,658] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:323) > at kafka.network.Acceptor.run(SocketServer.scala:268) > at java.lang.Thread.run(Thread.java:745) > [2016-07-31 20:10:35,658] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:323) > at kafka.network.Acceptor.run(SocketServer.scala:268) > at java.lang.Thread.run(Thread.java:745) > [2016-07-31 20:10:35,658] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:323) > at kafka.network.Acceptor.run(SocketServer.scala:268) > at java.lang.Thread.run(Thread.java:745) > > My ulimit is 1 million, how is that possible? > > Can someone help with this? > > > > On Jul 30, 2016, at 5:05 AM, Kessiler Rodrigues > wrote: > > > > I have changed it a bit. > > > > I have 10 brokers and 20k topics with 1 partition each. > > > > I looked at the kafka's logs dir and I only have 3318 files. > > > > I'm doing some tests to see how many topics/partitions I can have, but > it is throwing too many files once it hits 15k topics.. > > > > Any thoughts? > > > > > > > >> On Jul 29, 2016, at 10:33 PM, Gwen Shapira wrote: > >> > >> woah, it looks like you have 15,000 replicas per broker? > >> > >> You can go into the directory you configured for kafka's log.dir and > >> see how many files you have there. Depending on your segment size and > >> retention policy, you could have hundreds of files per partition > >> there... > >> > >> Make sure you have at least that many file handles and then also add > >> handles for the client connections. > >> > >> 1 million file handles sound like a lot, but you are running lots of > >> partitions per broker... > >> > >> We normally don't see more than maybe 4000 per broker and most > >> clusters have a lot fewer, so consider adding brokers and spreading > >> partitions around a bit. 
> >> > >> Gwen > >> > >> On Fri, Jul 29, 2016 at 12:00 PM, Kessiler Rodrigues > >> wrote: > >>> Hi guys, > >>> > >>> I have been experiencing some issues on kafka, where its throwing too many open files. > >>> > >>> I have around of 6k topics and 5 partitions each. > >>> > >>> My cluster was made with 6 brokers. All of them are running Ubuntu 16 > and the file limits settings are: > >>> > >>> `cat /proc/sys/fs/file-max` > >>> 200 > >>> > >>> `ulimit -n` > >>> 100 > >>> > >>> Anyone has experienced it before?
RE: Last offset in all partitions
Check out the Consumer API http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html and search for the method "seekToEnd". Here's the "text" from the API doc -

seekToEnd public void seekToEnd(Collection<TopicPartition> partitions) Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the final offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partition is provided, seek to the final offset for all of the currently assigned partitions. Specified by: seekToEnd in interface Consumer See Also: seekToEnd(Collection)

-----Original Message----- From: Todd Palino [mailto:tpal...@gmail.com] Sent: Wednesday, July 06, 2016 10:36 AM To: users@kafka.apache.org Subject: Re: Last offset in all partitions

We do this through our monitoring agents by pulling it as a metric from the LogEndOffset beans. By putting it into our metrics system we get a mapping of timestamp to offset for every partition with (currently) 60 second granularity. Useful for offset resets and other tasks. -Todd

On Wednesday, July 6, 2016, Kristoffer Sjögren wrote: > Hi > > Is there a way to get the last offset written by all partitions of a > topic programmatically using the 0.10.0.0 API? > > At the moment I use KafkaConsumer.seekToEnd as seen in this gist[1] > but maybe there is a better, more efficient, way to do it? > > Cheers, > -Kristoffer > > [1] > https://gist.github.com/krisskross/a49e462bedb89505e372672cd81129ab > -- *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
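A sketch of using that to read the current end offsets programmatically (bootstrap servers and topic are placeholders; on 0.10.1+ the consumer also has endOffsets(), but seekToEnd() plus position() works on 0.10.0):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class LastOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "offset-checker");             // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("my-topic")) {   // placeholder topic
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);   // lazy; resolved by the position() calls below
            for (TopicPartition tp : partitions) {
                System.out.println(tp + " end offset = " + consumer.position(tp));
            }
        }
    }
}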
RE: Intermittent runtime exception: broker already registered
My guess is that, say, your broker went down and you restarted it. The time interval between the shutdown/crash and the restart was shorter than the ZK node's ephemeral timeout value. Once that time is over, your node disappears from ZooKeeper, the broker is able to recreate the znode, and hence the restart succeeds. In short, I would allow about 3-30 seconds (you will need to check your settings) before doing a restart.

-----Original Message----- From: Nomar Morado [mailto:nomar.mor...@gmail.com] Sent: Friday, June 17, 2016 6:15 AM To: users@kafka.apache.org Subject: Intermittent runtime exception: broker already registered

I am using Kafka 0.9.0.1 with ZK 3.5.0-alpha. I am seeing this error intermittently, and it goes away after several reboots. Any ideas? Sent from my iPad
RE: Problematic messages in Kafka
Thanks for the quick reply Danny. The message size as per DumpLogSegments is around 59 KB. I used a very high message.max.size and a high fetchsize of 1 MB (that's the message.max.size in the broker) and still the same hang behavior. Also tried a max-wait-ms so that the consumer does not "hang" - but still the same result. Here's what I used - kafka-simple-consumer-shell.sh --broker-list $HOSTNAME:9092 --fetchsize 100 --max-messages 10 --max-wait-ms 1 --offset 7207844650 --partition 0 --print-offsets --topic RtbBid --property message.max.size=100

-----Original Message----- From: Danny Bahir [mailto:dannyba...@gmail.com] Sent: Thursday, June 02, 2016 10:06 PM To: users@kafka.apache.org Subject: Re: Problematic messages in Kafka

Quoting from https://cwiki.apache.org/confluence/display/KAFKA/FAQ The high-level consumer will block if the next message available is larger than the maximum fetch size you have specified - One possibility of a stalled consumer is that the fetch size in the consumer is smaller than the largest message in the broker. You can use the DumpLogSegments tool to figure out the largest message size and set fetch.size in the consumer config accordingly.

On Thu, Jun 2, 2016 at 3:41 PM, Thakrar, Jayesh < jthak...@conversantmedia.com> wrote: > Wondering if anyone has encountered similar issues. > > Using Kafka 0.8.2.1. > > Occasionally, we encounter a situation in which a consumer (including > kafka-console-consumer.sh) just hangs. > If I increment the offset to skip the offending message, things work > fine again. > > I have been able to identify the message offset and the data file (log > file) containing the message. > > However, using kafka.tools.DumpLogSegments, I can dump the message > using commands like this - > > /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh > kafka.tools.DumpLogSegments --files 007207840027.log > --deep-iteration > > /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh > kafka.tools.DumpLogSegments --files 007207840027.log > --print-data-log --deep-iteration > > From the DumpLogSegments program, here's the checksum info that I get - > offset: 7207844652 position: 398291668 isvalid: true payloadsize: > 59041 > magic: 0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12 > > So it looks like the message is ok, since there's also a CRC checksum. > Has anyone encountered such an issue? > Is there any explanation or reason for the broker behavior? > I have the data/log file saved if there is any troubleshooting that > can be done. > > When the broker reads the message and it seems to hang forever, I have > to kill the console-consumer or our application consumer. > > When I do that, here's what I see in the broker's log file > > [2016-06-02 15:50:45,117] INFO Closing socket connection to / > 10.110.102.113. (kafka.network.Processor) > [2016-06-02 15:50:45,139] INFO Closing socket connection to / > 10.110.102.113. 
(kafka.network.Processor) > [2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 > because of error (kafka.network.Processor) > java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:197) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) > at kafka.utils.Utils$.read(Utils.scala:375) > at > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Processor.read(SocketServer.scala:347) > at kafka.network.Processor.run(SocketServer.scala:245) > at java.lang.Thread.run(Thread.java:724) > [2016-06-02 15:50:49,936] INFO Closing socket connection to / > 10.110.105.134. (kafka.network.Processor) > [2016-06-02 15:50:51,591] INFO Closing socket connection to / > 10.110.102.113. (kafka.network.Processor) > [2016-06-02 15:50:51,699] INFO Closing socket connection to / > 10.110.102.113. (kafka.network.Processor)
Problematic messages in Kafka
Wondering if anyone has encountered similar issues. Using Kafka 0.8.2.1. Occasionally, we encounter a situation in which a consumer (including kafka-console-consumer.sh) just hangs. If I increment the offset to skip the offending message, things work fine again. I have been able to identify the message offset and the data file (log file) containing the message. However, using kafka.tools.DumpLogSegments, I can dump the message using commands like this - /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 007207840027.log --deep-iteration /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 007207840027.log --print-data-log --deep-iteration From the DumpLogSegments program, here's the checksum info that I get - offset: 7207844652 position: 398291668 isvalid: true payloadsize: 59041 magic: 0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12 So it looks like the message is ok, since there's also a CRC checksum. Has anyone encountered such an issue? Is there any explanation or reason for the broker behavior? I have the data/log file saved if there is any troubleshooting that can be done. When the broker reads the message and it seems to hang forever, I have to kill the console-consumer or our application consumer. When I do that, here's what I see in the broker's log file [2016-06-02 15:50:45,117] INFO Closing socket connection to /10.110.102.113. (kafka.network.Processor) [2016-06-02 15:50:45,139] INFO Closing socket connection to /10.110.102.113. (kafka.network.Processor) [2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:375) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:347) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:724) [2016-06-02 15:50:49,936] INFO Closing socket connection to /10.110.105.134. (kafka.network.Processor) [2016-06-02 15:50:51,591] INFO Closing socket connection to /10.110.102.113. (kafka.network.Processor) [2016-06-02 15:50:51,699] INFO Closing socket connection to /10.110.102.113. (kafka.network.Processor)
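One thing worth ruling out here (a rough sketch, not from the original thread - the property applies to the old 0.8.x high-level consumer, and the file name and 1 MB value are made-up examples): make sure the consumer's fetch size comfortably exceeds the ~59 KB message reported by DumpLogSegments, e.g. by passing a consumer config file to the console consumer:

  # consumer.properties (hypothetical) - must be larger than the biggest message in the log
  fetch.message.max.bytes=1048576

  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic RtbBid --consumer.config consumer.properties

If the fetch size is already larger than the message and the consumer still hangs, that points at something other than the usual fetch-size-too-small FAQ case.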
RE: Rebalancing issue while Kafka scaling
Hi Hafsa, I am not sure what you mean by "most tolerant cluster". If you mean that you want the cluster to be able to tolerate 9 of 10 servers being down, then yes. But I would ask - is your traffic, system load and storage requirement so low that it can be served by a single server? If so, then having a 10-server cluster is overkill. Note that the more replicas a partition has, the higher the network traffic and load on the broker (server) hosting the leader partition, because 1) all producers send data to that broker for that partition, 2) all consumers read data from that broker for that partition, and 3) all brokers hosting the non-leader replicas of that partition replicate data from it. So determining the replica count is a balancing act, and "traditionally", both in HDFS and Kafka, 3 seems to be the magic number. As for the number of partitions (not replicas), it's good to have it as a multiple of the number of brokers/servers - but again there are several things to consider - e.g. storage requirements and expected load (network, I/O, etc.) across each topic and across the cluster as a whole. One rule of thumb (again, a magic number) that I have heard is to choose the number of partitions such that the total storage per partition works out to around 20-50 GB. There are a couple of good articles out there on Kafka cluster design - e.g. http://morebigdata.blogspot.com/2015/10/tips-for-successful-kafka-deployments.html Hope that helps. Jayesh -Original Message- From: Hafsa Asif [mailto:hafsa.a...@matchinguu.com] Sent: Wednesday, June 01, 2016 7:05 AM To: users@kafka.apache.org Cc: Spico Florin Subject: Re: Rebalancing issue while Kafka scaling Just for more info: if I have 10 servers in a cluster, then for the most tolerant cluster, do we need replication-factor = 10? There is also the issue of rebalancing when scaling the Kafka cluster - when we need to add a server to the cluster, do we also need to increase the partitions in the topics as well? Best, Hafsa 2016-06-01 13:55 GMT+02:00 Ben Stopford : > Pretty much. It's not actually related to zookeeper. > > Generalising a bit, replication factor 2 means Kafka can lose 1 > machine and be ok. > > B > > On 1 Jun 2016, at 12:46, Hafsa Asif wrote: > > > > So, it means that I should create topics with at least > replication-factor=2 > > irrespective of how many servers are in the Kafka cluster. If any server goes > > down > or > > slows down then Zookeeper will not go out of sync. > > Currently, all my topics have replication-factor = 1 and I got an > > issue that Zookeeper goes out of sync. So, will increasing the > > replication-factor solve the issue? > > > > Hafsa > > > > 2016-06-01 12:57 GMT+02:00 Ben Stopford : > > > >> Hi Hafsa > >> > >> If you create a topic with replication-factor = 2, you can lose one > >> of them without losing data, so long as they were "in sync". > >> Replicas can > fall > >> out of sync if one of the machines runs slow. The system tracks in > >> sync replicas. These are exposed by JMX too. Check out the docs on > replication > >> for more details: > >> > >> http://kafka.apache.org/090/documentation.html#replication < > >> http://kafka.apache.org/090/documentation.html#replication> > >> > >> B > >> > >>> On 1 Jun 2016, at 10:45, Hafsa Asif wrote: > >>> > >>> Hello Jayesh, > >>> > >>> Thank you very much for such a good description. My further > >>> questions > are > >>> (just to make myself clear about the concept). > >>> > >>> 1. 
If I have only one partition in a 'Topic' in a Kafka with > >>> following configuration, bin/kafka-topics.sh --create --zookeeper > >>> localhost:2181 --replication-factor 1 --partitions 1 --topic > >>> mytopic1 Then still I need to rebalance topic partitions while > >>> node > >> adding/removing > >>> in Kafka cluster? > >>> > >>> 2. What is the actual meaning of this line 'if all your topics > >>> have > >> atleast > >>> 2 insync replicas'. My understanding is that, I need to create > >>> replica > of > >>> each topic in each server. e.g: I have two servers in a Kafka > >>> cluster > >> then > >>> I need to create topic 'mytopic1' in both servers. It helps to get > >>> rid > of > >>> any problem while removing any of the server. > >>> > >>> I will look in detail into your provided link. Many thanks for this. > >>> > >>> Looking forward for the a
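To make the sizing discussion above concrete (an illustrative sketch, not from the original thread - topic name, broker count and partition count are made-up numbers): on a 10-broker cluster you might create the topic with 3 replicas and a partition count that is a multiple of the broker count, and then verify that every partition keeps at least 2 in-sync replicas:

  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 30 --topic mytopic1

  # The Isr column should list at least 2 broker ids for each partition
  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic1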
RE: Rebalancing issue while Kafka scaling
Hafsa, Florin, First things first, it is possible to scale a Kafka cluster up or down (i.e. add/remove servers). And as has been noted in this thread, after you add a server to a cluster, you need to rebalance the topic partitions in order to put the newly added server into use. Similarly, before you remove a server, it is advised that you drain the data off the server to be removed (it's not a hard requirement if all your topics have at least 2 in-sync replicas, including the one on the server being removed, and you intend to rebalance after the server removal). However, "automating" the rebalancing of topic partitions is not trivial. There is a KIP out there to help with the rebalancing, but it lacks details - https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+rebalancing My guess is that this is due to its non-trivial nature AND the number of cases one needs to take care of - e.g. scaling up by 5% v/s scaling up by 50% in, say, a 20-node cluster. Furthermore, to be really effective, one needs to be cognizant of the partition sizes, and with rack-awareness the task becomes even more involved. Regards, Jayesh -Original Message- From: Spico Florin [mailto:spicoflo...@gmail.com] Sent: Tuesday, May 31, 2016 9:44 AM To: users@kafka.apache.org Subject: Re: Rebalancing issue while Kafka scaling Hi! What version of Kafka are you using? What do you mean by "Kafka needs rebalancing"? Rebalancing of what? Can you please be more specific? Regards, Florin On Tue, May 31, 2016 at 4:58 PM, Hafsa Asif wrote: > Hello Folks, > > Today, my team members raised a concern that whenever we add a node > to the Kafka cluster, Kafka needs rebalancing. The rebalancing is a sort of > manual and undesirable step whenever scaling happens. Second, if Kafka > scales up then it cannot be scaled down. Please provide us proper > guidance on this issue; maybe we do not have enough configuration properties. > > Hafsa
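For reference, the manual rebalancing referred to above is typically done with the kafka-reassign-partitions.sh tool that ships with Kafka (a rough sketch - the topic name, broker ids and file names are placeholders, and you would normally review and trim the generated plan before executing it):

  # topics.json (hypothetical): {"version":1,"topics":[{"topic":"mytopic1"}]}
  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics.json --broker-list "0,1,2,3" --generate

  # Save the proposed assignment to reassignment.json, then apply and verify it
  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute
  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --verify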
RE: [DISCUSS] KIP-59 - Proposal for a kafka broker command - kafka-brokers.sh
Thanks Gwen - yes, I agree - let me work on it, make it available on GitHub, and then I guess we can go from there. Thanks, Jayesh -Original Message- From: Gwen Shapira [mailto:g...@confluent.io] Sent: Wednesday, May 11, 2016 12:26 PM To: d...@kafka.apache.org; Jayesh Thakrar Cc: Users Subject: Re: [DISCUSS] KIP-59 - Proposal for a kafka broker command - kafka-brokers.sh Hello Jayesh, Thank you for the suggestion. I like the proposal and the new tool seems useful. Do you already have the tool available in a GitHub repository? If you don't, then this would be a good place to start - there are many Kafka utilities in GitHub repositories (Yahoo's Kafka Manager is a famous example). This way users can evaluate the usefulness of the new tool before we include it in core Kafka. Gwen On Mon, May 9, 2016 at 8:50 PM, Jayesh Thakrar wrote: > Hi All, > > This is to start off a discussion on the above KIP at > https://cwiki.apache.org/confluence/display/KAFKA/KIP-59%3A+Proposal+for+a+kafka+broker+command The proposal is to fill the void of a > command line tool/utility that can provide information on the cluster and > brokers in a Kafka cluster. > Thank you, Jayesh Thakrar
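Until such a tool exists, one rough approximation (a sketch, not part of the KIP or its proposed interface) is to read the broker registrations straight out of ZooKeeper with the bundled shell:

  bin/zookeeper-shell.sh localhost:2181
    ls /brokers/ids        # ids of the currently registered (live) brokers
    get /brokers/ids/0     # host, port, endpoints and JMX port for broker 0
    get /controller        # which broker is currently the controller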