rolling upgrade from 0.9 to 0.11

2018-03-05 Thread Sunil Parmar
Any documents / suggestions / experience that the community can share about a
rolling upgrade from 0.9 to 0.11, and any tips/tricks you might have learned?
Our producers are C++ (using librdkafka). I have tested that upgrading the client
library to 0.9.5 works against both clusters in a test environment, but I am
concerned about the performance impact in production when done at scale.
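
For reference, my current understanding of the broker-side rolling bounce (taken
from the upgrade notes -- please correct me if I have it wrong) is to pin the old
protocol and message format, upgrade brokers one at a time, and only bump the
versions afterwards:

    # server.properties while bouncing brokers one at a time (0.9 -> 0.11 path)
    inter.broker.protocol.version=0.9.0
    log.message.format.version=0.9.0

    # after all brokers run the 0.11 code: bump the protocol and do another rolling bounce
    inter.broker.protocol.version=0.11.0

    # once clients (librdkafka) are upgraded too: bump the message format and bounce again
    log.message.format.version=0.11.0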

Thoughts?

Sunil Parmar


Re: when use kafka streams to(topic) method sometime throw error?

2018-03-05 Thread 杰 杨
It seems I don't configure any ProducerConfig settings in the Streams application.
Should I configure them?
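
For example, would passing producer settings through StreamsConfig like this be the
right approach? (Just a rough sketch of what I have in mind -- the application id,
brokers and values are placeholders.)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");     // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
    // settings with the "producer." prefix are forwarded to the internal producer
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);

    StreamsBuilder builder = new StreamsBuilder();
    // ... build the topology as above ...
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();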




funk...@live.com

From: funk...@live.com
Date: 2018-03-06 11:23
To: users
Subject: when use kafka streams to(topic) method sometime throw error?

hi:
I ran into a problem today.
When I use Kafka Streams to consume one topic, apply the mapValues() method,
and then write to another topic, it sometimes throws an error.
This is the code sample:
new StreamsBuilder().stream(xxxtopic, Consumed.with(Serdes.String(),
Serdes.String())).mapValues(method).to(newTopic)
Sometimes it works well, but sometimes it throws this error:

to topic newTopic due to org.apache.kafka.common.errors.TimeoutException: 
Expiring 6 record(s) for newTopic-2: 30030 ms has passed since last attempt 
plus backoff time


funk...@live.com


when use kafka streams to(topic) method sometime throw error?

2018-03-05 Thread 杰 杨

hi:
I ran into a problem today.
When I use Kafka Streams to consume one topic, apply the mapValues() method,
and then write to another topic, it sometimes throws an error.
This is the code sample:
new StreamsBuilder().stream(xxxtopic, Consumed.with(Serdes.String(),
Serdes.String())).mapValues(method).to(newTopic)
Sometimes it works well, but sometimes it throws this error:

to topic newTopic due to org.apache.kafka.common.errors.TimeoutException: 
Expiring 6 record(s) for newTopic-2: 30030 ms has passed since last attempt 
plus backoff time


funk...@live.com


Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Thakrar, Jayesh
Sorry Matt, I don’t have much experience with Kafka streaming (or any streaming,
for that matter).
As for saving counts from your application servers to Aerospike directly, that is
certainly simpler, requiring less hardware, fewer resources and less development effort.

One reason some people use Kafka as part of their pipeline is to decouple
systems and protect either end from issues in the other.
It usually makes maintenance on either end simpler. Furthermore, it acts as a
dampening buffer and, because of Kafka's low latency and high throughput (well,
that's a relative term), allows the producers and consumers to run at their full
potential (kind of, but not exactly, async push and pull of data).

It might even be worthwhile to start off without Kafka and, once you understand
things better, introduce Kafka later on.

From: Matt Daum 
Date: Monday, March 5, 2018 at 4:33 PM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

And not to overthink this, but as I'm new to Kafka and Streams I want to make sure
it makes the most sense for my use case.  With the streams and grouping, it looks
like I'd get one internal topic created per grouped stream, which would then be
written and reread, totaled in the count, and then produce a final stream which is
consumed/sinked to an external DB.  Is that correct?

Overall I'm not using the streaming counts as they grow throughout the day; I just
want a final end-of-day count.  We already have an Aerospike cluster set up for the
applications themselves.  If each application server itself made the writes to the
Aerospike DB cluster to simply increase the counts for each attribute, and we read
them out there at the end of the day, it appears fewer computing resources would be
used, as we'd effectively be doing one inbound request -> one DB write per counted
attribute.

I am not saying that is the better route, as I don't fully know or understand the
full capabilities of Kafka.  Since we aren't streaming the data, enriching it, etc.,
would the direct-to-DB counts be a better approach?  I just want to make sure I use
the best tool for the job.  Let me know what other factors I may be
underestimating/misunderstanding about the Kafka approach, please.  I want to be as
informed as possible before going down either path too far.

Thank you again for your time,
Matt

On Mon, Mar 5, 2018 at 3:14 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
Yep, exactly.

So there is some buffering that you need to do in your client and also deal 
with edge cases.
E.g. how long should you hold on to a batch before you send a smaller batch to 
producer since you want a balance between batch optimization and expedience.

You may need to do some experiments to balance between system throughput, 
record size, batch size and potential batching delay for a given rate of 
incoming requests.


From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 1:59 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an AVRO wrapper around your single class 
right?  IE an array of records, correct?  Then when you hit a size you are 
happy you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - 
especially the fact that you will be bundling the Avro schema for each request 
in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.

From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example 
would be assume on each requests there is a parameters "X" which at the end of 
each day I want to know the counts per unique value, it could have 100's of 
millions of possible values.

I'll start to hopefully work this week on an initial test of everything and 
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be diff

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Matt Daum
And not to overthink this, but as I'm new to Kafka and Streams I want to make
sure it makes the most sense for my use case.  With the streams and grouping, it
looks like I'd get one internal topic created per grouped stream, which would then
be written and reread, totaled in the count, and then produce a final stream which
is consumed/sinked to an external DB.  Is that correct?
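
In code, my mental model of that topology is something like the sketch below (the
topic names, serdes and the attribute-extraction helper are placeholders I made up,
so please correct me if the overall shape is wrong):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    public class DailyCountTopology {

        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> requests = builder.stream("requests",
                    Consumed.with(Serdes.String(), Serdes.String()));

            // group by the attribute value, then count per 1-day tumbling window
            KTable<Windowed<String>, Long> dailyCounts = requests
                    .groupBy((key, value) -> extractAttribute(value),
                             Serialized.with(Serdes.String(), Serdes.String()))
                    .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))
                    .count();

            // emit "attribute@windowStart -> count" to an output topic for the DB sink
            KStream<String, Long> output = dailyCounts.toStream()
                    .map((windowedKey, count) -> KeyValue.pair(
                            windowedKey.key() + "@" + windowedKey.window().start(), count));
            output.to("daily-counts", Produced.with(Serdes.String(), Serdes.Long()));
            return builder;
        }

        // hypothetical helper: pull the grouped attribute (e.g. "X") out of the request payload
        private static String extractAttribute(String requestPayload) {
            return requestPayload; // placeholder
        }
    }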

Overall I'm not using the streaming counts as they grow throughout the day; I just
want a final end-of-day count.  We already have an Aerospike cluster set up for the
applications themselves.  If each application server itself made the writes to the
Aerospike DB cluster to simply increase the counts for each attribute, and we read
them out there at the end of the day, it appears fewer computing resources would be
used, as we'd effectively be doing one inbound request -> one DB write per counted
attribute.

I am not saying that is the better route, as I don't fully know or understand the
full capabilities of Kafka.  Since we aren't streaming the data, enriching it,
etc., would the direct-to-DB counts be a better approach?  I just want to make sure
I use the best tool for the job.  Let me know what other factors I may be
underestimating/misunderstanding about the Kafka approach, please.  I want to be as
informed as possible before going down either path too far.

Thank you again for your time,
Matt

On Mon, Mar 5, 2018 at 3:14 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Yep, exactly.
>
>
>
> So there is some buffering that you need to do in your client and also
> deal with edge cases.
>
> E.g. how long should you hold on to a batch before you send a smaller
> batch to producer since you want a balance between batch optimization and
> expedience.
>
>
>
> You may need to do some experiments to balance between system throughput,
> record size, batch size and potential batching delay for a given rate of
> incoming requests.
>
>
>
>
>
> *From: *Matt Daum 
> *Date: *Monday, March 5, 2018 at 1:59 PM
>
> *To: *"Thakrar, Jayesh" 
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Ah good call, so you are really having an AVRO wrapper around your single
> class right?  IE an array of records, correct?  Then when you hit a size
> you are happy you send it to the producer?
>
>
>
> On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> Good luck on your test!
>
>
>
> As for the batching within Avro and by Kafka Producer, here are my
> thoughts without any empirical proof.
>
> There is a certain amount of overhead in terms of execution AND bytes in
> converting a request record into Avro and producing (generating) a Kafka
> message out of it.
>
> For requests of size 100-200 bytes, that can be a substantial amount -
> especially the fact that you will be bundling the Avro schema for each
> request in its Kafka message.
>
>
>
> By batching the requests, you are significantly amortizing that overhead
> across many rows.
>
>
>
> *From: *Matt Daum 
> *Date: *Monday, March 5, 2018 at 5:54 AM
>
>
> *To: *"Thakrar, Jayesh" 
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks for the suggestions!  It does look like it's using local RocksDB
> stores for the state info by default.  Will look into using an external
> one.
>
>
>
> As for the "millions of different values per grouped attribute" an example
> would be assume on each requests there is a parameters "X" which at the end
> of each day I want to know the counts per unique value, it could have 100's
> of millions of possible values.
>
>
>
> I'll start to hopefully work this week on an initial test of everything
> and will report back.  A few last questions if you have the time:
>
> - For the batching of the AVRO files, would this be different than the
> Producer batching?
>
> - Any other things you'd suggest looking out for as gotcha's or
> configurations that probably will be good to tweak further?
>
>
>
> Thanks!
>
> Matt
>
>
>
> On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> BTW - I did not mean to rule-out Aerospike as a possible datastore.
>
> Its just that I am not familiar with it, but surely looks like a good
> candidate to store the raw and/or aggregated data, given that it also has a
> Kafka Connect module.
>
>
>
> *From: *"Thakrar, Jayesh" 
> *Date: *Sunday, March 4, 2018 at 9:25 PM
> *To: *Matt Daum 
>
>
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> I don’t have any experience/knowledge on the Kafka inbuilt datastore, but
> believe thatfor some
>
> portions of streaming Kafka uses (used?) RocksDB to locally store some
> state info in the brokers.
>
>
>
> Personally  I would use an external datastore.
>
> There's a wide choice out there - regular key-value stores like Cassandra,
> ScyllaDB, Rocks

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
Sounds great! :)

On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov  wrote:

> Thanks, that's an option, i'll take a look at configuration.
>
> But yeah, i was thinking same, if streams relies on the fact that internal
> topics should use 'CreateTime' configuration, then it is streams library
> responsibility to configure it.
>
> I can open a Jira ticket :)
>
> On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang  wrote:
>
> > Hello Dmitriy,
> >
> > In your case, you can override this config to CreateTime only for the
> > internal topics created by Streams, this is documented in
> >
> > https://kafka.apache.org/10/javadoc/org/apache/kafka/
> > streams/StreamsConfig.html#TOPIC_PREFIX
> >
> >
> > We are also discussing to always override the log.message.timestamp.type
> > config for internal topics to CreateTime, I vaguely remember there is a
> > JIRA open for it in case you are interested in contributing to Streams
> > library.
> >
> > Guozhang
> >
> >
> > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com
> > > wrote:
> >
> > > Which effectively means given scenario is not working with
> LogAppendTime,
> > > correct? Because all internal re-partition topics will always contain
> > "now"
> > > instead of real timestamp from original payload message?
> > >
> > > Is kafka-streams designed to work with LogAppendTime at all? It seems a
> > lot
> > > of stuff will NOT work correctly using
> > > built-in ExtractRecordMetadataTimestamp ?
> > >
> > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang 
> > wrote:
> > >
> > > > If broker configures log.message.timestamp.type=LogAppendTime
> > > universally,
> > > > it will ignore whatever timestamp set in the message metadata and
> > > override
> > > > it with the append time. So when the messages are fetched by
> downstream
> > > > processors which always use the metadata timestamp extractor, it will
> > get
> > > > the append timestamp set by brokers.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > interesting, will same logic applies (internal topic rewrite) for
> > > brokers
> > > > > configured with:
> > > > >   log.message.timestamp.type=LogAppendTime
> > > > >
> > > > > ?
> > > > >
> > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > Hello Dmitriy,
> > > > > >
> > > > > > What you have observed is by design, and it maybe a bit confusing
> > at
> > > > > first
> > > > > > place. Let me explain:
> > > > > >
> > > > > > When you do a group-by aggregation like the above case, during
> the
> > > > > > "groupBy((key,
> > > > > > value) -> ..)" stage Streams library will do a
> re-partitioning
> > by
> > > > > > sending the original data stream into an internal repartition
> topic
> > > > based
> > > > > > on the aggregation key defined in the "groupBy" function and
> fetch
> > > from
> > > > > > that topic again. This is similar to a shuffle phase in
> distributed
> > > > > > computing frameworks to make sure the down stream aggregations
> can
> > be
> > > > > done
> > > > > > in parallel. When the "groupBy" operator sends the messages to
> this
> > > > > > repartition topic, it will set in the record metadata the
> extracted
> > > > > > timestamp from the payload, and hence for the downstream
> > aggregation
> > > > > > operator to read from this repartition topic, it is OK to always
> > use
> > > > > > the ExtractRecordMetadataTimestamp
> > > > > > to extract that timestamp and use the extracted value to
> determine
> > > > which
> > > > > > window this record should fall into.
> > > > > >
> > > > > > More details can be found in this JIRA:
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > >
> > > > > >
> > > > > > So the record timestamp used during aggregation should be the
> same
> > as
> > > > the
> > > > > > one in the payload, if you do observe that is not the case, this
> is
> > > > > > unexpected. In that case could you share your complete code
> > snippet,
> > > > > > especially how input stream "in" is defined, and your config
> > > properties
> > > > > > defined for us to investigate?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > > > dvsekhval...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Good morning,
> > > > > > >
> > > > > > > we have simple use-case where we want to count number of events
> > by
> > > > each
> > > > > > > hour grouped by some fields from event itself.
> > > > > > >
> > > > > > > Our event timestamp is embedded into messages itself (json) and
> > we
> > > > > using
> > > > > > > trivial custom timestamp extractor (which called and works as
> > > > > expected).
> > > > > > >
> > > > > > > What we facing is that there is always timestamp used that
> coming
> > > > > > > from ExtractRecordMe

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Thanks, that's an option, I'll take a look at the configuration.

But yeah, I was thinking the same: if Streams relies on the fact that internal
topics should use the 'CreateTime' configuration, then it is the Streams library's
responsibility to configure it.

I can open a Jira ticket :)

On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang  wrote:

> Hello Dmitriy,
>
> In your case, you can override this config to CreateTime only for the
> internal topics created by Streams, this is documented in
>
> https://kafka.apache.org/10/javadoc/org/apache/kafka/
> streams/StreamsConfig.html#TOPIC_PREFIX
>
>
> We are also discussing to always override the log.message.timestamp.type
> config for internal topics to CreateTime, I vaguely remember there is a
> JIRA open for it in case you are interested in contributing to Streams
> library.
>
> Guozhang
>
>
> On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com
> > wrote:
>
> > Which effectively means given scenario is not working with LogAppendTime,
> > correct? Because all internal re-partition topics will always contain
> "now"
> > instead of real timestamp from original payload message?
> >
> > Is kafka-streams designed to work with LogAppendTime at all? It seems a
> lot
> > of stuff will NOT work correctly using
> > built-in ExtractRecordMetadataTimestamp ?
> >
> > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang 
> wrote:
> >
> > > If broker configures log.message.timestamp.type=LogAppendTime
> > universally,
> > > it will ignore whatever timestamp set in the message metadata and
> > override
> > > it with the append time. So when the messages are fetched by downstream
> > > processors which always use the metadata timestamp extractor, it will
> get
> > > the append timestamp set by brokers.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > dvsekhval...@gmail.com>
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > interesting, will same logic applies (internal topic rewrite) for
> > brokers
> > > > configured with:
> > > >   log.message.timestamp.type=LogAppendTime
> > > >
> > > > ?
> > > >
> > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello Dmitriy,
> > > > >
> > > > > What you have observed is by design, and it maybe a bit confusing
> at
> > > > first
> > > > > place. Let me explain:
> > > > >
> > > > > When you do a group-by aggregation like the above case, during the
> > > > > "groupBy((key,
> > > > > value) -> ..)" stage Streams library will do a re-partitioning
> by
> > > > > sending the original data stream into an internal repartition topic
> > > based
> > > > > on the aggregation key defined in the "groupBy" function and fetch
> > from
> > > > > that topic again. This is similar to a shuffle phase in distributed
> > > > > computing frameworks to make sure the down stream aggregations can
> be
> > > > done
> > > > > in parallel. When the "groupBy" operator sends the messages to this
> > > > > repartition topic, it will set in the record metadata the extracted
> > > > > timestamp from the payload, and hence for the downstream
> aggregation
> > > > > operator to read from this repartition topic, it is OK to always
> use
> > > > > the ExtractRecordMetadataTimestamp
> > > > > to extract that timestamp and use the extracted value to determine
> > > which
> > > > > window this record should fall into.
> > > > >
> > > > > More details can be found in this JIRA:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > >
> > > > >
> > > > > So the record timestamp used during aggregation should be the same
> as
> > > the
> > > > > one in the payload, if you do observe that is not the case, this is
> > > > > unexpected. In that case could you share your complete code
> snippet,
> > > > > especially how input stream "in" is defined, and your config
> > properties
> > > > > defined for us to investigate?
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > > dvsekhval...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Good morning,
> > > > > >
> > > > > > we have simple use-case where we want to count number of events
> by
> > > each
> > > > > > hour grouped by some fields from event itself.
> > > > > >
> > > > > > Our event timestamp is embedded into messages itself (json) and
> we
> > > > using
> > > > > > trivial custom timestamp extractor (which called and works as
> > > > expected).
> > > > > >
> > > > > > What we facing is that there is always timestamp used that coming
> > > > > > from ExtractRecordMetadataTimestamp when determining matching
> > windows
> > > > for
> > > > > > event, inside KStreamWindowAggregate.process() and never value
> > from
> > > > our
> > > > > > json timestamp extractor.
> > > > > >
> > > > > > Effectively it doesn't work correctly if we test on late data,
> e.g.
> > > > > > timestamp in a message hour ago from now for instance. Topology
> > > always

ListOffsets parameters

2018-03-05 Thread Emmett Butler
Hi users,

I'm the maintainer of the PyKafka 
library and I'm working on improving its support for the ListOffsets API.
Some questions:

Kafka version: 1.0.0
I'm using this documentation
 for reference.

In some cases, ListOffsets requests return an empty array of offsets. Is
this expected behavior? If so, when?

What format is the Timestamp parameter expected in? I've been using
milliseconds since epoch (python: time.time() * 1000), but I haven't
managed to get a response that isn't either [0] or [] using this approach.
Could this have to do with the number of log segments on my topic, or the
presence of a time index? How do I make a request including a Timestamp
(not special values -1 or -2) that returns a valid offset? What is the
meaning of a [0] response in this context?
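
For comparison, what I have in mind is the equivalent lookup in the Java consumer,
offsetsForTimes(), which also takes epoch milliseconds -- roughly like the sketch
below (the topic, partition and surrounding setup are placeholders):

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    static void lookupOffsetOneHourAgo(KafkaConsumer<byte[], byte[]> consumer) {
        TopicPartition tp = new TopicPartition("mytopic", 0);
        long oneHourAgoMs = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgoMs));
        OffsetAndTimestamp oat = result.get(tp);  // null if no message at/after that timestamp
        if (oat != null) {
            System.out.printf("offset=%d timestamp=%d%n", oat.offset(), oat.timestamp());
        }
    }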

What is the MaxNumberOfOffsets parameter supposed to do? When I request max
10 offsets, I get back [12345, 0] (two offsets). Again, could this have to
do with the number of log segments on my topic?

Related PyKafka issue tickets for reference:
https://github.com/Parsely/pykafka/issues/728
https://github.com/Parsely/pykafka/issues/733

Thanks for your help.

-- 
Emmett Butler | Software Engineer



Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
Hello Dmitriy,

In your case, you can override this config to CreateTime only for the
internal topics created by Streams, this is documented in

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX
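
For example, a minimal sketch of that override in your Streams config (the
application id and bootstrap servers below are placeholders) would be:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
    // the "topic." prefix applies the setting to all internal (repartition/changelog) topics
    props.put(StreamsConfig.topicPrefix("message.timestamp.type"), "CreateTime");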


We are also discussing always overriding the log.message.timestamp.type config for
internal topics to CreateTime; I vaguely remember there is a JIRA open for it, in
case you are interested in contributing to the Streams library.

Guozhang


On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov  wrote:

> Which effectively means given scenario is not working with LogAppendTime,
> correct? Because all internal re-partition topics will always contain "now"
> instead of real timestamp from original payload message?
>
> Is kafka-streams designed to work with LogAppendTime at all? It seems a lot
> of stuff will NOT work correctly using
> built-in ExtractRecordMetadataTimestamp ?
>
> On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang  wrote:
>
> > If broker configures log.message.timestamp.type=LogAppendTime
> universally,
> > it will ignore whatever timestamp set in the message metadata and
> override
> > it with the append time. So when the messages are fetched by downstream
> > processors which always use the metadata timestamp extractor, it will get
> > the append timestamp set by brokers.
> >
> >
> > Guozhang
> >
> > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > interesting, will same logic applies (internal topic rewrite) for
> brokers
> > > configured with:
> > >   log.message.timestamp.type=LogAppendTime
> > >
> > > ?
> > >
> > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Dmitriy,
> > > >
> > > > What you have observed is by design, and it maybe a bit confusing at
> > > first
> > > > place. Let me explain:
> > > >
> > > > When you do a group-by aggregation like the above case, during the
> > > > "groupBy((key,
> > > > value) -> ..)" stage Streams library will do a re-partitioning by
> > > > sending the original data stream into an internal repartition topic
> > based
> > > > on the aggregation key defined in the "groupBy" function and fetch
> from
> > > > that topic again. This is similar to a shuffle phase in distributed
> > > > computing frameworks to make sure the down stream aggregations can be
> > > done
> > > > in parallel. When the "groupBy" operator sends the messages to this
> > > > repartition topic, it will set in the record metadata the extracted
> > > > timestamp from the payload, and hence for the downstream aggregation
> > > > operator to read from this repartition topic, it is OK to always use
> > > > the ExtractRecordMetadataTimestamp
> > > > to extract that timestamp and use the extracted value to determine
> > which
> > > > window this record should fall into.
> > > >
> > > > More details can be found in this JIRA:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > >
> > > >
> > > > So the record timestamp used during aggregation should be the same as
> > the
> > > > one in the payload, if you do observe that is not the case, this is
> > > > unexpected. In that case could you share your complete code snippet,
> > > > especially how input stream "in" is defined, and your config
> properties
> > > > defined for us to investigate?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Good morning,
> > > > >
> > > > > we have simple use-case where we want to count number of events by
> > each
> > > > > hour grouped by some fields from event itself.
> > > > >
> > > > > Our event timestamp is embedded into messages itself (json) and we
> > > using
> > > > > trivial custom timestamp extractor (which called and works as
> > > expected).
> > > > >
> > > > > What we facing is that there is always timestamp used that coming
> > > > > from ExtractRecordMetadataTimestamp when determining matching
> windows
> > > for
> > > > > event, inside KStreamWindowAggregate.process() and never value
> from
> > > our
> > > > > json timestamp extractor.
> > > > >
> > > > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > > > timestamp in a message hour ago from now for instance. Topology
> > always
> > > > > calculating matching hour bucket (window) using record timestamp,
> not
> > > > > payload.
> > > > >
> > > > > Is it expected behaviour ? Are we getting windowing wrong? Any
> > settings
> > > > or
> > > > > other tricks to accommodate our use-case?
> > > > >
> > > > > For reference our setup: brokers, kafka-stream and kafka-clients
> all
> > of
> > > > > v1.0.0
> > > > > And here is code:
> > > > >
> > > > > KTable<Windowed<String>, Long> summaries = in
> > > > >.groupBy((key, value) -> ..)
> > > > >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > > >.count();
> > > > >
> > > > > Thank you.
> > 

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Thakrar, Jayesh
Yep, exactly.

So there is some buffering that you need to do in your client, and you also need to
deal with edge cases.
E.g. how long should you hold on to a batch before you send a smaller batch to the
producer, since you want a balance between batch optimization and expedience.

You may need to do some experiments to balance between system throughput, 
record size, batch size and potential batching delay for a given rate of 
incoming requests.
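
The producer-side knobs involved are roughly these (the values below are purely
illustrative starting points, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);        // bytes per in-flight partition batch
    props.put(ProducerConfig.LINGER_MS_CONFIG, 50);             // how long to wait to fill a batch
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728L); // total memory for unsent batches
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // compress whole batches
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);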


From: Matt Daum 
Date: Monday, March 5, 2018 at 1:59 PM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an AVRO wrapper around your single class 
right?  IE an array of records, correct?  Then when you hit a size you are 
happy you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - 
especially the fact that you will be bundling the Avro schema for each request 
in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.

From: Matt Daum <m...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example 
would be assume on each requests there is a parameters "X" which at the end of 
each day I want to know the counts per unique value, it could have 100's of 
millions of possible values.

I'll start to hopefully work this week on an initial test of everything and 
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the Producer 
batching?
- Any other things you'd suggest looking out for as gotcha's or configurations 
that probably will be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
BTW - I did not mean to rule-out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good candidate
to store the raw and/or aggregated data, given that it also has a Kafka Connect
module.

From: "Thakrar, Jayesh" 
mailto:jthak...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum mailto:m...@setfive.com>>

Cc: "users@kafka.apache.org" 
mailto:users@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don’t have any experience/knowledge of the Kafka inbuilt datastore, but I believe
that for some portions of streaming, Kafka uses (used?) RocksDB to locally store
some state info in the brokers.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, 
ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour
by using periodic (minute to hourly) extract jobs and store data in 
hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of course
with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by them.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum <m...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's 

Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Matt Daum
Ah, good call, so you really have an Avro wrapper around your single class, right?
I.e. an array of records, correct? Then when you hit a size you are happy with, you
send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Good luck on your test!
>
>
>
> As for the batching within Avro and by Kafka Producer, here are my
> thoughts without any empirical proof.
>
> There is a certain amount of overhead in terms of execution AND bytes in
> converting a request record into Avro and producing (generating) a Kafka
> message out of it.
>
> For requests of size 100-200 bytes, that can be a substantial amount -
> especially the fact that you will be bundling the Avro schema for each
> request in its Kafka message.
>
>
>
> By batching the requests, you are significantly amortizing that overhead
> across many rows.
>
>
>
> *From: *Matt Daum 
> *Date: *Monday, March 5, 2018 at 5:54 AM
>
> *To: *"Thakrar, Jayesh" 
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks for the suggestions!  It does look like it's using local RocksDB
> stores for the state info by default.  Will look into using an external
> one.
>
>
>
> As for the "millions of different values per grouped attribute" an example
> would be assume on each requests there is a parameters "X" which at the end
> of each day I want to know the counts per unique value, it could have 100's
> of millions of possible values.
>
>
>
> I'll start to hopefully work this week on an initial test of everything
> and will report back.  A few last questions if you have the time:
>
> - For the batching of the AVRO files, would this be different than the
> Producer batching?
>
> - Any other things you'd suggest looking out for as gotcha's or
> configurations that probably will be good to tweak further?
>
>
>
> Thanks!
>
> Matt
>
>
>
> On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> BTW - I did not mean to rule-out Aerospike as a possible datastore.
>
> Its just that I am not familiar with it, but surely looks like a good
> candidate to store the raw and/or aggregated data, given that it also has a
> Kafka Connect module.
>
>
>
> *From: *"Thakrar, Jayesh" 
> *Date: *Sunday, March 4, 2018 at 9:25 PM
> *To: *Matt Daum 
>
>
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> I don’t have any experience/knowledge on the Kafka inbuilt datastore, but
> believe thatfor some
>
> portions of streaming Kafka uses (used?) RocksDB to locally store some
> state info in the brokers.
>
>
>
> Personally  I would use an external datastore.
>
> There's a wide choice out there - regular key-value stores like Cassandra,
> ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular
> RDBMSes.
>
> If you have hadoop in the picture, its even possible to bypass a datastore
> completely (if appropriate) and store the raw data on HDFS organized by
> (say) date+hour
>
> by using periodic (minute to hourly) extract jobs and store data in
> hive-compatible directory structure using ORC or Parquet.
>
>
>
> The reason for shying away from NoSQL datastores is their tendency to do
> compaction on data which leads to unnecessary reads and writes (referred to
> as write-amplification).
>
> With periodic jobs in Hadoop, you (usually) write your data once only.
> Ofcourse with that approach you loose the "random/keyed access" to the
> data,
>
> but if you are only interested in the aggregations across various
> dimensions, those can be stored in a SQL/NoSQL datastore.
>
>
>
> As for "having millions of different values per grouped attribute" - not
> sure what you mean by them.
>
> Is it that each record has some fields that represent different kinds of
> attributes and that their domain can have millions to hundreds of millions
> of values?
>
> I don't think that should matter.
>
>
>
> *From: *Matt Daum 
> *Date: *Sunday, March 4, 2018 at 2:39 PM
> *To: *"Thakrar, Jayesh" 
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks! For the counts I'd need to use a global table to make sure it's
> across all the data right?   Also having millions of different values per
> grouped attribute will scale ok?
>
>
>
> On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" 
> wrote:
>
> Yes, that's the general design pattern. Another thing to look into is to
> compress the data. Now Kafka consumer/producer can already do it for you,
> but we choose to compress in the applications due to a historic issue that
> drgraded performance,  although it has been resolved now.
>
> Also,  just keep in mind that while you do your batching, kafka producer
> also tries to batch msgs to Kafka, and you will need to ensure you have
> enough buffer memory. However that's all configurable.
>
> Finally ensure you have the latest java updates and have kafka 0.10.2 

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Which effectively means the given scenario does not work with LogAppendTime,
correct? Because all internal re-partition topics will always contain "now"
instead of the real timestamp from the original payload message?

Is kafka-streams designed to work with LogAppendTime at all? It seems a lot
of stuff will NOT work correctly using the built-in
ExtractRecordMetadataTimestamp?

On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang  wrote:

> If broker configures log.message.timestamp.type=LogAppendTime universally,
> it will ignore whatever timestamp set in the message metadata and override
> it with the append time. So when the messages are fetched by downstream
> processors which always use the metadata timestamp extractor, it will get
> the append timestamp set by brokers.
>
>
> Guozhang
>
> On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > interesting, will same logic applies (internal topic rewrite) for brokers
> > configured with:
> >   log.message.timestamp.type=LogAppendTime
> >
> > ?
> >
> > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Dmitriy,
> > >
> > > What you have observed is by design, and it maybe a bit confusing at
> > first
> > > place. Let me explain:
> > >
> > > When you do a group-by aggregation like the above case, during the
> > > "groupBy((key,
> > > value) -> ..)" stage Streams library will do a re-partitioning by
> > > sending the original data stream into an internal repartition topic
> based
> > > on the aggregation key defined in the "groupBy" function and fetch from
> > > that topic again. This is similar to a shuffle phase in distributed
> > > computing frameworks to make sure the down stream aggregations can be
> > done
> > > in parallel. When the "groupBy" operator sends the messages to this
> > > repartition topic, it will set in the record metadata the extracted
> > > timestamp from the payload, and hence for the downstream aggregation
> > > operator to read from this repartition topic, it is OK to always use
> > > the ExtractRecordMetadataTimestamp
> > > to extract that timestamp and use the extracted value to determine
> which
> > > window this record should fall into.
> > >
> > > More details can be found in this JIRA:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-4785
> > >
> > >
> > > So the record timestamp used during aggregation should be the same as
> the
> > > one in the payload, if you do observe that is not the case, this is
> > > unexpected. In that case could you share your complete code snippet,
> > > especially how input stream "in" is defined, and your config properties
> > > defined for us to investigate?
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > dvsekhval...@gmail.com>
> > > wrote:
> > >
> > > > Good morning,
> > > >
> > > > we have simple use-case where we want to count number of events by
> each
> > > > hour grouped by some fields from event itself.
> > > >
> > > > Our event timestamp is embedded into messages itself (json) and we
> > using
> > > > trivial custom timestamp extractor (which called and works as
> > expected).
> > > >
> > > > What we facing is that there is always timestamp used that coming
> > > > from ExtractRecordMetadataTimestamp when determining matching windows
> > for
> > > > event, inside KStreamWindowAggregate.process() and never value from
> > our
> > > > json timestamp extractor.
> > > >
> > > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > > timestamp in a message hour ago from now for instance. Topology
> always
> > > > calculating matching hour bucket (window) using record timestamp, not
> > > > payload.
> > > >
> > > > Is it expected behaviour ? Are we getting windowing wrong? Any
> settings
> > > or
> > > > other tricks to accommodate our use-case?
> > > >
> > > > For reference our setup: brokers, kafka-stream and kafka-clients all
> of
> > > > v1.0.0
> > > > And here is code:
> > > >
> > > > KTable<Windowed<String>, Long> summaries = in
> > > >.groupBy((key, value) -> ..)
> > > >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > >.count();
> > > >
> > > > Thank you.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
If broker configures log.message.timestamp.type=LogAppendTime universally,
it will ignore whatever timestamp set in the message metadata and override
it with the append time. So when the messages are fetched by downstream
processors which always use the metadata timestamp extractor, it will get
the append timestamp set by brokers.


Guozhang

On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov 
wrote:

> Hi Guozhang,
>
> interesting, will same logic applies (internal topic rewrite) for brokers
> configured with:
>   log.message.timestamp.type=LogAppendTime
>
> ?
>
> On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang  wrote:
>
> > Hello Dmitriy,
> >
> > What you have observed is by design, and it maybe a bit confusing at
> first
> > place. Let me explain:
> >
> > When you do a group-by aggregation like the above case, during the
> > "groupBy((key,
> > value) -> ..)" stage Streams library will do a re-partitioning by
> > sending the original data stream into an internal repartition topic based
> > on the aggregation key defined in the "groupBy" function and fetch from
> > that topic again. This is similar to a shuffle phase in distributed
> > computing frameworks to make sure the down stream aggregations can be
> done
> > in parallel. When the "groupBy" operator sends the messages to this
> > repartition topic, it will set in the record metadata the extracted
> > timestamp from the payload, and hence for the downstream aggregation
> > operator to read from this repartition topic, it is OK to always use
> > the ExtractRecordMetadataTimestamp
> > to extract that timestamp and use the extracted value to determine which
> > window this record should fall into.
> >
> > More details can be found in this JIRA:
> >
> > https://issues.apache.org/jira/browse/KAFKA-4785
> >
> >
> > So the record timestamp used during aggregation should be the same as the
> > one in the payload, if you do observe that is not the case, this is
> > unexpected. In that case could you share your complete code snippet,
> > especially how input stream "in" is defined, and your config properties
> > defined for us to investigate?
> >
> > Guozhang
> >
> >
> > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com>
> > wrote:
> >
> > > Good morning,
> > >
> > > we have simple use-case where we want to count number of events by each
> > > hour grouped by some fields from event itself.
> > >
> > > Our event timestamp is embedded into messages itself (json) and we
> using
> > > trivial custom timestamp extractor (which called and works as
> expected).
> > >
> > > What we facing is that there is always timestamp used that coming
> > > from ExtractRecordMetadataTimestamp when determining matching windows
> for
> > > event, inside KStreamWindowAggregate.process() and never value from
> our
> > > json timestamp extractor.
> > >
> > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > timestamp in a message hour ago from now for instance. Topology always
> > > calculating matching hour bucket (window) using record timestamp, not
> > > payload.
> > >
> > > Is it expected behaviour ? Are we getting windowing wrong? Any settings
> > or
> > > other tricks to accommodate our use-case?
> > >
> > > For reference our setup: brokers, kafka-stream and kafka-clients all of
> > > v1.0.0
> > > And here is code:
> > >
> > > KTable<Windowed<String>, Long> summaries = in
> > >.groupBy((key, value) -> ..)
> > >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > >.count();
> > >
> > > Thank you.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Hi Guozhang,

interesting, will same logic applies (internal topic rewrite) for brokers
configured with:
  log.message.timestamp.type=LogAppendTime

?

On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang  wrote:

> Hello Dmitriy,
>
> What you have observed is by design, and it maybe a bit confusing at first
> place. Let me explain:
>
> When you do a group-by aggregation like the above case, during the
> "groupBy((key,
> value) -> ..)" stage Streams library will do a re-partitioning by
> sending the original data stream into an internal repartition topic based
> on the aggregation key defined in the "groupBy" function and fetch from
> that topic again. This is similar to a shuffle phase in distributed
> computing frameworks to make sure the down stream aggregations can be done
> in parallel. When the "groupBy" operator sends the messages to this
> repartition topic, it will set in the record metadata the extracted
> timestamp from the payload, and hence for the downstream aggregation
> operator to read from this repartition topic, it is OK to always use
> the ExtractRecordMetadataTimestamp
> to extract that timestamp and use the extracted value to determine which
> window this record should fall into.
>
> More details can be found in this JIRA:
>
> https://issues.apache.org/jira/browse/KAFKA-4785
>
>
> So the record timestamp used during aggregation should be the same as the
> one in the payload, if you do observe that is not the case, this is
> unexpected. In that case could you share your complete code snippet,
> especially how input stream "in" is defined, and your config properties
> defined for us to investigate?
>
> Guozhang
>
>
> On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Good morning,
> >
> > we have simple use-case where we want to count number of events by each
> > hour grouped by some fields from event itself.
> >
> > Our event timestamp is embedded into messages itself (json) and we using
> > trivial custom timestamp extractor (which called and works as expected).
> >
> > What we facing is that there is always timestamp used that coming
> > from ExtractRecordMetadataTimestamp when determining matching windows for
> > event, inside KStreamWindowAggregate.process() and never value from our
> > json timestamp extractor.
> >
> > Effectively it doesn't work correctly if we test on late data, e.g.
> > timestamp in a message hour ago from now for instance. Topology always
> > calculating matching hour bucket (window) using record timestamp, not
> > payload.
> >
> > Is it expected behaviour ? Are we getting windowing wrong? Any settings
> or
> > other tricks to accommodate our use-case?
> >
> > For reference our setup: brokers, kafka-stream and kafka-clients all of
> > v1.0.0
> > And here is code:
> >
> > KTable<Windowed<String>, Long> summaries = in
> >.groupBy((key, value) -> ..)
> >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> >.count();
> >
> > Thank you.
> >
>
>
>
> --
> -- Guozhang
>


Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
Hello Dmitriy,

What you have observed is by design, and it may be a bit confusing at first. Let me
explain:

When you do a group-by aggregation like the above case, during the
"groupBy((key,
value) -> ..)" stage Streams library will do a re-partitioning by
sending the original data stream into an internal repartition topic based
on the aggregation key defined in the "groupBy" function and fetch from
that topic again. This is similar to a shuffle phase in distributed
computing frameworks to make sure the down stream aggregations can be done
in parallel. When the "groupBy" operator sends the messages to this
repartition topic, it will set in the record metadata the extracted
timestamp from the payload, and hence for the downstream aggregation
operator to read from this repartition topic, it is OK to always use
the ExtractRecordMetadataTimestamp
to extract that timestamp and use the extracted value to determine which
window this record should fall into.

More details can be found in this JIRA:

https://issues.apache.org/jira/browse/KAFKA-4785


So the record timestamp used during aggregation should be the same as the
one in the payload, if you do observe that is not the case, this is
unexpected. In that case could you share your complete code snippet,
especially how input stream "in" is defined, and your config properties
defined for us to investigate?

Guozhang


On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov 
wrote:

> Good morning,
>
> we have simple use-case where we want to count number of events by each
> hour grouped by some fields from event itself.
>
> Our event timestamp is embedded into messages itself (json) and we using
> trivial custom timestamp extractor (which called and works as expected).
>
> What we facing is that there is always timestamp used that coming
> from ExtractRecordMetadataTimestamp when determining matching windows for
> event, inside KStreamWindowAggregate.process() and never value from our
> json timestamp extractor.
>
> Effectively it doesn't work correctly if we test on late data, e.g.
> timestamp in a message hour ago from now for instance. Topology always
> calculating matching hour bucket (window) using record timestamp, not
> payload.
>
> Is it expected behaviour ? Are we getting windowing wrong? Any settings or
> other tricks to accommodate our use-case?
>
> For reference our setup: brokers, kafka-stream and kafka-clients all of
> v1.0.0
> And here is code:
>
> KTable<Windowed<String>, Long> summaries = in
>.groupBy((key, value) -> ..)
>.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>.count();
>
> Thank you.
>



-- 
-- Guozhang


Offset auto-commit stops after timeout

2018-03-05 Thread ebuck
In our kafka consumer logs, we're seeing the following messages:

2018-03-05 03:57:03,350 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the 
coordinator kafka08:9092 (id: 2147483639 rack: null) dead for group mygroup
2018-03-05 03:57:03,350 WARN  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit 
of offsets {topic1-11=OffsetAndMetadata{offset=64888089, metadata=''}} failed 
for group mygroup: Offset commit failed with a retriable exception. You should 
retry committing offsets. The underlying error was: The request timed out.

After this message appears, the consumer no longer auto-commits offsets to the 
broker, even though the timeout does not repeat (and the consumer continues to 
consume messages from the topic partitions). 

Is this expected behavior that once the coordinator is marked dead from a 
single timeout, it no longer attempts to send auto-commit offsets? Shouldn't 
the timeout apply only to a single attempt at 
ConsumerCoordinator.sendOffsetCommitRequest(), and subsequent offset commits 
would still go through? 

We are using kafka client version 0.11.0.2 and kafka server version kafka-1.0.0.


Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Thakrar, Jayesh
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts 
without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in 
converting a request record into Avro and producing (generating) a Kafka 
message out of it.
For requests of 100-200 bytes, that can be a substantial amount - especially given
that you will be bundling the Avro schema for each request in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across 
many rows.
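
To illustrate the idea (the schema and field names below are made-up placeholders),
the batch is just an Avro container holding many request records, so the schema is
written once per batch instead of once per request:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class RequestBatchSerializer {

        // Hypothetical request schema -- the real one would carry the attributes being counted.
        private static final Schema REQUEST_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Request\",\"fields\":["
          + "{\"name\":\"attribute\",\"type\":\"string\"}]}");

        // Serialize a whole batch of requests into one byte[]; the container header
        // carries the schema once for the entire batch.
        public static byte[] serialize(List<String> attributes) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(REQUEST_SCHEMA))) {
                writer.create(REQUEST_SCHEMA, out);
                for (String attr : attributes) {
                    GenericRecord rec = new GenericData.Record(REQUEST_SCHEMA);
                    rec.put("attribute", attr);
                    writer.append(rec);
                }
            }
            return out.toByteArray();  // becomes the value of a single ProducerRecord
        }
    }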

From: Matt Daum 
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" 
Cc: "users@kafka.apache.org" 
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores 
for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example 
would be assume on each requests there is a parameters "X" which at the end of 
each day I want to know the counts per unique value, it could have 100's of 
millions of possible values.

I'll start to hopefully work this week on an initial test of everything and 
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the Producer 
batching?
- Any other things you'd suggest looking out for as gotcha's or configurations 
that probably will be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
BTW - I did not mean to rule-out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good candidate
to store the raw and/or aggregated data, given that it also has a Kafka Connect
module.

From: "Thakrar, Jayesh" 
mailto:jthak...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum mailto:m...@setfive.com>>

Cc: "users@kafka.apache.org" 
mailto:users@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don’t have any experience/knowledge of the Kafka inbuilt datastore, but I believe
that for some portions of streaming, Kafka uses (used?) RocksDB to locally store
some state info in the brokers.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, 
ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore
completely (if appropriate) and store the raw data on HDFS organized by (say) 
date+hour
by using periodic (minute to hourly) extract jobs and store data in 
hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do 
compaction on data which leads to unnecessary reads and writes (referred to as 
write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of course 
with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, 
those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure 
what you mean by them.
Is it that each record has some fields that represent different kinds of 
attributes and that their domain can have millions to hundreds of millions of 
values?
I don't think that should matter.

From: Matt Daum <m...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across 
all the data, right?  Also, will having millions of different values per grouped 
attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jthak...@conversantmedia.com>
wrote:
Yes, that's the general design pattern. Another thing to look into is to 
compress the data. Now Kafka consumer/producer can already do it for you, but 
we chose to compress in the applications due to a historic issue that degraded 
performance, although it has been resolved now.
Also, just keep in mind that while you do your batching, the Kafka producer also 
tries to batch messages, and you will need to ensure you have enough 
buffer memory. However, that's all configurable.
Finally ensure you have the latest java updates and have kafka 0.10.2 or higher.
Jayesh
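
For reference, a producer configuration sketch touching the settings mentioned above (the values are illustrative starting points, not tuned recommendations, and the broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerSetup {
    public static KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("compression.type", "lz4");             // compress on the producer side
        props.put("batch.size", "65536");                 // per-partition batch size, in bytes
        props.put("linger.ms", "10");                     // wait a little so batches can fill up
        props.put("buffer.memory", "67108864");           // total memory for not-yet-sent batches (64 MB here)
        return new KafkaProducer<>(props);
    }
}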


From: Matt Daum <m...@setfive.com>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org
Subject: Re: Kafka Setup for Daily counts on wide array of keys

kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Good morning,

we have a simple use case where we want to count the number of events per
hour, grouped by some fields from the event itself.

Our event timestamp is embedded in the message itself (JSON) and we are using
a trivial custom timestamp extractor (which is called and works as expected).

What we are facing is that the timestamp coming from
ExtractRecordMetadataTimestamp is always used when determining the matching
windows for an event, inside KStreamWindowAggregate.process(), and never the
value from our JSON timestamp extractor.

Effectively it does not work correctly when we test with late data, e.g. a
message whose timestamp is an hour before now. The topology always calculates
the matching hour bucket (window) using the record timestamp, not the payload.

Is this expected behaviour? Are we getting windowing wrong? Any settings or
other tricks to accommodate our use case?

For reference, our setup: brokers, kafka-streams and kafka-clients, all v1.0.0.
And here is code:

KTable<Windowed<...>, Long> summaries = in
   .groupBy((key, value) -> ..)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
   .count();

Thank you.
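
For reference, a minimal sketch of how we would expect the extractor to be wired in (the class below is illustrative; the actual JSON parsing is application specific and only hinted at):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class JsonEventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // parse the JSON payload and return the embedded event time (millis);
        // fall back to the previous timestamp if the payload carries none
        Long eventTime = parseTimestampFromJson(record.value());   // assumed application helper
        return eventTime != null ? eventTime : previousTimestamp;
    }

    private Long parseTimestampFromJson(Object value) {
        // application-specific parsing omitted
        return null;
    }
}

// and in the application configuration:
// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonEventTimeExtractor.class);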


Re: Broker cannot start switch to Java9 - weird file system issue ?

2018-03-05 Thread Enrico Olivelli
Workaround:
as these brokers are only for test environments, I have set very small
values for the index file size, which affects pre-allocation:
segment.index.bytes=65536
log.index.size.max.bytes=65536

If anyone has some thought it will be very appreciated
Cheers

Enrico


2018-03-05 13:21 GMT+01:00 Enrico Olivelli :

> The only fact I have found is that with Java8 Kafka is creating "SPARSE"
> files and with Java9 this is not true anymore
>
> Enrico
>
> 2018-03-05 12:44 GMT+01:00 Enrico Olivelli :
>
>> Hi,
>> This is a very strange case. I have a Kafka broker (part of a cluster of 3
>> brokers) which cannot start after upgrading Java from Oracle JDK8 to Oracle JDK
>> 9.0.4.
>>
>> There are a lot of .index and .timeindex files taking 10MB; they are for
>> empty partitions.
>>
>> Running with Java 9 the server seems to rebuild these files and each file
>> takes "really" 10MB.
>> The sum of all the files (calculated using du -sh) is 22GB and the broker
>> crashes during startup: the disk becomes full and no more logs are written. (I
>> can send an extract of the logs, but they only talk about 'rebuilding
>> index', the same as on Java 8.)
>>
>> Reverting the same broker to Java 8 and removing the index files, the
>> broker rebuilds those files, each file takes 10MB, but the total size
>> (calculated using du -sh) is 38 MB!
>>
>> I am running this broker on CentOS 7 on an EXT4 FS.
>>
>> I have upgraded the broker to the latest and greatest Kafka 1.0.0 (from
>> 0.10.2) without any success.
>>
>> All of the other testing clusters on CentOS 7 (same OS settings) did not
>> have any problem.
>>
>> The broker is given 4GB of RAM and 4GB of Max Direct Memory Size (on Java 8
>> it works fine with 1GB and the default max direct memory size)
>>
>> Which are the relevant configuration options ?
>>
>> Thoughts ?
>>
>> Thank you
>>
>> Enrico
>>
>>
>>
>>
>


Re: Broker cannot start switch to Java9 - weird file system issue ?

2018-03-05 Thread Enrico Olivelli
The only fact I have found is that with Java 8 Kafka is creating "SPARSE"
files, and with Java 9 this is no longer true.

Enrico
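
For anyone who wants to reproduce the sparse-file behaviour in isolation, here is a minimal sketch, assuming the index pre-allocation works by extending the file with RandomAccessFile.setLength() (which on ext4 normally yields a sparse file); whether Java 9 changes that on a given system is exactly the open question here:

import java.io.File;
import java.io.RandomAccessFile;

public class SparseIndexCheck {
    public static void main(String[] args) throws Exception {
        File f = new File("test.index");
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.setLength(10 * 1024 * 1024);   // pre-allocate 10 MB without writing any data
        }
        // The logical size is always reported as 10 MB ...
        System.out.println("logical size: " + f.length() + " bytes");
        // ... but the allocated size depends on whether the file is sparse:
        // compare `ls -l test.index` (10 MB) with `du -h test.index`
        // (a few KB if sparse, 10 MB if fully allocated).
    }
}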

2018-03-05 12:44 GMT+01:00 Enrico Olivelli :

> Hi,
> This is a very strange case. I have a Kafka broker (part of a cluster of 3
> brokers) which cannot start after upgrading Java from Oracle JDK8 to Oracle JDK
> 9.0.4.
>
> There are a lot of .index and .timeindex files taking 10MB; they are for
> empty partitions.
>
> Running with Java 9 the server seems to rebuild these files and each file
> takes "really" 10MB.
> The sum of all the files (calculated using du -sh) is 22GB and the broker
> crashes during startup: the disk becomes full and no more logs are written. (I
> can send an extract of the logs, but they only talk about 'rebuilding
> index', the same as on Java 8.)
>
> Reverting the same broker to Java 8 and removing the index files, the
> broker rebuilds those files, each file takes 10MB, but the total size
> (calculated using du -sh) is 38 MB!
>
> I am running this broker on CentOS 7 on an EXT4 FS.
>
> I have upgraded the broker to the latest and greatest Kafka 1.0.0 (from
> 0.10.2) without any success.
>
> All of the other testing clusters on CentOS 7 (same OS settings) did not
> have any problem.
>
> The broker is given 4GB of RAM and 4GB of Max Direct Memory Size (on Java 8
> it works fine with 1GB and the default max direct memory size)
>
> Which are the relevant configuration options ?
>
> Thoughts ?
>
> Thank you
>
> Enrico
>
>
>
>


Re: Kafka Setup for Daily counts on wide array of keys

2018-03-05 Thread Matt Daum
Thanks for the suggestions!  It does look like it's using local RocksDB
stores for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example
would be assume on each requests there is a parameters "X" which at the end
of each day I want to know the counts per unique value, it could have 100's
of millions of possible values.

I'll start to hopefully work this week on an initial test of everything and
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the
Producer batching?
- Any other things you'd suggest looking out for as gotcha's or
configurations that probably will be good to tweak further?

Thanks!
Matt
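
To make the use case concrete, here is a rough Kafka Streams sketch of counting, per day, each unique value of such a parameter "X" (topic names, serdes and the value-to-"X" extraction are placeholders, and this is only one possible shape, not a recommendation over the external-datastore approach mentioned above):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class DailyCountsTopology {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> requests = builder.stream("requests");   // placeholder input topic

        KTable<Windowed<String>, Long> dailyCounts = requests
            .groupBy((key, value) -> extractX(value),                    // re-key by the attribute value
                     Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))       // one tumbling window per day
            .count();

        // emit the counts to a topic that e.g. a connector can sink into the datastore
        dailyCounts.toStream()
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().start(), count))
            .to("daily-x-counts", Produced.with(Serdes.String(), Serdes.Long()));   // placeholder output topic

        return builder;
    }

    // placeholder for the application-specific parsing of "X" out of the request payload
    private static String extractX(String value) {
        return value;
    }
}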

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> BTW - I did not mean to rule-out Aerospike as a possible datastore.
>
> It's just that I am not familiar with it, but it surely looks like a good
> candidate to store the raw and/or aggregated data, given that it also has a
> Kafka Connect module.
>
>
>
> *From: *"Thakrar, Jayesh" 
> *Date: *Sunday, March 4, 2018 at 9:25 PM
> *To: *Matt Daum 
>
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> I don’t have any experience/knowledge on the Kafka inbuilt datastore, but
> believe that for some
>
> portions of streaming Kafka uses (used?) RocksDB to locally store some
> state info in the brokers.
>
>
>
> Personally  I would use an external datastore.
>
> There's a wide choice out there - regular key-value stores like Cassandra,
> ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular
> RDBMSes.
>
> If you have hadoop in the picture, its even possible to bypass a datastore
> completely (if appropriate) and store the raw data on HDFS organized by
> (say) date+hour
>
> by using periodic (minute to hourly) extract jobs and store data in
> hive-compatible directory structure using ORC or Parquet.
>
>
>
> The reason for shying away from NoSQL datastores is their tendency to do
> compaction on data which leads to unnecessary reads and writes (referred to
> as write-amplification).
>
> With periodic jobs in Hadoop, you (usually) write your data once only.
> Ofcourse with that approach you loose the "random/keyed access" to the
> data,
>
> but if you are only interested in the aggregations across various
> dimensions, those can be stored in a SQL/NoSQL datastore.
>
>
>
> As for "having millions of different values per grouped attribute" - not
> sure what you mean by them.
>
> Is it that each record has some fields that represent different kinds of
> attributes and that their domain can have millions to hundreds of millions
> of values?
>
> I don't think that should matter.
>
>
>
> *From: *Matt Daum 
> *Date: *Sunday, March 4, 2018 at 2:39 PM
> *To: *"Thakrar, Jayesh" 
> *Cc: *"users@kafka.apache.org" 
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks! For the counts I'd need to use a global table to make sure it's
> across all the data right?   Also having millions of different values per
> grouped attribute will scale ok?
>
>
>
> On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" 
> wrote:
>
> Yes, that's the general design pattern. Another thing to look into is to
> compress the data. Now Kafka consumer/producer can already do it for you,
> but we chose to compress in the applications due to a historic issue that
> degraded performance, although it has been resolved now.
>
> Also, just keep in mind that while you do your batching, the Kafka producer
> also tries to batch messages, and you will need to ensure you have
> enough buffer memory. However, that's all configurable.
>
> Finally ensure you have the latest java updates and have kafka 0.10.2 or
> higher.
>
> Jayesh
>
>
> --
>
> *From:* Matt Daum 
> *Sent:* Sunday, March 4, 2018 7:06:19 AM
> *To:* Thakrar, Jayesh
> *Cc:* users@kafka.apache.org
> *Subject:* Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> We actually don't have a kafka cluster setup yet at all.  Right now just
> have 8 of our application servers.  We currently sample some impressions
> and then dedupe/count outside at a different DC, but are looking to try to
> analyze all impressions for some overall analytics.
>
>
>
> Our requests are around 100-200 bytes each.  If we lost some of them due
> to network jitter etc. it would be fine; we're just trying to get an overall
> rough count of each attribute.  Creating batched messages definitely makes
> sense and will also cut down on the network IO.
>
>
>
> We're trying to determine the required setup for Kafka to do what we're
> looking to do; as these are physical servers, we'll most likely need to
> buy new hardware.  For the first run I think we'll try it out on one of our
> application clusters that gets a smaller amount of traffic (300-400k req/sec)
> and run the 

Broker cannot start switch to Java9 - weird file system issue ?

2018-03-05 Thread Enrico Olivelli
Hi,
This is a very strange case. I have a Kafka broker (part of a cluster of 3
brokers) which cannot start after upgrading Java from Oracle JDK8 to Oracle JDK
9.0.4.

There are a lot of .index and .timeindex files taking 10MB; they are for
empty partitions.

Running with Java 9 the server seems to rebuild these files and each file
takes "really" 10MB.
The sum of all the files (calculated using du -sh) is 22GB and the broker
crashes during startup: the disk becomes full and no more logs are written. (I
can send an extract of the logs, but they only talk about 'rebuilding
index', the same as on Java 8.)

Reverting the same broker to Java 8 and removing the index files, the
broker rebuilds those files, each file takes 10MB, but the total size
(calculated using du -sh) is 38 MB!

I am running this broker on CentOS 7 on an EXT4 FS.

I have upgraded the broker to the latest and greatest Kafka 1.0.0 (from 0.10.2)
without any success.

All of the other testing clusters on CentOS 7 (same OS settings) did not
have any problem.

The broker is given 4GB of RAM and 4GB of Max Direct Memory Size (on Java 8
it works fine with 1GB and the default max direct memory size)

Which are the relevant configuration options ?

Thoughts ?

Thank you

Enrico


RE: difference between 2 options

2018-03-05 Thread adrien ruffie
Perfect, Andras! Thanks a lot.

I noted all of your explanations 😊.


best regards,

Adrien


From: Andras Beni 
Sent: Saturday, 3 March 2018 09:29:16
To: users@kafka.apache.org
Subject: Re: difference between 2 options

Hello Adrien,

I was wrong. There is only one such file per data dir and not one per
topicpartition dir. It is a text file containing
 - a format version number (0),
 - number of following entries, and
 - one entry for each topicpartition: topic name, partition and offset.

Yes, when the broker starts, it checks these entries. As you probably know,
one topicpartition is written to multiple log segments. If the broker finds
that there are messages after the recovery point, each log segment that
contains such messages will be iterated over and the messages will be
checked and a new index will be built.
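
As a concrete illustration of the format described above (topic name and offsets are made up), such a checkpoint file for a two-partition topic would look roughly like:

0
2
mytopic 0 42
mytopic 1 17

i.e. the version, the number of entries, and then one "topic partition offset" line per partition.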

I hope this answers your questions.

Best regards,
Andras



On Thu, Mar 1, 2018 at 2:59 AM, adrien ruffie 
wrote:

> Sorry Andras for the delay of my response.
>
>
> OK, I correctly understood the deletion part, thanks to your explanation.
>
>
> However, for the recovery point I wanted to ask you about the concept's logic:
>
>
> For example I have one recovery-point-offset-checkpoint in topic-0
>
>
> If the broker crashes and restarts:
>
>
> the fact that a recovery-point-offset-checkpoint is present avoids
> recovering the whole log during startup.
>
> But what does that mean exactly? Is only one offset number present in
> this recovery file?
>
> If that is the case: will the broker simply load into memory all messages
> in this log from this offset?
>
>
> I really want to correctly understand the concept 😊
>
>
> Best regards,
>
>
> Adrien
>
> 
> From: Andras Beni 
> Sent: Tuesday, 27 February 2018 15:41:04
> To: users@kafka.apache.org
> Subject: Re: difference between 2 options
>
> 1) We write out one recovery point per log directory, which practically
> means topicpartition. So if your topic is called mytopic, then you will
> have a file called
>
> recovery-point-offset-checkpoint in topic-0/ , in topic-1/ , and in
> topic-2/ .
>
> 2) Data deletion in kafka is not related to what was read by consumers.
> Data is deleted when there is either too much of it (log.retention.bytes
> property) or it is too old (log.retention.ms property). And consumers keep
> track of what they have consumed using the __consumer_offsets topic (or
> some custom logic they choose).
> What we are talking about is DeleteRecordsRequest. It is sent by a command
> line tool called kafka.admin.DeleteRecordsCommand. This does not actually
> delete any data but notes that the data before a given offset should not be
> served anymore. This, just like recovery checkpointing, works on a
> per-partition basis.
>
> Does this answer your questions?
>
> Best regards,
> Andras
>
>
> On Mon, Feb 26, 2018 at 11:43 PM, adrien ruffie  >
> wrote:
>
> > Hi Andras,
> >
> >
> > thanks for your response!
> >
> > For log.flush.offset.checkpoint.interval.ms we write out only one
> > recovery point for all logs ?
> >
> > But if I have 3 partitions, and for each partition the offset is
> > different, what happens? Do we save
> >
> > 3 different offsets in the text file? Or just one for the three partitions?
> >
> >
> > When you say "to avoid exposing data that have been deleted by
> > DeleteRecordsRequest"
> >
> > Does it mean the old consumed data? For example, if I have offset 34700, is it
> > to avoid re-exposing
> >
> > records 34000~34699 to the consumer after a crash?
> >
> > 
> > From: Andras Beni 
> > Sent: Tuesday, 27 February 2018 06:16:41
> > To: users@kafka.apache.org
> > Subject: Re: difference between 2 options
> >
> > Hi Adrien,
> >
> > Every log.flush.offset.checkpoint.interval.ms  we write out the current
> > recovery point for all logs to a text file in the log directory to avoid
> > recovering the whole log on startup.
> >
> > and every log.flush.start.offset.checkpoint.interval.ms we write out the
> > current log start offset for all logs to a text file in the log directory
> > to avoid exposing data that have been deleted by DeleteRecordsRequest
> >
> > HTH,
> > Andras
> >
> >
> > On Mon, Feb 26, 2018 at 1:51 PM, adrien ruffie <
> adriennolar...@hotmail.fr>
> > wrote:
> >
> > > Hello all,
> > >
> > >
> > > I have read the linked properties documentation, but I don't really
> > understand
> > > the difference between:
> > >
> > > log.flush.offset.checkpoint.interval.ms
> > >
> > >
> > > and
> > >
> > >
> > > log.flush.start.offset.checkpoint.interval.ms
> > >
> > >
> > > Do you have a use case for each property's utilization? I can't figure
> > > out
> > > what the difference is...
> > >
> > >
> > > best regards,
> > >
> > >
> > > Adrien
> > >
> >
>


Re: Setting topic's offset from the shell

2018-03-05 Thread Zoran

That should be it. Thank you very much.

On 02/28/2018 06:59 PM, Manikumar wrote:

We can use the "kafka-consumer-groups.sh --reset-offsets" option to reset
offsets. This is available from Kafka 0.11.0.0.
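
For example (broker address, group and topic are placeholders), rewinding a group to the earliest available offsets looks roughly like:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-earliest --execute

Other reset options such as --to-offset, --shift-by and --to-datetime exist as well; running without --execute only prints the planned offsets without applying them.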


On Wed, Feb 28, 2018 at 2:59 PM, UMESH CHAUDHARY 
wrote:


You might want to set the group.id config in kafka-console-consumer (or in any
other consumer) to a value which you haven't used before. This will
replay all available messages in the topic from the start if you use
--from-beginning in the console consumer.

On Wed, 28 Feb 2018 at 14:19 Zoran  wrote:


Hi,


If I have a topic that has been fully read by consumers, how do I set the
offset from the shell to some previous value in order to reread
several messages?


Regards.