Kafka Broker : Socket.request.max.bytes & Socket.receive.buffer.bytes

2018-07-23 Thread nitin sharma
Hi All,

I am trying to increase my Kafka producer client throughput, and to that end
I was exploring the parameters in the server.properties file.

I would like to better understand the relationship between the
"socket.request.max.bytes" and "socket.receive.buffer.bytes" parameters in
the broker's server.properties file.

In my project, "socket.request.max.bytes" is set to a high value
(104,857,600), but "socket.receive.buffer.bytes" is only 102,400.

Based on the descriptions of these parameters on the Apache site, I now
suspect that until we increase the socket.receive.buffer.bytes value, there
won't be much performance gain to be had on the producer end. Is my
understanding correct?
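
For reference, here is roughly how the two settings currently look in our
broker's server.properties (these are just the values mentioned above, not a
recommendation):

# maximum size of a single request the broker's socket server will accept
socket.request.max.bytes=104857600
# SO_RCVBUF used by the broker's socket server sockets
socket.receive.buffer.bytes=102400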



Regards,
Nitin Kumar Sharma.


Kafka Streams throughput performance overtime

2018-07-23 Thread Siva Ram
 Hi,

I have a streams application that performs rollups from 15-minute to hourly,
then hourly to daily.  The process needs to run continuously, 24 hours a
day, and every 15 minutes approximately 12 million records (one JSON record
per message) get posted into the input topic. There are 3 separate
processors corresponding to the above, where the hourly and daily processors
maintain state.  So the hourly state needs to retain about 10 million
records each hour, and the daily state about 10 million overall.  42 GB of
memory is allocated to the whole application, and throughput is fine for
roughly the first 10 hours; after that it degrades significantly.  Any
suggestions on how to identify the delay and increase the throughput would
be of great help.  We are on Kafka 1.0.0.
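
For context, below is a rough sketch of the kind of 15-minute rollup stage we
run (topic names, serdes, the counting aggregation, and the retention value
are simplified placeholders, not our actual code); the hourly and daily
stages are analogous aggregations reading from the previous stage's output:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class RollupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rollup-sketch");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> raw = builder.stream("input-topic");          // placeholder topic

        // 15-minute buckets, counted per key and backed by a windowed state store.
        // until() bounds how long closed windows are retained in that store, which
        // directly affects how much state accumulates over time.
        KTable<Windowed<String>, Long> fifteenMinute = raw
                .groupByKey()
                .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))
                        .until(TimeUnit.HOURS.toMillis(2)))
                .count();

        new KafkaStreams(builder.build(), props).start();
    }
}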

Thanks
Siva


Re: Debugging message timestamps in Sarama

2018-07-23 Thread Craig Ching
Hi Dmitry,

Are you associated with the Sarama project?  If so, understand that part of
what I want is to learn about Sarama and the Kafka message format ;)

The problem I'm having is that if I turn on:

log.message.timestamp.type=LogAppendTime

in the broker and then produce on topic1 with the console producer, I see
timestamps in the sarama client.  If I produce on topic2 with telegraf
(incidentally, I think telegraf is a sarama producer), then I don't see
timestamps in the sarama client.  In both cases, if I consume using the
console consumer (with --property print.timestamp=true) I *do* see
timestamps.
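
For reference, the console consumer invocation I'm using is along these
lines (the broker address and topic here are just placeholders):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic topic2 --from-beginning \
    --property print.timestamp=true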

I'm happy to debug this issue myself and submit a PR to sarama, but I am
missing some fundamentals of how to decode the kafka message format and
would really like some pointers.

Cheers,
Craig

P.S.  Here is the sarama code I'm using to test:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)

func main() {

	// Initialize Sarama logging
	sarama.Logger = log.New(os.Stdout, "[Sarama] ",
		log.Ldate|log.Lmicroseconds|log.Lshortfile)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.ClientID = "consumer-test"
	config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
	config.Metadata.Full = true
	// config.Version = sarama.V0_10_0_0
	config.Version = sarama.V1_1_0_0
	// config.Version = sarama.V0_10_2_1
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	brokers := []string{"localhost:9092"}
	// brokers = []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}

	client, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}

	// topic := "topic1"
	topic := "topic2"
	// topic := "metric-influx-measurement"
	// How to decide the partition -- is it a fixed value...?
	consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := client.Close(); err != nil {
			panic(err)
		}
	}()

	// Count how many messages are processed
	msgCount := 0

	// Print the timestamp, key and value of every message. Shutdown is handled
	// by the <-signals wait in main below rather than inside this select (a
	// bare break in a select case only exits the select, not the loop).
	go func() {
		for {
			select {
			case err := <-consumer.Errors():
				fmt.Println(err)
			case msg := <-consumer.Messages():
				msgCount++
				fmt.Println(msg.Timestamp)
				fmt.Println("Received messages", string(msg.Key), string(msg.Value))
			}
		}
	}()

	<-signals
	fmt.Println("Interrupt is detected")
}


On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov wrote:

> Hey Craig,
>
> What exact problem are you having with the Sarama client?
>
> On Mon, Jul 23, 2018 at 5:11 PM Craig Ching  wrote:
>
> > Hi!
> >
> > I'm working on debugging a problem with how message timestamps are
> handled
> > in the sarama client.  In some cases, the sarama client won't associate a
> > timestamp with a message while the kafka console consumer does.  I've
> found
> > the documentation on the message format here:
> >
> > https://kafka.apache.org/documentation/#messageformat
> >
> > But the information there is very sparse.  For instance, what are
> > 'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
> > sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> > timestamp I want.  Is there some state about the message that I need to
> > understand in order to have maxTimestamp be used?  Any further
> > documentation or guidance on this would be very helpful!
> >
> > On another note, I am trying to debug this through the scala/java console
> > consumer, but I'm having a hard time getting IntelliJ setup.  Is there
> > anything special or documentation I need to set this up for debugging?
> >
>


Re: Use Kafka Streams for windowing data and processing each window at once

2018-07-23 Thread Guozhang Wang
I see.

In that case, one workaround would be to query the state store directly,
from a `punctuation` function, once you know that no more updates will be
applied to that store. Note that punctuation is a feature that's only
available in the Processor API, but you can always add such a lower-level
implementation into your DSL topology by calling `KStream#process()` /
`transform()`.

What you can do, then, is schedule a stream-time based punctuation function
with a scheduling interval equal to the window size. Whenever the
punctuation function is triggered, it receives the current stream time as
its parameter, and from that stream time you can determine which window(s)
have become final. For example, if your window length is 10 and the grace
period is configured as 5, and your punctuation function triggers at 25,
then you know that the window [0, 10) must already be final. You can then
access that windowed state store directly, using the
fetchAll(fromTimestamp, toTimestamp) API, to fetch all the keys in that
range and generate the global report.
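
To make that concrete, here is a rough sketch of such a processor (the store
name "windowed-agg-store", the window size, the grace period, and the report
logic are all hypothetical; it assumes the DSL aggregation is materialized as
a named windowed store that this processor is connected to):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;

public class FinalWindowReporter extends AbstractProcessor<Windowed<String>, Long> {

    private static final long WINDOW_SIZE_MS = 10L;  // hypothetical window length
    private static final long GRACE_MS = 5L;         // hypothetical grace period

    private WindowStore<String, Long> store;
    private long reportedUpTo = 0L;  // window start time up to which reports were emitted

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (WindowStore<String, Long>) context.getStateStore("windowed-agg-store");

        // Stream-time punctuation, fired once per window length.
        context.schedule(WINDOW_SIZE_MS, PunctuationType.STREAM_TIME, streamTime -> {
            // A window starting at t is final once streamTime >= t + window size + grace.
            final long finalUpTo = streamTime - WINDOW_SIZE_MS - GRACE_MS;
            if (finalUpTo < reportedUpTo) {
                return;
            }
            // Fetch every key of every window whose start falls in the now-final range
            // and fold it into one "global report" per window (printing as a stand-in).
            try (KeyValueIterator<Windowed<String>, Long> iter =
                         store.fetchAll(reportedUpTo, finalUpTo)) {
                while (iter.hasNext()) {
                    final KeyValue<Windowed<String>, Long> entry = iter.next();
                    System.out.println(entry.key + " -> " + entry.value);
                }
            }
            reportedUpTo = finalUpTo + 1;
        });
    }

    @Override
    public void process(final Windowed<String> key, final Long value) {
        // No per-record work; everything happens in the punctuation above.
    }
}

It could be attached downstream of the windowed aggregation with something
like aggregated.toStream().process(FinalWindowReporter::new,
"windowed-agg-store") (again, all names here are hypothetical).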


Guozhang


On Mon, Jul 23, 2018 at 12:48 AM, Bruno Bottazzini <
bruno.bottazz...@targatelematics.com> wrote:

> Hello Guozhang,
>
> Managing to have a stream with just one record per each key per window
> is definitely something we want to achieve.
>
> But, it is not all. We also want to process the whole window at once so
> our callback would receive just one collection of aggregates per window
> once it is ready.
>
> We would probably need to receive the collection as an iterable that
> dynamically loads the window in chunks as the size of the window could
> be unmanageable as single object.
>
> This way we could produce one report for each window in the example
> "Final window result per key" you manage to send an alert for each user
> with less than three events but we also want to collect in one report
> the list of all users with less than three events in the one hour
> window.
>
> Best Regards,
>
> Bruno
>
> On ven, 2018-07-20 at 09:11 -0700, Guozhang Wang wrote:
> > Hello Bruno,
> >
> > We've discussed about the callback approach before, but then we
> > realized
> > with the proposed API, this can still be achieved. In the "Final
> > window
> > results per key" section, John showed how to do that. Note the
> > resulted
> > stream will have exactly one record per each key, with the value
> > representing the "final result" for that key.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini <
> > bruno.bottazz...@targatelematics.com> wrote:
> >
> > >
> > > Bill,
> > >
> > > After reading the documentation and sure it looks really close to
> > > our
> > > need however I had a doubt about it.
> > >
> > > One small question.
> > >
> > > I was expecting also a callback that Kafka would call after the
> > > whole
> > > period is passed and this callback would receive an iterable object
> > > that contains all the aggregated information that was collected in
> > > the
> > > same period.
> > >
> > > Will it be possible when using grace() or suppress()?
> > >
> > > Best Regards,
> > >
> > > Bruno
> > >
> > > On gio, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote:
> > > >
> > > > Hi Bruno,
> > > >
> > > > What you are asking is a common request.  There is a KIP in the
> > > > works,
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> > > > that should suit the requirements you've outlined.
> > > >
> > > > In the meantime, I'll see if I can come up with an alternative
> > > > approach
> > > > over the next few days.
> > > >
> > > > -Bill
> > > >
> > > > On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini <
> > > > bruno.bottazz...@targatelematics.com> wrote:
> > > >
> > > > >
> > > > >
> > > > > Hello,
> > > > >
> > > > > We have a doubt about Kafka streams on how it works. Or at
> > > > > least we
> > > > > are
> > > > > having some troubles in making it to work.
> > > > >
> > > > > The purpose we want to achieve is to group by user some
> > > > > messages
> > > > > that
> > > > > we receive from a Kafka topic and window them in order to
> > > > > aggregate
> > > > > the
> > > > > messages we receive in the window (5 minutes). Then, I'd like
> > > > > to
> > > > > collect all aggregates in each window in order to process them
> > > > > at
> > > > > once
> > > > > adding them to a report of all the messages I received in the 5
> > > > > minutes
> > > > > interval.
> > > > >
> > > > > The last point seems to be the tough part as Kafka Streams
> > > > > doesn't
> > > > > seem
> > > > > to provide (at least we can't find it :() anything that can
> > > > > collect
> > > > > all
> > > > > the window related stuff in a "finite" stream to be processed
> > > > > in
> > > > > one
> > > > > place.
> > > > >
> > > > > The file (implemented_code.txt) contains the code we have
> > > > > implemented
> > > > > where it contains at least one of our tries to make it to work.

Graph Processing with Kafka

2018-07-23 Thread Robert Yokota
Hi,

If anyone is interested in using Kafka for graph processing, I started a
project to try to use Kafka Streams for that purpose.  Feedback is most
welcome!

https://yokota.blog/2018/07/23/kafka-graphs-graph-analytics-with-apache-kafka/

https://github.com/rayokota/kafka-graphs


Re: Debugging message timestamps in Sarama

2018-07-23 Thread Dmitriy Vsekhvalnov
Hey Craig,

What exact problem are you having with the Sarama client?

On Mon, Jul 23, 2018 at 5:11 PM Craig Ching  wrote:

> Hi!
>
> I'm working on debugging a problem with how message timestamps are handled
> in the sarama client.  In some cases, the sarama client won't associate a
> timestamp with a message while the kafka console consumer does.  I've found
> the documentation on the message format here:
>
> https://kafka.apache.org/documentation/#messageformat
>
> But the information there is very sparse.  For instance, what are
> 'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
> sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> timestamp I want.  Is there some state about the message that I need to
> understand in order to have maxTimestamp be used?  Any further
> documentation or guidance on this would be very helpful!
>
> On another note, I am trying to debug this through the scala/java console
> consumer, but I'm having a hard time getting IntelliJ setup.  Is there
> anything special or documentation I need to set this up for debugging?
>


Re: Documentation/Article/Case Study on Scala as the Kafka Backbone Language

2018-07-23 Thread Rahul Singh
Not necessarily for Kafka, but you can definitely google “Java vs. Scala” and
find a variety of reasons. I did a study for a client, and ultimately these are
the major reasons I found:

1. Functional programming language, which lends itself to stateless systems
2. Better / easier-to-use stream processing syntax (than in Java 8 at the time)
3. REPL available to quickly test functionality interactively
4. Case classes which can be inferred with or without strongly typed cases
5. Ability to quickly create DSLs that seem natural to developers
6. Awesome partial function syntax
7. My personal favorite (as I was using parboiled2 to build a parser):
libraries like shapeless

Best

--
Rahul Singh
rahul.si...@anant.us

Anant Corporation
On Jul 23, 2018, 8:40 AM -0400, M. Manna , wrote:
> Hello,
>
> Is anyone aware of any links or website where I can find information/case
> study etc. to why Scala was the best choice for kafka design? I hope this
> is not too much of a "Naive" question since I have had a very humble
> introduction to Scala.
>
> I understand that Scala is considered where distributed/scalable systems
> need to be designed. Also, in some cases it reduces multiple complex
> statements to be formed using a single complex statements i.e. reduce
> incremental verbosity.
>
> So, as a person who has background in Java, but relatively novice in Scala,
> I wanted to understand whether a study document exists to document the
> choice?
>
> Regards,


Debugging message timestamps in Sarama

2018-07-23 Thread Craig Ching
Hi!

I'm working on debugging a problem with how message timestamps are handled
in the sarama client.  In some cases, the sarama client won't associate a
timestamp with a message while the kafka console consumer does.  I've found
the documentation on the message format here:

https://kafka.apache.org/documentation/#messageformat

But the information there is very sparse.  For instance, what are
'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
timestamp I want.  Is there some state about the message that I need to
understand in order to have maxTimestamp be used?  Any further
documentation or guidance on this would be very helpful!

On another note, I am trying to debug this through the scala/java console
consumer, but I'm having a hard time getting IntelliJ setup.  Is there
anything special or documentation I need to set this up for debugging?


Re: UNKNOWN_PRODUCER_ID error when running Streams WordCount demowith processing.guarantee set to EXACTLY_ONCE

2018-07-23 Thread 刘少波
Hello Guozhang, Bill

Thanks much for your explanation. I assigned this JIRA issue to myself and will
try to fix it.
 
Regards,
Lambdaliu(Shaobo Liu)
 
On 2018-07-21 at 02:24 Bill Bejeck   wrote:
 
>Created JIRA https://issues.apache.org/jira/browse/KAFKA-7190
>
>-Bill
>
>On Fri, Jul 20, 2018 at 1:26 PM Guozhang Wang  wrote:
>
>> Hello Shaobo,
>>
>> The situation you observed is a valid case: when a streams application has
>> very low traffic (like the quickstart example, where you manually type one
>> message at a time), then it is possible that consumer purging would delete
>> even the last message sent by this producer (i.e. all the messages sent by
>> this producer have been consumed and committed), and as a result the broker
>> would delete that producer's ID. The next time when this producer tries to
>> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case
>> this error is retriable: the producer would just get a new producer id and
>> retries, and then this time it will succeed. So the results you observed is
>> still correct.
>>
>>
>> We can, probably, improve this situation either in broker side or streams
>> client side: on broker side, we can consider delaying the deletion of the
>> producer ID for a while; on streams client side, we can consider purging in
>> a bit conservative manner (but it is still a bit tricky, since multiple
>> producers may be sending to the same inner topic, so just leaving the last
>> N messages to not be purged may not be safe still).
>>
>>
>> Bill, could we create a JIRA for this?
>>
>>
>> Guozhang
>>
>>
>> On Thu, Jul 19, 2018 at 7:55 AM, Bill Bejeck  wrote:
>>
>> > Hi
>> >
>> > Thanks for reporting this.  Just off the top of my head, I'm thinking it
>> > may have to do with using a console producer, but I'll have to take a
>> > deeper look.
>> >
>> > Thanks,
>> > Bill
>> >
>> > On Thu, Jul 19, 2018 at 9:59 AM lambdaliu(刘少波) 
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > I test the Kafka Streams WordCount demo follow the steps described in
>> > > http://kafka.apache.org/11/documentation/streams/quickstart with
>> change
>> > > the processing.guarantee property to EXACTLY_ONCE.
>> > >
>> > > And seeing the following WARN message in streams demo app logs:
>> > > [2018-07-18 21:08:03,510] WARN The configuration 'admin.retries' was
>> > > supplied but isn't a known config.
>> > > (org.apache.kafka.clients.consumer.ConsumerConfig)
>> > > [2018-07-18 21:11:29,218] WARN [Producer
>> > > clientId=apache-wordcount-2a671de0-d2b7-404f-bfe8-
>> > 9e8cad5008d4-StreamThread-1-0_0-producer,
>> > > transactionalId=apache-wordcount-0_0] Got error produce response with
>> > > correlation id 15 on topic-partition
>> > >
>> apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0,
>> > > retrying (2147483646 attempts left). Error: UNKNOWN_PRODUCER_ID
>> > > (org.apache.kafka.clients.producer.internals.Sender)
>> > > [2018-07-18 21:15:04,092] WARN [Producer
>> > > clientId=apache-wordcount-2a671de0-d2b7-404f-bfe8-
>> > 9e8cad5008d4-StreamThread-1-0_0-producer,
>> > > transactionalId=apache-wordcount-0_0] Got error produce response with
>> > > correlation id 21 on topic-partition
>> > >
>> apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0,
>> > > retrying (2147483646 attempts left). Error: UNKNOWN_PRODUCER_ID
>> > > (org.apache.kafka.clients.producer.internals.Sender)
>> > >
>> > > There are also some ERROR message in the broker logs:
>> > > [2018-07-18 21:10:16,463] INFO Updated PartitionLeaderEpoch. New:
>> > > {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition:
>> > >
>> apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0.
>> > > Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
>> > > [2018-07-18 21:10:16,965] INFO [Log
>> > > partition=apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-
>> > 03-repartition-0,
>> > > dir=/tmp/kafka-logs0] Incrementing log start offset to 5
>> (kafka.log.Log)
>> > > [2018-07-18 21:10:16,966] INFO Cleared earliest 0 entries from epoch
>> > cache
>> > > based on passed offset 5 leaving 1 in EpochFile for partition
>> > > apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0
>> > > (kafka.server.epoch.LeaderEpochFileCache)
>> > > [2018-07-18 21:11:29,217] ERROR [ReplicaManager broker=0] Error
>> > processing
>> > > append operation on partition
>> > > apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0
>> > > (kafka.server.ReplicaManager)
>> > > org.apache.kafka.common.errors.UnknownProducerIdException: Found no
>> > record
>> > > of producerId=5000 on the broker. It is possible that the last message
>> > with
>> > > the producerId=5000 has been removed due to hitting the retention
>> limit.
>> > > [2018-07-18 21:11:29,331] INFO [Log
>> > > partition=apache-wordcount-KSTREAM-AGGREGATE-STATE-STORE-
>> > 03-repartition-0,
>> > > dir=/tmp/kafka-logs0] Incrementing log start offset to 9
>> 

Documentation/Article/Case Study on Scala as the Kafka Backbone Language

2018-07-23 Thread M. Manna
Hello,

Is anyone aware of any links or websites where I can find information, a case
study, etc. on why Scala was the best choice for Kafka's design? I hope this
is not too naive a question, since I have had only a very humble introduction
to Scala.

I understand that Scala is often considered where distributed/scalable
systems need to be designed. Also, in some cases it allows multiple complex
statements to be reduced to a single expression, i.e. it reduces incremental
verbosity.

So, as a person who has a background in Java but is a relative novice in
Scala, I wanted to understand whether a study document exists that documents
the choice.

Regards,


Re: Use Kafka Streams for windowing data and processing each window at once

2018-07-23 Thread Bruno Bottazzini
Hello Guozhang,

Managing to have a stream with just one record per key per window is
definitely something we want to achieve.

But that is not all. We also want to process the whole window at once, so
that our callback would receive just one collection of aggregates per
window once it is ready.

We would probably need to receive the collection as an iterable that
dynamically loads the window in chunks, as the size of the window could be
unmanageable as a single object.

This way we could produce one report for each window. In the example "Final
window result per key" you manage to send an alert for each user with fewer
than three events, but we also want to collect, in one report, the list of
all users with fewer than three events in the one-hour window.

Best Regards,

Bruno

On ven, 2018-07-20 at 09:11 -0700, Guozhang Wang wrote:
> Hello Bruno,
> 
> We've discussed about the callback approach before, but then we
> realized
> with the proposed API, this can still be achieved. In the "Final
> window
> results per key" section, John showed how to do that. Note the
> resulted
> stream will have exactly one record per each key, with the value
> representing the "final result" for that key.
> 
> 
> Guozhang
> 
> 
> On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini <
> bruno.bottazz...@targatelematics.com> wrote:
> 
> > 
> > Bill,
> > 
> > After reading the documentation and sure it looks really close to
> > our
> > need however I had a doubt about it.
> > 
> > One small question.
> > 
> > I was expecting also a callback that Kafka would call after the
> > whole
> > period is passed and this callback would receive an iterable object
> > that contains all the aggregated information that was collected in
> > the
> > same period.
> > 
> > Will it be possible when using grace() or suppress()?
> > 
> > Best Regards,
> > 
> > Bruno
> > 
> > On gio, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote:
> > > 
> > > Hi Bruno,
> > > 
> > > What you are asking is a common request.  There is a KIP in the
> > > works,
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> > > that should suit the requirements you've outlined.
> > > 
> > > In the meantime, I'll see if I can come up with an alternative
> > > approach
> > > over the next few days.
> > > 
> > > -Bill
> > > 
> > > On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini <
> > > bruno.bottazz...@targatelematics.com> wrote:
> > > 
> > > > 
> > > > 
> > > > Hello,
> > > > 
> > > > We have a doubt about Kafka streams on how it works. Or at
> > > > least we
> > > > are
> > > > having some troubles in making it to work.
> > > > 
> > > > The purpose we want to achieve is to group by user some
> > > > messages
> > > > that
> > > > we receive from a Kafka topic and window them in order to
> > > > aggregate
> > > > the
> > > > messages we receive in the window (5 minutes). Then, I'd like
> > > > to
> > > > collect all aggregates in each window in order to process them
> > > > at
> > > > once
> > > > adding them to a report of all the messages I received in the 5
> > > > minutes
> > > > interval.
> > > > 
> > > > The last point seems to be the tough part as Kafka Streams
> > > > doesn't
> > > > seem
> > > > to provide (at least we can't find it :() anything that can
> > > > collect
> > > > all
> > > > the window related stuff in a "finite" stream to be processed
> > > > in
> > > > one
> > > > place.
> > > > 
> > > > The file (implemented_code.txt) contains the code we have
> > > > implemented
> > > > where it contains at least one of our tries to make it to work.
> > > > 
> > > > You can find its result inside the file (result.txt)
> > > > 
> > > > For each window there are many log lines and they are mixed
> > > > with
> > > > the
> > > > other windows.
> > > > 
> > > > What I'd like to have is something like:
> > > > 
> > > > // Hypothetical implementation
> > > > windowedMessages.streamWindows((interval, window) -> process(interval, window));
> > > > 
> > > > where method process would be something like:
> > > > 
> > > > // Hypothetical implementation
> > > > void process(Interval interval, WindowStream<Key, List<Value>> windowStream) {
> > > >     // Create report for the whole window
> > > >     Report report = new Report(nameFromInterval());
> > > >     // Loop on the finite iterable that represents the window content
> > > >     for (WindowStreamEntry<Key, List<Value>> entry : windowStream) {
> > > >         report.addLine(entry.getKey(), entry.getValue());
> > > >     }
> > > >     report.close();
> > > > }
> > > > 
> > > > 
> > 
> 
> 
>