Kafka Broker : Socket.request.max.bytes & Socket.receive.buffer.bytes
Hi All, I am trying to increase my Kafka producer client throughput, and to that end I was exploring the server.properties parameters. I would like to better understand the relation between the "socket.request.max.bytes" and "socket.receive.buffer.bytes" parameters in the broker's server.properties file. In my project, "socket.request.max.bytes" is set to a high value (around 104,857,600), but "socket.receive.buffer.bytes" is only 102,400. Based on the description of these parameters on the Apache site, I now suspect that until we increase the socket.receive.buffer.bytes value, there won't be much performance gain to be had on the producer end. Is my understanding correct? Regards, Nitin Kumar Sharma.
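Incidentally, one way to confirm what a running broker actually uses for these two settings, rather than inferring it from server.properties, is to describe the broker config via the Admin API. A minimal sketch, assuming broker id "0" and a broker reachable on localhost:9092:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class ShowSocketConfigs {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            try (AdminClient admin = AdminClient.create(props)) {
                // Broker id "0" is an assumption; use the id of the broker you want to inspect.
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
                for (String name : Arrays.asList("socket.request.max.bytes", "socket.receive.buffer.bytes")) {
                    System.out.println(name + " = " + config.get(name).value());
                }
            }
        }
    }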
Kafka Streams throughput performance overtime
Hi, I have a streams application that performs rollups from 15-minute to hourly, then hourly to daily. The process needs to run continuously, 24 hours a day, and every 15 minutes approximately 12 million records (one JSON record per message) get posted into the input topic. There are 3 separate processors corresponding to the above, where the hourly and daily stages maintain state. So the hourly state needs to retain about 10 million records every hour, and the daily state about 10 million overall. 42 GB of memory is allocated for the whole application, and throughput is fine for roughly the first 10 hours; after that it degrades significantly. Any suggestions for identifying the delay and increasing the throughput would be of great help. We are on Kafka 1.0.0. Thanks, Siva
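One generic way to narrow down where the time goes once throughput starts to degrade is to compare the built-in Streams metrics early in the run versus after the slowdown. A minimal sketch, with an illustrative application id and bootstrap servers, and assuming the existing 15-minute/hourly/daily topology is available to the caller:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class MetricsDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rollup-app");         // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
            props.put("metrics.recording.level", "DEBUG"); // enables per-node / per-state-store metrics

            Topology topology = buildRollupTopology(); // placeholder for the real 15min -> hourly -> daily topology
            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();

            // Run this block periodically (e.g. from a scheduled task) and compare hour 1 vs. hour 10+.
            for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
                MetricName n = e.getKey();
                if (n.name().contains("process-rate") || n.name().contains("latency")) {
                    // On 1.0.x clients the accessor is the deprecated value(); metricValue() is the newer one.
                    System.out.println(n.group() + " " + n.name() + " " + n.tags() + " = " + e.getValue().metricValue());
                }
            }
        }

        private static Topology buildRollupTopology() {
            throw new UnsupportedOperationException("replace with the application's real topology");
        }
    }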
Re: Debugging message timestamps in Sarama
Hi Dmitry,

Are you associated with the Sarama project? If so, understand that part of what I want is to learn about Sarama and the Kafka message format ;)

The problem I'm having is that if I turn on:

    log.message.timestamp.type=LogAppendTime

in the broker, then produce on topic1 with console producer, I will see timestamps in the sarama client. If I produce on topic2 with telegraf (incidentally, I think telegraf is a sarama producer), then I don't see timestamps in the sarama client. In both cases, if I consume using the console consumer (with --property print.timestamp=true) I *do* see timestamps.

I'm happy to debug this issue myself and submit a PR to sarama, but I am missing some fundamentals of how to decode the kafka message format and would really like some pointers.

Cheers,
Craig

P.S. Here is the sarama code I'm using to test:

    package main

    import (
        "fmt"
        "log"
        "os"
        "os/signal"
        "time"

        "github.com/Shopify/sarama"
    )

    func main() {
        // Initialize Sarama logging
        sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile)

        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        config := sarama.NewConfig()
        config.Consumer.Return.Errors = true
        config.ClientID = "consumer-test"
        config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
        config.Metadata.Full = true
        // config.Version = sarama.V0_10_0_0
        config.Version = sarama.V1_1_0_0
        // config.Version = sarama.V0_10_2_1
        config.Consumer.Offsets.Initial = sarama.OffsetOldest

        brokers := []string{"localhost:9092"}
        // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}

        client, err := sarama.NewConsumer(brokers, config)
        if err != nil {
            panic(err)
        }

        // topic := "topic1"
        topic := "topic2"
        // topic := "metric-influx-measurement"

        // How to decide partition, is it fixed value...?
        consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
        if err != nil {
            panic(err)
        }
        defer func() {
            if err := client.Close(); err != nil {
                panic(err)
            }
        }()

        // Count how many message processed
        msgCount := 0

        go func() {
            for {
                select {
                case err := <-consumer.Errors():
                    fmt.Println(err)
                case msg := <-consumer.Messages():
                    msgCount++
                    fmt.Println(msg.Timestamp)
                    fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                case <-signals:
                    fmt.Println("Interrupt is detected")
                    break
                }
            }
        }()

        <-signals
    }

On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov wrote:

> Hey Craig,
>
> what exact problem you have with Sarama client?
>
> On Mon, Jul 23, 2018 at 5:11 PM Craig Ching wrote:
>
> > Hi!
> >
> > I'm working on debugging a problem with how message timestamps are handled
> > in the sarama client. In some cases, the sarama client won't associate a
> > timestamp with a message while the kafka console consumer does. I've found
> > the documentation on the message format here:
> >
> > https://kafka.apache.org/documentation/#messageformat
> >
> > But the information there is very sparse. For instance, what are
> > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging
> > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> > timestamp I want. Is there some state about the message that I need to
> > understand in order to have maxTimestamp be used? Any further
> > documentation or guidance on this would be very helpful!
> >
> > On another note, I am trying to debug this through the scala/java console
> > consumer, but I'm having a hard time getting IntelliJ setup. Is there
> > anything special or documentation I need to set this up for debugging?
> >
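As a cross-check on the Java side, here is a minimal consumer sketch (broker address, group id, and deserializers are assumptions; topic2 is the topic from the thread; poll(Duration) needs a 2.0+ client) that prints each record's timestamp together with its timestamp type, which shows whether the broker stored CreateTime or LogAppendTime for that batch:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TimestampCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumption
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-check");                   // assumption
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("topic2"));
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                        // timestampType() reports CREATE_TIME or LOG_APPEND_TIME as stored by the broker.
                        System.out.printf("key=%s ts=%d type=%s%n",
                                record.key(), record.timestamp(), record.timestampType());
                    }
                }
            }
        }
    }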
Re: Use Kafka Streams for windowing data and processing each window at once
I see. In that case, one workaround would be to query the state store directly from a `punctuation` function, once you know that no more updates will be applied to that store. Note that punctuation is only available in the Processor API, but you can always add such a lower-level implementation into your DSL topology by calling `KStream#process() / transform()`.

What you can do is schedule a stream-time based punctuation function with a scheduling interval equal to the window size. Whenever the punctuation function is triggered, it receives the current stream time as its parameter, and from that stream time you can determine which window(s) have become final: for example, if your window length is 10 and the grace period is configured as 5, and your punctuation function triggers at 25, then you know that the window [0, 10) must already be final. You can then access the windowed state store directly using the fetchAll(fromTimestamp, toTimestamp) API to fetch all the keys in that range and generate the global report. (A concrete sketch of this pattern is included at the end of this message, after the quoted history.)

Guozhang

On Mon, Jul 23, 2018 at 12:48 AM, Bruno Bottazzini < bruno.bottazz...@targatelematics.com> wrote: > Hello Guozhang, > > Managing to have a stream with just one record per each key per window > is definitely something we want to achieve. > > But, it is not all. We also want to process the whole window at once so > our callback would receive just one collection of aggregates per window > once it is ready. > > We would probably need to receive the collection as an iterable that > dynamically loads the window in chunks as the size of the window could > be unmanageable as single object. > > This way we could produce one report for each window in the example > "Final window result per key" you manage to send an alert for each user > with less than three events but we also want to collect in one report > the list of all users with less than three events in the one hour > window. > > Best Regards, > > Bruno > > On ven, 2018-07-20 at 09:11 -0700, Guozhang Wang wrote: > > Hello Bruno, > > > > We've discussed about the callback approach before, but then we > > realized > > with the proposed API, this can still be achieved. In the "Final > > window > > results per key" section, John showed how to do that. Note the > > resulted > > stream will have exactly one record per each key, with the value > > representing the "final result" for that key. > > > > > > Guozhang > > > > > > On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini < > > bruno.bottazz...@targatelematics.com> wrote: > > > > > > > > Bill, > > > > > > After reading the documentation and sure it looks really close to > > > our > > > need however I had a doubt about it. > > > > > > One small question. > > > > > > I was expecting also a callback that Kafka would call after the > > > whole > > > period is passed and this callback would receive an iterable object > > > that contains all the aggregated information that was collected in > > > the > > > same period. > > > > > > Will it be possible when using grace() or suppress()? > > > > > > Best Regards, > > > > > > Bruno > > > > > > On gio, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote: > > > > > > > > Hi Bruno, > > > > > > > > What you are asking is a common request. There is a KIP in the > > > > works, > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Abil > > > > ity+ > > > > to+suppress+updates+for+KTables, > > > > that should suit the requirements you've outlined.
> > > > > > > > In the meantime, I'll see if I can come up with an alternative > > > > approach > > > > over the next few days. > > > > > > > > -Bill > > > > > > > > On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini < > > > > bruno.bottazz...@targatelematics.com> wrote: > > > > > > > > > > > > > > > > > > > Hello, > > > > > > > > > > We have a doubt about Kafka streams on how it works. Or at > > > > > least we > > > > > are > > > > > having some troubles in making it to work. > > > > > > > > > > The purpose we want to achieve is to group by user some > > > > > messages > > > > > that > > > > > we receive from a Kafka topic and window them in order to > > > > > aggregate > > > > > the > > > > > messages we receive in the window (5 minutes). Then, I'd like > > > > > to > > > > > collect all aggregates in each window in order to process them > > > > > at > > > > > once > > > > > adding them to a report of all the messages I received in the 5 > > > > > minutes > > > > > interval. > > > > > > > > > > The last point seems to be the tough part as Kafka Streams > > > > > doesn't > > > > > seem > > > > > to provide (at least we can't find it :() anything that can > > > > > collect > > > > > all > > > > > the window related stuff in a "finite" stream to be processed > > > > > in > > > > > one > > > > > place. > > > > > > > > > > The file (implemented_code.txt) contains the code we have > > > > > implemented > > > > > where it contains at least one of our
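To make the suggested workaround concrete, here is a minimal sketch of such a punctuation-based transformer. The store name "counts", the String/Long types, and the one-hour window with a five-minute grace period are illustrative assumptions, not from the thread; the windowed store itself is assumed to be materialized by the upstream DSL aggregation:

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;

    public class FinalWindowReporter implements Transformer<String, Long, KeyValue<String, Long>> {

        // Illustrative sizes: a one-hour window with a five-minute grace period.
        private static final long WINDOW_MS = Duration.ofHours(1).toMillis();
        private static final long GRACE_MS = Duration.ofMinutes(5).toMillis();

        private ReadOnlyWindowStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            // "counts" is an assumed store name; it must match the store materialized by the
            // upstream windowed aggregation and be passed to KStream#transform().
            store = (ReadOnlyWindowStore<String, Long>) context.getStateStore("counts");

            // Stream-time punctuation that fires once per window length.
            context.schedule(WINDOW_MS, PunctuationType.STREAM_TIME, streamTime -> {
                // Following the example in the thread (window 10, grace 5, trigger at 25 =>
                // window [0, 10) is final): the newest window that is safely final starts at
                // streamTime - grace - 2 * window.
                final long finalWindowStart = streamTime - GRACE_MS - 2 * WINDOW_MS;
                if (finalWindowStart < 0) {
                    return;
                }
                // fetchAll scans every key for windows whose start time falls in the given range.
                try (KeyValueIterator<Windowed<String>, Long> iter =
                             store.fetchAll(finalWindowStart, finalWindowStart)) {
                    while (iter.hasNext()) {
                        final KeyValue<Windowed<String>, Long> entry = iter.next();
                        // Build the per-window report here, e.g. collect all users with < 3 events.
                        System.out.println(entry.key + " -> " + entry.value);
                    }
                }
            });
        }

        @Override
        public KeyValue<String, Long> transform(final String key, final Long value) {
            return null; // nothing to emit per record; the report is produced by the punctuator
        }

        @Override
        public void close() { }
    }

It would be wired in after the windowed aggregation with something like stream.transform(FinalWindowReporter::new, "counts"), using whatever store name that aggregation actually materializes.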
Graph Processing with Kafka
Hi, If anyone is interested in using Kafka for graph processing, I started a project to try to use Kafka Streams for that purpose. Feedback is most welcome! https://yokota.blog/2018/07/23/kafka-graphs-graph-analytics-with-apache-kafka/ https://github.com/rayokota/kafka-graphs
Re: Debugging message timestamps in Sarama
Hey Craig, what exact problem you have with Sarama client? On Mon, Jul 23, 2018 at 5:11 PM Craig Ching wrote: > Hi! > > I'm working on debugging a problem with how message timestamps are handled > in the sarama client. In some cases, the sarama client won't associate a > timestamp with a message while the kafka console consumer does. I've found > the documentation on the message format here: > > https://kafka.apache.org/documentation/#messageformat > > But the information there is very sparse. For instance, what are > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the > timestamp I want. Is there some state about the message that I need to > understand in order to have maxTimestamp be used? Any further > documentation or guidance on this would be very helpful! > > On another note, I am trying to debug this through the scala/java console > consumer, but I'm having a hard time getting IntelliJ setup. Is there > anything special or documentation I need to set this up for debugging? >
Re: Documentation/Article/Case Study on Scala as the Kafka Backbone Language
Not necessarily for Kafka, but you can definitely google “Java vs. Scala” and find a variety of reasons. I did a study for a client, and ultimately here are the major reasons I found:

1. Functional programming language, which lends itself to stateless systems
2. Better / easier-to-use stream processing syntax (than what was available in Java 8 at the time)
3. REPL available to quickly test functionality interactively
4. Case classes, which can be inferred with or without strongly typed cases
5. Ability to quickly create DSLs that seem natural to developers
6. Awesome partial function syntax
7. My personal favorite (as I was using parboiled2 to build a parser): libraries like shapeless

Best
--
Rahul Singh
rahul.si...@anant.us
Anant Corporation

On Jul 23, 2018, 8:40 AM -0400, M. Manna , wrote:
> Hello,
>
> Is anyone aware of any links or websites where I can find information, case studies, etc. on why Scala was the best choice for Kafka's design? I hope this is not too much of a "naive" question, since I have had a very humble introduction to Scala.
>
> I understand that Scala is considered where distributed/scalable systems need to be designed. Also, in some cases it lets multiple complex statements be expressed as a single statement, i.e. it reduces verbosity.
>
> So, as a person who has a background in Java but is relatively novice in Scala, I wanted to understand whether a study document exists to explain the choice?
>
> Regards,
Debugging message timestamps in Sarama
Hi! I'm working on debugging a problem with how message timestamps are handled in the sarama client. In some cases, the sarama client won't associate a timestamp with a message while the kafka console consumer does. I've found the documentation on the message format here: https://kafka.apache.org/documentation/#messageformat But the information there is very sparse. For instance, what are 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the timestamp I want. Is there some state about the message that I need to understand in order to have maxTimestamp be used? Any further documentation or guidance on this would be very helpful! On another note, I am trying to debug this through the scala/java console consumer, but I'm having a hard time getting IntelliJ setup. Is there anything special or documentation I need to set this up for debugging?
Re: UNKNOWN_PRODUCER_ID error when running Streams WordCount demo with processing.guarantee set to EXACTLY_ONCE
Hello Guozhang, Bill Thanks much for your explanation. I assigned this JIRA issue to myself and will try to fix it. Regards, Lambdaliu(Shaobo Liu) On 2018-07-21 at 02:24 Bill Bejeck wrote: >Created JIRA https://issues.apache.org/jira/browse/KAFKA-7190 > >-Bill > >On Fri, Jul 20, 2018 at 1:26 PM Guozhang Wang wrote: > >> Hello Shaobo, >> >> The situation you observed is a valid case: when a streams application has >> very low traffic (like the quickstart example, where you manually type one >> message at a time), then it is possible that consumer purging would delete >> even the last message sent by this producer (i.e. all the messages sent by >> this producer have been consumed and committed), and as a result the broker >> would delete that producer's ID. The next time when this producer tries to >> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case >> this error is retriable: the producer would just get a new producer id and >> retries, and then this time it will succeed. So the results you observed is >> still correct. >> >> >> We can, probably, improve this situation either in broker side or streams >> client side: on broker side, we can consider delaying the deletion of the >> producer ID for a while; on streams client side, we can consider purging in >> a bit conservative manner (but it is still a bit tricky, since multiple >> producers may be sending to the same inner topic, so just leaving the last >> N messages to not be purged may not be safe still). >> >> >> Bill, could we create a JIRA for this? >> >> >> Guozhang >> >> >> On Thu, Jul 19, 2018 at 7:55 AM, Bill Bejeck wrote: >> >> > Hi >> > >> > Thanks for reporting this. Just off the top of my head, I'm thinking it >> > may have to do with using a console producer, but I'll have to take a >> > deeper look. >> > >> > Thanks, >> > Bill >> > >> > On Thu, Jul 19, 2018 at 9:59 AM lambdaliu(刘少波) >> > wrote: >> > >> > > Hi, >> > > >> > > I test the Kafka Streams WordCount demo follow the steps described in >> > > http://kafka.apache.org/11/documentation/streams/quickstart with >> change >> > > the processing.guarantee property to EXACTLY_ONCE. >> > > >> > > And seeing the following WARN message in streams demo app logs: >> > > [2018-07-18 21:08:03,510] WARN The configuration 'admin.retries' was >> > > supplied but isn't a known config. >> > > (org.apache.kafka.clients.consumer.ConsumerConfig) >> > > [2018-07-18 21:11:29,218] WARN [Producer >> > > clientId=apache-wordcount-2a671de0-d2b7-404f-bfe8- >> > 9e8cad5008d4-StreamThread-1-0_0-producer, >> > > transactionalId=apache-wordcount-0_0] Got error produce response with >> > > correlation id 15 on topic-partition >> > > >> apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0, >> > > retrying (2147483646 attempts left). Error: UNKNOWN_PRODUCER_ID >> > > (org.apache.kafka.clients.producer.internals.Sender) >> > > [2018-07-18 21:15:04,092] WARN [Producer >> > > clientId=apache-wordcount-2a671de0-d2b7-404f-bfe8- >> > 9e8cad5008d4-StreamThread-1-0_0-producer, >> > > transactionalId=apache-wordcount-0_0] Got error produce response with >> > > correlation id 21 on topic-partition >> > > >> apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0, >> > > retrying (2147483646 attempts left). Error: UNKNOWN_PRODUCER_ID >> > > (org.apache.kafka.clients.producer.internals.Sender) >> > > >> > > There are also some ERROR message in the broker logs: >> > > [2018-07-18 21:10:16,463] INFO Updated PartitionLeaderEpoch. 
New: >> > > {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: >> > > >> apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0. >> > > Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache) >> > > [2018-07-18 21:10:16,965] INFO [Log >> > > partition=apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE- >> > 03-repartition-0, >> > > dir=/tmp/kafka-logs0] Incrementing log start offset to 5 >> (kafka.log.Log) >> > > [2018-07-18 21:10:16,966] INFO Cleared earliest 0 entries from epoch >> > cache >> > > based on passed offset 5 leaving 1 in EpochFile for partition >> > > apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0 >> > > (kafka.server.epoch.LeaderEpochFileCache) >> > > [2018-07-18 21:11:29,217] ERROR [ReplicaManager broker=0] Error >> > processing >> > > append operation on partition >> > > apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0 >> > > (kafka.server.ReplicaManager) >> > > org.apache.kafka.common.errors.UnknownProducerIdException: Found no >> > record >> > > of producerId=5000 on the broker. It is possible that the last message >> > with >> > > the producerId=5000 has been removed due to hitting the retention >> limit. >> > > [2018-07-18 21:11:29,331] INFO [Log >> > > partition=apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE- >> > 03-repartition-0, >> > > dir=/tmp/kafka-logs0] Incrementing log start offset to 9 >>
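For reference, the only configuration change relative to the quickstart WordCount that reproduces this is the exactly-once flag. A minimal sketch of the relevant lines (the application id matches the logs above; the broker address is an assumption):

    // (fragment) requires: import java.util.Properties; import org.apache.kafka.streams.StreamsConfig;
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "apache-wordcount");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Switch from the default at-least-once to exactly-once processing:
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);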
Documentation/Article/Case Study on Scala as the Kafka Backbone Language
Hello, Is anyone aware of any links or websites where I can find information, case studies, etc. on why Scala was the best choice for Kafka's design? I hope this is not too much of a "naive" question, since I have had a very humble introduction to Scala. I understand that Scala is considered where distributed/scalable systems need to be designed. Also, in some cases it lets multiple complex statements be expressed as a single statement, i.e. it reduces verbosity. So, as a person who has a background in Java but is relatively novice in Scala, I wanted to understand whether a study document exists to explain the choice? Regards,
Re: Use Kafka Streams for windowing data and processing each window at once
Hello Guozhang, Managing to have a stream with just one record per each key per window is definitely something we want to achieve. But, it is not all. We also want to process the whole window at once so our callback would receive just one collection of aggregates per window once it is ready. We would probably need to receive the collection as an iterable that dynamically loads the window in chunks as the size of the window could be unmanageable as single object. This way we could produce one report for each window in the example "Final window result per key" you manage to send an alert for each user with less than three events but we also want to collect in one report the list of all users with less than three events in the one hour window. Best Regards, Bruno On ven, 2018-07-20 at 09:11 -0700, Guozhang Wang wrote: > Hello Bruno, > > We've discussed about the callback approach before, but then we > realized > with the proposed API, this can still be achieved. In the "Final > window > results per key" section, John showed how to do that. Note the > resulted > stream will have exactly one record per each key, with the value > representing the "final result" for that key. > > > Guozhang > > > On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini < > bruno.bottazz...@targatelematics.com> wrote: > > > > > Bill, > > > > After reading the documentation and sure it looks really close to > > our > > need however I had a doubt about it. > > > > One small question. > > > > I was expecting also a callback that Kafka would call after the > > whole > > period is passed and this callback would receive an iterable object > > that contains all the aggregated information that was collected in > > the > > same period. > > > > Will it be possible when using grace() or suppress()? > > > > Best Regards, > > > > Bruno > > > > On gio, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote: > > > > > > Hi Bruno, > > > > > > What you are asking is a common request. There is a KIP in the > > > works, > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Abil > > > ity+ > > > to+suppress+updates+for+KTables, > > > that should suit the requirements you've outlined. > > > > > > In the meantime, I'll see if I can come up with an alternative > > > approach > > > over the next few days. > > > > > > -Bill > > > > > > On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini < > > > bruno.bottazz...@targatelematics.com> wrote: > > > > > > > > > > > > > > > Hello, > > > > > > > > We have a doubt about Kafka streams on how it works. Or at > > > > least we > > > > are > > > > having some troubles in making it to work. > > > > > > > > The purpose we want to achieve is to group by user some > > > > messages > > > > that > > > > we receive from a Kafka topic and window them in order to > > > > aggregate > > > > the > > > > messages we receive in the window (5 minutes). Then, I'd like > > > > to > > > > collect all aggregates in each window in order to process them > > > > at > > > > once > > > > adding them to a report of all the messages I received in the 5 > > > > minutes > > > > interval. > > > > > > > > The last point seems to be the tough part as Kafka Streams > > > > doesn't > > > > seem > > > > to provide (at least we can't find it :() anything that can > > > > collect > > > > all > > > > the window related stuff in a "finite" stream to be processed > > > > in > > > > one > > > > place. 
> > > > The file (implemented_code.txt) contains the code we have implemented where it contains at least one of our tries to make it to work.
> > > >
> > > > You can find its result inside the file (result.txt)
> > > >
> > > > For each window there are many log lines and they are mixed with the other windows.
> > > >
> > > > What I'd like to have is something like:
> > > >
> > > > // Hypothetical implementation
> > > > windowedMessages.streamWindows((interval, window) -> process(interval, window));
> > > >
> > > > where method process would be something like:
> > > >
> > > > // Hypothetical implementation
> > > > void process(Interval interval, WindowStream List> windowStream) {
> > > >     // Create report for the whole window
> > > >     Report report = new Report(nameFromInterval());
> > > >     // Loop on the finite iterable that represents the window content
> > > >     for (WindowStreamEntry> entry : windowStream) {
> > > >         report.addLine(entry.getKey(), entry.getValue());
> > > >     }
> > > >     report.close();
> > > > }
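For completeness, the "Final window results per key" pattern referenced in this thread, as proposed in KIP-328 (the API was still in progress at the time of this thread and may differ in the released version), would look roughly like the sketch below. The input topic name, the String key type, and the one-hour window with a five-minute grace period are illustrative assumptions:

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class FinalWindowCounts {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            KStream<String, String> events = builder.stream("events"); // illustrative topic name

            KStream<Windowed<String>, Long> finalCounts = events
                    .groupByKey()
                    // One-hour windows that accept records up to five minutes late.
                    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(5)))
                    .count()
                    // Emit each (key, window) exactly once, after the window has closed.
                    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
                    .toStream();

            finalCounts.foreach((windowedKey, count) ->
                    System.out.println(windowedKey.window().start() + " " + windowedKey.key() + " = " + count));
        }
    }

Each record emitted downstream of suppress() is a final per-key result; collecting them into a single per-window report would still need something like the punctuation approach described earlier in the thread.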