Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-20 Thread Guozhang Wang
> ... without considering t as
> event time to take into account for processing.
>
>
> I'm thinking aloud, considering some scenario that could have a value in
> the IoT space ...
>
>
> Thanks,
>
> Paolo.
>
>
> *Paolo Patierno*
>
> *Senior Software Engineer (IoT) @ Red Hat **Microsoft MVP on **Windows
> Embedded & IoT*
> *Microsoft Azure Advisor*
>
> Twitter : @ppatierno <http://twitter.com/ppatierno>
> Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience <http://paolopatierno.wordpress.com/>
>
>
> --
> *From:* Michal Borowiecki <michal.borowie...@openbet.com>
> *Sent:* Sunday, June 18, 2017 9:34 AM
> *To:* d...@kafka.apache.org; Jay Kreps
> *Cc:* users@kafka.apache.org; Matthias J. Sax
>
> *Subject:* Re: Kafka Streams vs Spark Streaming : reduce by window
>
>
> If confusion is the problem, then totally agree no point adding more
> knobs. Perhaps you're right that users don't *really* want
> processing-time semantics. Just *think* they want them until they start
> considering replay/catch-up scenarios. I guess people rarely think about
> those from the start (I sure didn't).
>
> Cheers,
>
> Michał
>
> On 16/06/17 17:54, Jay Kreps wrote:
>
> I think the question is when do you actually *want* processing time
> semantics? There are definitely times when it's safe to assume the two are
> close enough that a little lossiness doesn't matter much, but it is pretty
> hard to make assumptions about what the processing time is, and it has been
> hard for us to think of a use case where it's actually desirable.
>
> I think mostly what we've seen is confusion about the core concepts:
>
>- stream -- immutable events that occur
>- tables (including windows) -- current state of the world
>
> If the root problem is confusion, adding knobs never makes it better. If
> the root problem is that we're missing important use cases that justify the
> additional knobs, then I think it's good to try to really understand them. I
> think there could be use cases around systems that don't take updates;
> examples would be email, Twitter, and some metrics stores.
>
> One solution that would be less complexity inducing than allowing new
> semantics, but might help with the use cases we need to collect, would be
> to add a new operator in the DSL. Something like .freezeAfter(30,
> TimeUnit.SECONDS) that collects all updates for a given window, emits and
> enforces a single output 30 seconds after the advancement of stream time,
> remembers that it has been emitted, and suppresses all further output (so
> the output is actually a KStream). This might or might not depend on wall
> clock time. Perhaps this is in fact what you are proposing?
>
> -Jay
>
>
>
> On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
>
>> I wonder if it's a frequent enough use case that Kafka Streams should
>> consider providing this out of the box - this was asked for multiple times,
>> right?
>>
>> Personally, I agree totally with the philosophy of "no final
>> aggregation", as expressed by Eno's post, but IMO that is predicated
>> totally on event-time semantics.
>>
>> If users want processing-time semantics then, as the docs already point
>> out, there is no such thing as a late-arriving record - every record just
>> falls in the currently open window(s), hence the notion of final
>> aggregation makes perfect sense, from the usability point of view.
>>
>> The single abstraction of "stream time" proves leaky in some cases (e.g.
>> for punctuate method - being addressed in KIP-138). Perhaps this is another
>> case where processing-time semantics warrant explicit handling in the api -
>> but of course, only if there's sufficient user demand for this.
>>
>> What I could imagine is a new type of time window
>> (ProcessingTimeWindow?), that if used in an aggregation, the underlying
>> processor would force the WallclockTimestampExtractor (KAFKA-4144 enables
>> that) and would use the system-time punctuation (KIP-138) to send the final
>> aggregation value once the window has expired and could be configured to
>> not send intermediate updates while the window was open.
>>
>> Of course this is just a helper for the users, since they can implement
>> it all themselves using the low-level API, as Matthias pointed out already.
>> Just seems there's recurring interest in this.
>>
>> Again, this only makes sense for processing time semantics. For
>> event-time semantics I find the arguments for "no final aggregation"

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-18 Thread Paolo Patierno
I'm just thinking that emitting output to a topic every X seconds, driven by the 
windowing, could be a useful feature on its own, without resorting to interactive 
queries, which are really powerful (I love them) but aren't so useful in this scenario.

Using the caching parameter isn't useful in such a scenario because it's expressed 
in bytes, not in time.
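
For reference, a sketch of the two settings that interact here, with illustrative values (StreamsConfig names as of roughly 0.10.2/0.11): the cache size is indeed byte-based, but cached updates are also flushed on every commit, so commit.interval.ms gives a coarse, global, time-based bound on how often downstream records are emitted.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-max-demo");   // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
// 10 MB record cache: consecutive updates to the same window key are coalesced ...
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// ... and whatever is cached is flushed downstream at least every 5 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5_000L);
```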


Let's consider another scenario ...


I have a sensor sending data every second. Let's assume that our stream 
processing application is not online and the source topic is being filled with 
sensor data carrying the related event time.

When the stream processing application comes online I'd like to have a record 
in the final topic every 5 seconds, so that I get the history as well (because 
the application was offline). To be clear ...

Imagine that starting from t = 0, the sensor starts to send data but the 
application is offline, and the topic is filled from t = 0 to t = 12 (with 12 
events, one per second).

At t = 12 the application comes back online and processes the stream: first the 
data from t = 0 to t = 4 (the first 5 seconds), putting the result into the 
destination topic; then from t = 5 to t = 9 (the next 5 seconds), putting that 
result into the destination topic; and so on. If the sensor rate isn't too fast, 
the application will start to process in real time at some point (it looks to me 
like batch processing that becomes real-time processing).

This scenario, for example, isn't possible with Spark today because when the 
application comes back online it processes all data from t = 0 to t = 12 
immediately, as if they were a single burst of data, without considering t as the 
event time for processing.


I'm thinking aloud, considering scenarios that could have value in the 
IoT space ...


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Michal Borowiecki <michal.borowie...@openbet.com>
Sent: Sunday, June 18, 2017 9:34 AM
To: d...@kafka.apache.org; Jay Kreps
Cc: users@kafka.apache.org; Matthias J. Sax
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window


If confusion is the problem, then I totally agree there's no point adding more knobs. 
Perhaps you're right that users don't really want processing-time semantics. 
They just think they want them until they start considering replay/catch-up 
scenarios. I guess people rarely think about those from the start (I sure 
didn't).

Cheers,

Michał

On 16/06/17 17:54, Jay Kreps wrote:
I think the question is when do you actually want processing time semantics? 
There are definitely times when it's safe to assume the two are close enough 
that a little lossiness doesn't matter much, but it is pretty hard to make 
assumptions about what the processing time is, and it has been hard for us to 
think of a use case where it's actually desirable.

I think mostly what we've seen is confusion about the core concepts:

  *   stream -- immutable events that occur
  *   tables (including windows) -- current state of the world

If the root problem is confusion, adding knobs never makes it better. If the 
root problem is that we're missing important use cases that justify the additional 
knobs, then I think it's good to try to really understand them. I think there 
could be use cases around systems that don't take updates; examples would be 
email, Twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new 
semantics, but might help with the use cases we need to collect, would be to 
add a new operator in the DSL. Something like .freezeAfter(30, 
TimeUnit.SECONDS) that collects all updates for a given window, emits and 
enforces a single output 30 seconds after the advancement of stream time, 
remembers that it has been emitted, and suppresses all further output (so the 
output is actually a KStream). This might or might not depend on wall clock 
time. Perhaps this is in fact what you are proposing?

-Jay
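
Purely to make the proposal concrete, a sketch of how such an operator might read in the DSL. freezeAfter(...) does NOT exist in Kafka Streams; the method, the 5-second window and the assumed KStream<String, Long> named values are all illustrative.

```java
// Hypothetical: freezeAfter(...) is not an existing Kafka Streams operator.
KStream<Windowed<String>, Long> finalResults =
        values.groupByKey()
              .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)))
              .reduce((v1, v2) -> Math.max(v1, v2))
              // Emit exactly one value per window once stream time has advanced
              // 30 seconds past it, then suppress any further updates.
              .freezeAfter(30, TimeUnit.SECONDS);
```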



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki 
<michal.borowie...@openbet.com<mailto:michal.borowie...@openbet.com>> wrote:

I wonder if it's a frequent enough use case that Kafka Streams should consider 
providing this out of the box - this was asked for multiple times, right?

Personally, I agree totally with the philosophy of "no final aggregation", as 
expressed by Eno's post, but IMO that is predicated totally on event-time 
semantics.

If users want processing-time semantics then, as the docs already point out, 
there is no such thing as a late-arriving record - every record just falls in 
the currently open window(s), hence the notion of final aggregation makes 
perfect sense, from the usability

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-18 Thread Michal Borowiecki
windows for which "window end time" passed.


-Matthias

On 6/15/17 9:21 AM, Paolo Patierno wrote:

Hi Eno,


regarding closing window I think that it's up to the streaming application. 
I mean ...

If I want something like I described, I know that a value outside my 5 seconds window 
will be taken into account for the next processing (in the next 5 seconds). I don't think 
I'm losing a record, I am aware that this record will fall in the next 
"processing" window. Btw I'll take a look at your article ! Thanks !


Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 3:57 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable 
cache. If you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that 
should have been in that window, you basically lose the ability to process that 
record. In Kafka Streams we are robust to that, in that we handle late arriving 
records. There is a comparison here for example when we compare it to other 
methods that depend on watermarks or 
triggers: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

Eno



On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:

Hi Eno,


thanks for the reply !

Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
(so disabling cache).

Regarding the interactive query API (I'll take a look) it means that it's 
up to the application doing something like we have oob with Spark.

May I ask what do you mean with "We don’t believe in closing windows" ? 
Isn't it much more code that user has to write for having the same result ?

I'm exploring Kafka Streams and it's very powerful imho even because the 
usage is pretty simple but this scenario could have a lack against Spark.


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>


From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka 
Streams.
You could reduce the number of downstream records by using record 
caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl

Alternatively you can just query the KTable whenever you want using the 
Interactive Query APIs (so when you query dictates what data you receive); see 
this: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Thanks
Eno

On Jun 15, 2017, at 2:

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Matthias J. Sax
Thanks Michał!

That is very good feedback.


-Matthias

On 6/16/17 2:38 AM, Michal Borowiecki wrote:
> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple
> times, right?
> 
> Personally, I agree totally with the philosophy of "no final
> aggregation", as expressed by Eno's post, but IMO that is predicated
> totally on event-time semantics.
> 
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record
> just falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
> 
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is
> another case where processing-time semantics warrant explicit handling
> in the api - but of course, only if there's sufficient user demand for this.
> 
> What I could imagine is a new type of time window
> (ProcessingTimeWindow?), that if used in an aggregation, the underlying
> processor would force the WallclockTimestampExtractor (KAFKA-4144
> enables that) and would use the system-time punctuation (KIP-138) to
> send the final aggregation value once the window has expired and could
> be configured to not send intermediate updates while the window was open.
> 
> Of course this is just a helper for the users, since they can implement
> it all themselves using the low-level API, as Matthias pointed out
> already. Just seems there's recurring interest in this.
> 
> Again, this only makes sense for processing time semantics. For
> event-time semantics I find the arguments for "no final aggregation"
> totally convincing.
> 
> 
> Cheers,
> 
> Michał
> 
> 
> On 16/06/17 00:08, Matthias J. Sax wrote:
>> Hi Paolo,
>>
>> This SO question might help, too:
>> https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>>
>> For Streams, the basic model is based on "change" and we report updates
>> to the "current" result immediately reducing latency to a minimum.
>>
>> Last, if you say it's going to fall into the next window, you won't get
>> event time semantics but fall back to processing time semantics, which
>> cannot provide exact results.
>>
>> If you really want to trade off correctness versus getting (late)
>> updates and want to use processing time semantics, you should configure
>> WallclockTimestampExtractor and implement an "update deduplication"
>> operator using table.toStream().transform(). You can attach a state store to
>> your transformer and store all updates there (ie, newer updates overwrite
>> older ones). Punctuations allow you to emit "final" results for
>> windows for which "window end time" passed.
>>
>>
>> -Matthias
>>
>> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>>> Hi Eno,
>>>
>>>
>>> regarding closing window I think that it's up to the streaming application. 
>>> I mean ...
>>>
>>> If I want something like I described, I know that a value outside my 5 
>>> seconds window will be taken into account for the next processing (in the 
>>> next 5 seconds). I don't think I'm losing a record, I am aware that this 
>>> record will fall in the next "processing" window. Btw I'll take a look at 
>>> your article ! Thanks !
>>>
>>>
>>> Paolo
>>>
>>>
>>> Paolo Patierno
>>> Senior Software Engineer (IoT) @ Red Hat
>>> Microsoft MVP on Windows Embedded & IoT
>>> Microsoft Azure Advisor
>>>
>>> Twitter : @ppatierno<http://twitter.com/ppatierno>
>>> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
>>> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>>>
>>>
>>> 
>>> From: Eno Thereska <eno.there...@gmail.com>
>>> Sent: Thursday, June 15, 2017 3:57 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>
>>> Hi Paolo,
>>>
>>> Yeah, so if you want fewer records, you should actually "not" disable 
>>> cache. If you disable cache you'll get all the records as you described.
>>>
>>> About closing windows: if you close a window and a late record arri

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Jay Kreps
> ... I am aware that this 
> record will fall in the next "processing" window. Btw I'll take a look at 
> your article ! Thanks !
>
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Eno Thereska <eno.there...@gmail.com>
> Sent: Thursday, June 15, 2017 3:57 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> Yeah, so if you want fewer records, you should actually "not" disable cache. 
> If you disable cache you'll get all the records as you described.
>
> About closing windows: if you close a window and a late record arrives that 
> should have been in that window, you basically lose the ability to process 
> that record. In Kafka Streams we are robust to that, in that we handle late 
> arriving records. There is a comparison here for example when we compare it 
> to other methods that depend on watermarks or triggers: 
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
>
> Eno
>
>
>
> On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:
>
> Hi Eno,
>
>
> thanks for the reply !
>
> Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
> (so disabling cache).
>
> Regarding the interactive query API (I'll take a look) it means that it's up 
> to the application doing something like we have oob with Spark.
>
> May I ask what do you mean with "We don’t believe in closing windows" ? Isn't 
> it much more code that user has to write for having the same result ?
>
> I'm exploring Kafka Streams and it's very powerful imho even because the 
> usage is pretty simple but this scenario could have a lack against Spark.
>
>
> Thanks,
>
> Paolo.
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Eno Thereska <eno.there...@gmail.com>
> Sent: Thursday, June 15, 2017 1:45 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
> You could reduce the number of downstream records by using record caches: 
> http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>
> Alternatively you can just query the KTable whenever you want using the 
> Interactive Query APIs (so when you query dictates what  data you receive), 
> see this 
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
> Thanks
> Eno
>
> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:
>
> Hi,
>
>
> using the streams library I noticed a difference (or there is a lack of 
> knowledge on my side) with Apache Spark.
>
> Imagine following scenario ...
>
>
> I have a source topic where numeric values come in and I want to check the 
> maximum value in the latest 5 seconds but ... putting the max value into a 
> destination topic every 5 seconds.
>
> Thi

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Michal Borowiecki
I wonder if it's a frequent enough use case that Kafka Streams should 
consider providing this out of the box - this was asked for multiple 
times, right?


Personally, I agree totally with the philosophy of "no final 
aggregation", as expressed by Eno's post, but IMO that is predicated 
totally on event-time semantics.


If users want processing-time semantics then, as the docs already point 
out, there is no such thing as a late-arriving record - every record 
just falls in the currently open window(s), hence the notion of final 
aggregation makes perfect sense, from the usability point of view.


The single abstraction of "stream time" proves leaky in some cases (e.g. 
for punctuate method - being addressed in KIP-138). Perhaps this is 
another case where processing-time semantics warrant explicit handling 
in the api - but of course, only if there's sufficient user demand for this.


What I could imagine is a new type of time window 
(ProcessingTimeWindow?) which, if used in an aggregation, would force the 
underlying processor to use the WallclockTimestampExtractor (KAFKA-4144 
enables that) and the system-time punctuation (KIP-138) to send the final 
aggregation value once the window has expired; it could be configured not 
to send intermediate updates while the window is open.


Of course this is just a helper for the users, since they can implement 
it all themselves using the low-level API, as Matthias pointed out 
already. Just seems there's recurring interest in this.
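
For reference, forcing processing-time semantics via the extractor mentioned above is a one-line configuration change; a minimal sketch, assuming the 1.0-era default.timestamp.extractor key (which replaced the older timestamp.extractor setting):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

// Use wall-clock (processing) time instead of the embedded record/event time.
final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          WallclockTimestampExtractor.class.getName());
```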


Again, this only makes sense for processing time semantics. For 
event-time semantics I find the arguments for "no final aggregation" 
totally convincing.



Cheers,

Michał


On 16/06/17 00:08, Matthias J. Sax wrote:

Hi Paolo,

This SO question might help, too:
https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

For Streams, the basic model is based on "change" and we report updates
to the "current" result immediately reducing latency to a minimum.

Last, if you say it's going to fall into the next window, you won't get
event time semantics but fall back to processing time semantics, which
cannot provide exact results.

If you really want to trade off correctness versus getting (late)
updates and want to use processing time semantics, you should configure
WallclockTimestampExtractor and implement an "update deduplication"
operator using table.toStream().transform(). You can attach a state store to
your transformer and store all updates there (ie, newer updates overwrite
older ones). Punctuations allow you to emit "final" results for
windows for which "window end time" passed.


-Matthias

On 6/15/17 9:21 AM, Paolo Patierno wrote:

Hi Eno,


regarding closing window I think that it's up to the streaming application. I 
mean ...

If I want something like I described, I know that a value outside my 5 seconds window 
will be taken into account for the next processing (in the next 5 seconds). I don't think 
I'm losing a record, I am aware that this record will fall in the next 
"processing" window. Btw I'll take a look at your article ! Thanks !


Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 3:57 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable cache. If 
you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that should 
have been in that window, you basically lose the ability to process that record. In 
Kafka Streams we are robust to that, in that we handle late arriving records. There 
is a comparison here for example when we compare it to other methods that depend on 
watermarks or triggers: 
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
<https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>

Eno



On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:

Hi Eno,


thanks for the reply !

Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so 
disabling cache).

Regarding the interactive query API (I'll take a look) it means that it's up to 
the application doing something like we have oob with Spark.

May I ask what do you mean with "We don’t believe in closing windows" ? Isn't 
it much more code that user has to write for having the same result ?

I'm ex

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Matthias J. Sax
Hi Paolo,

This SO question might help, too:
https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

For Streams, the basic model is based on "change" and we report updates
to the "current" result immediately reducing latency to a minimum.

Last, if you say it's going to fall into the next window, you won't get
event time semantics but fall back to processing time semantics, which
cannot provide exact results.

If you really want to trade off correctness versus getting (late)
updates and want to use processing time semantics, you should configure
WallclockTimestampExtractor and implement an "update deduplication"
operator using table.toStream().transform(). You can attach a state store to
your transformer and store all updates there (ie, newer updates overwrite
older ones). Punctuations allow you to emit "final" results for
windows for which "window end time" passed.


-Matthias
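
To make the approach above concrete, a minimal sketch of such an "update deduplication" transformer, assuming the KIP-138 punctuation API (Kafka 1.0+); the String/Long types, the store name "latest-per-window" and the 5-second emit interval are illustrative placeholders, not taken from the thread.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers the latest update per window and emits it only on a wall-clock
// punctuation, so downstream sees (roughly) one "final" value per window.
public class FinalResultTransformer
        implements Transformer<Windowed<String>, Long, KeyValue<String, Long>> {

    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // "latest-per-window" is an illustrative store name; the store must be
        // added to the topology and named when calling transform().
        store = (KeyValueStore<String, Long>) context.getStateStore("latest-per-window");
        // KIP-138 wall-clock punctuation: flush whatever is buffered every 5 seconds.
        context.schedule(5_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final List<String> emitted = new ArrayList<>();
            try (final KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> entry = it.next();
                    context.forward(entry.key, entry.value);
                    emitted.add(entry.key);
                }
            }
            emitted.forEach(store::delete);
        });
    }

    @Override
    public KeyValue<String, Long> transform(final Windowed<String> key, final Long value) {
        // Newer updates overwrite older ones; nothing is forwarded per record.
        store.put(key.key() + "@" + key.window().start(), value);
        return null;
    }

    // Legacy per-record punctuate (pre-KIP-138); unused here, kept only for
    // source compatibility with older versions of the Transformer interface.
    public KeyValue<String, Long> punctuate(final long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}
```

The store would be created via StreamsBuilder#addStateStore and the transformer attached with something like table.toStream().transform(FinalResultTransformer::new, "latest-per-window").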

On 6/15/17 9:21 AM, Paolo Patierno wrote:
> Hi Eno,
> 
> 
> regarding closing window I think that it's up to the streaming application. I 
> mean ...
> 
> If I want something like I described, I know that a value outside my 5 
> seconds window will be taken into account for the next processing (in the 
> next 5 seconds). I don't think I'm losing a record, I am aware that this 
> record will fall in the next "processing" window. Btw I'll take a look at 
> your article ! Thanks !
> 
> 
> Paolo
> 
> 
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> 
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
> 
> 
> ____________
> From: Eno Thereska <eno.there...@gmail.com>
> Sent: Thursday, June 15, 2017 3:57 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
> 
> Hi Paolo,
> 
> Yeah, so if you want fewer records, you should actually "not" disable cache. 
> If you disable cache you'll get all the records as you described.
> 
> About closing windows: if you close a window and a late record arrives that 
> should have been in that window, you basically lose the ability to process 
> that record. In Kafka Streams we are robust to that, in that we handle late 
> arriving records. There is a comparison here for example when we compare it 
> to other methods that depend on watermarks or triggers: 
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
> <https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>
> 
> Eno
> 
> 
>> On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:
>>
>> Hi Eno,
>>
>>
>> thanks for the reply !
>>
>> Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
>> (so disabling cache).
>>
>> Regarding the interactive query API (I'll take a look) it means that it's up 
>> to the application doing something like we have oob with Spark.
>>
>> May I ask what do you mean with "We don’t believe in closing windows" ? 
>> Isn't it much more code that user has to write for having the same result ?
>>
>> I'm exploring Kafka Streams and it's very powerful imho even because the 
>> usage is pretty simple but this scenario could have a lack against Spark.
>>
>>
>> Thanks,
>>
>> Paolo.
>>
>>
>> Paolo Patierno
>> Senior Software Engineer (IoT) @ Red Hat
>> Microsoft MVP on Windows Embedded & IoT
>> Microsoft Azure Advisor
>>
>> Twitter : @ppatierno<http://twitter.com/ppatierno>
>> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
>> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>>
>>
>> 
>> From: Eno Thereska <eno.there...@gmail.com>
>> Sent: Thursday, June 15, 2017 1:45 PM
>> To: users@kafka.apache.org
>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>
>> Hi Paolo,
>>
>> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
>> You could reduce the number of downstream records by using record caches: 
>> http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>>  
>> <http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>.
>>
>> Alternatively you can just query the KT

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Paolo Patierno
Hi Eno,


regarding closing windows, I think that it's up to the streaming application. I 
mean ...

If I want something like I described, I know that a value outside my 5-second 
window will be taken into account for the next processing (in the next 5 
seconds). I don't think I'm losing a record; I am aware that this record will 
fall in the next "processing" window. Btw I'll take a look at your article! 
Thanks!


Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 3:57 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable cache. If 
you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that 
should have been in that window, you basically lose the ability to process that 
record. In Kafka Streams we are robust to that, in that we handle late arriving 
records. There is a comparison here for example when we compare it to other 
methods that depend on watermarks or triggers: 
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
<https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>

Eno


> On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:
>
> Hi Eno,
>
>
> thanks for the reply !
>
> Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
> (so disabling cache).
>
> Regarding the interactive query API (I'll take a look) it means that it's up 
> to the application doing something like we have oob with Spark.
>
> May I ask what do you mean with "We don’t believe in closing windows" ? Isn't 
> it much more code that user has to write for having the same result ?
>
> I'm exploring Kafka Streams and it's very powerful imho even because the 
> usage is pretty simple but this scenario could have a lack against Spark.
>
>
> Thanks,
>
> Paolo.
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Eno Thereska <eno.there...@gmail.com>
> Sent: Thursday, June 15, 2017 1:45 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
> You could reduce the number of downstream records by using record caches: 
> http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>  
> <http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>.
>
> Alternatively you can just query the KTable whenever you want using the 
> Interactive Query APIs (so when you query dictates what  data you receive), 
> see this 
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>  
> <https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>
>
> Thanks
> Eno
>> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:
>>
>> Hi,
>>
>>
>> using the streams library I noticed a difference (or there is a lack of 
>> knowledge on my side)with Apache Spark.
>>
>> Imagine following scenario ...
>>
>>
>> I have a source topic where numeric values come in and I want to check the 
>> maximum value in the latest 5 seconds but ... putting the max value into a 
>> destination topic every 5 seconds.
>>
>> This is what happens with reduceByWindow method in Spark.
>>
>> I'm using reduce on a KStream here that process the max value taking into 
>> account previous values in the latest 5 seconds but the final value is put 
>> into the destination topic for each incoming value.
>>
>>
>> For example ...
>>
>>
>> An application sends numeric values every 1 second.
>>
>> With Spark ... the source gets values every 1 second, process max in a 
>> window of 5 seconds, 

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Eno Thereska
Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable cache. If 
you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that 
should have been in that window, you basically lose the ability to process that 
record. In Kafka Streams we are robust to that, in that we handle late arriving 
records. There is a comparison here for example when we compare it to other 
methods that depend on watermarks or triggers: 
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
<https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/> 

Eno


> On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:
> 
> Hi Eno,
> 
> 
> thanks for the reply !
> 
> Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
> (so disabling cache).
> 
> Regarding the interactive query API (I'll take a look) it means that it's up 
> to the application doing something like we have oob with Spark.
> 
> May I ask what do you mean with "We don’t believe in closing windows" ? Isn't 
> it much more code that user has to write for having the same result ?
> 
> I'm exploring Kafka Streams and it's very powerful imho even because the 
> usage is pretty simple but this scenario could have a lack against Spark.
> 
> 
> Thanks,
> 
> Paolo.
> 
> 
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> 
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
> 
> 
> ____________
> From: Eno Thereska <eno.there...@gmail.com>
> Sent: Thursday, June 15, 2017 1:45 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
> 
> Hi Paolo,
> 
> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
> You could reduce the number of downstream records by using record caches: 
> http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>  
> <http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>.
> 
> Alternatively you can just query the KTable whenever you want using the 
> Interactive Query APIs (so when you query dictates what  data you receive), 
> see this 
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>  
> <https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>
> 
> Thanks
> Eno
>> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:
>> 
>> Hi,
>> 
>> 
>> using the streams library I noticed a difference (or there is a lack of 
>> knowledge on my side)with Apache Spark.
>> 
>> Imagine following scenario ...
>> 
>> 
>> I have a source topic where numeric values come in and I want to check the 
>> maximum value in the latest 5 seconds but ... putting the max value into a 
>> destination topic every 5 seconds.
>> 
>> This is what happens with reduceByWindow method in Spark.
>> 
>> I'm using reduce on a KStream here that process the max value taking into 
>> account previous values in the latest 5 seconds but the final value is put 
>> into the destination topic for each incoming value.
>> 
>> 
>> For example ...
>> 
>> 
>> An application sends numeric values every 1 second.
>> 
>> With Spark ... the source gets values every 1 second, process max in a 
>> window of 5 seconds, puts the max into the destination every 5 seconds (so 
>> when the window ends). If the sequence is 21, 25, 22, 20, 26 the output will 
>> be just 26.
>> 
>> With Kafka Streams ... the source gets values every 1 second, process max in 
>> a window of 5 seconds, puts the max into the destination every 1 seconds (so 
>> every time an incoming value arrives). Of course, if for example the 
>> sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>> 
>> 
>> Is it possible with Kafka Streams ? Or it's something to do at application 
>> level ?
>> 
>> 
>> Thanks,
>> 
>> Paolo
>> 
>> 
>> Paolo Patierno
>> Senior Software Engineer (IoT) @ Red Hat
>> Microsoft MVP on Windows Embedded & IoT
>> Microsoft Azure Advisor
>> 
>> Twitter : @ppatierno<http://twitter.com/ppatierno>
>> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
>> Blog : DevExperience<http://paolopatierno.wordpress.com/>
> 



Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Paolo Patierno
Hi Eno,


thanks for the reply!

Regarding the cache, I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so 
disabling the cache).

Regarding the Interactive Query API (I'll take a look), it means that it's up to 
the application to do something we get out of the box with Spark.

May I ask what you mean by "We don’t believe in closing windows"? Isn't 
it much more code the user has to write to get the same result?

I'm exploring Kafka Streams and it's very powerful IMHO, partly because the usage 
is pretty simple, but this scenario could be a gap compared to Spark.


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka Streams.
You could reduce the number of downstream records by using record caches: 
http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
 
<http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>.

Alternatively you can just query the KTable whenever you want using the 
Interactive Query APIs (so when you query dictates what  data you receive), see 
this 
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
 
<https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>

Thanks
Eno
> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:
>
> Hi,
>
>
> using the streams library I noticed a difference (or there is a lack of 
> knowledge on my side)with Apache Spark.
>
> Imagine following scenario ...
>
>
> I have a source topic where numeric values come in and I want to check the 
> maximum value in the latest 5 seconds but ... putting the max value into a 
> destination topic every 5 seconds.
>
> This is what happens with reduceByWindow method in Spark.
>
> I'm using reduce on a KStream here that process the max value taking into 
> account previous values in the latest 5 seconds but the final value is put 
> into the destination topic for each incoming value.
>
>
> For example ...
>
>
> An application sends numeric values every 1 second.
>
> With Spark ... the source gets values every 1 second, process max in a window 
> of 5 seconds, puts the max into the destination every 5 seconds (so when the 
> window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 
> 26.
>
> With Kafka Streams ... the source gets values every 1 second, process max in 
> a window of 5 seconds, puts the max into the destination every 1 seconds (so 
> every time an incoming value arrives). Of course, if for example the sequence 
> is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>
>
> Is it possible with Kafka Streams ? Or it's something to do at application 
> level ?
>
>
> Thanks,
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>



Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Eno Thereska
Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka Streams. 
You could reduce the number of downstream records by using record caches: 
http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl

Alternatively you can just query the KTable whenever you want using the 
Interactive Query APIs (so when you query dictates what data you receive); see 
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
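
For completeness, a minimal sketch of what querying such a windowed aggregate on demand could look like through the Interactive Query API; the store name "max-per-window-store", the key "sensor-1" and the one-minute range are illustrative.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Pull the current per-window maxima from a running KafkaStreams instance,
// instead of consuming a continuous stream of intermediate updates.
void printLastMinuteOfMax(final KafkaStreams streams) {
    final ReadOnlyWindowStore<String, Long> windowStore =
            streams.store("max-per-window-store", QueryableStoreTypes.windowStore());
    final long now = System.currentTimeMillis();
    try (final WindowStoreIterator<Long> iterator =
                 windowStore.fetch("sensor-1", now - 60_000L, now)) {
        while (iterator.hasNext()) {
            final KeyValue<Long, Long> next = iterator.next();   // window start -> max
            System.out.println("window starting at " + next.key + " -> max " + next.value);
        }
    }
}
```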


Thanks
Eno
> On Jun 15, 2017, at 2:38 PM, Paolo Patierno  wrote:
> 
> Hi,
> 
> 
> using the streams library I noticed a difference (or there is a lack of 
> knowledge on my side)with Apache Spark.
> 
> Imagine following scenario ...
> 
> 
> I have a source topic where numeric values come in and I want to check the 
> maximum value in the latest 5 seconds but ... putting the max value into a 
> destination topic every 5 seconds.
> 
> This is what happens with reduceByWindow method in Spark.
> 
> I'm using reduce on a KStream here that process the max value taking into 
> account previous values in the latest 5 seconds but the final value is put 
> into the destination topic for each incoming value.
> 
> 
> For example ...
> 
> 
> An application sends numeric values every 1 second.
> 
> With Spark ... the source gets values every 1 second, process max in a window 
> of 5 seconds, puts the max into the destination every 5 seconds (so when the 
> window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 
> 26.
> 
> With Kafka Streams ... the source gets values every 1 second, process max in 
> a window of 5 seconds, puts the max into the destination every 1 seconds (so 
> every time an incoming value arrives). Of course, if for example the sequence 
> is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
> 
> 
> Is it possible with Kafka Streams ? Or it's something to do at application 
> level ?
> 
> 
> Thanks,
> 
> Paolo
> 
> 
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> 
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience



Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Tom Bentley
It sounds like you want a tumbling time window, rather than a sliding window:

https://kafka.apache.org/documentation/streams#streams_dsl_windowing
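
To make that concrete for the max-per-5-seconds example, a minimal sketch using a tumbling window, assuming roughly the 1.0-era DSL (StreamsBuilder/windowedBy) and illustrative topic names. Note that, as discussed in this thread, every update to a window's running max is still forwarded downstream; the record cache only coalesces consecutive updates.

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Long> values =
        builder.stream("sensor-values", Consumed.with(Serdes.String(), Serdes.Long()));

values.groupByKey()
      // Tumbling window: fixed, non-overlapping 5-second buckets.
      .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)))
      // Running max per key and window.
      .reduce(Math::max)
      // Flatten the windowed key into a plain String for the output topic.
      .toStream((windowedKey, max) -> windowedKey.key() + "@" + windowedKey.window().start())
      .to("max-per-5s", Produced.with(Serdes.String(), Serdes.Long()));
```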

On 15 June 2017 at 14:38, Paolo Patierno  wrote:

> Hi,
>
>
> using the streams library I noticed a difference (or there is a lack of
> knowledge on my side) with Apache Spark.
>
> Imagine following scenario ...
>
>
> I have a source topic where numeric values come in and I want to check the
> maximum value in the latest 5 seconds but ... putting the max value into a
> destination topic every 5 seconds.
>
> This is what happens with reduceByWindow method in Spark.
>
> I'm using reduce on a KStream here that processes the max value taking into
> account previous values in the latest 5 seconds, but the final value is put
> into the destination topic for each incoming value.
>
>
> For example ...
>
>
> An application sends numeric values every 1 second.
>
> With Spark ... the source gets values every 1 second, processes the max in a
> window of 5 seconds, and puts the max into the destination every 5 seconds (so
> when the window ends). If the sequence is 21, 25, 22, 20, 26 the output
> will be just 26.
>
> With Kafka Streams ... the source gets values every 1 second, processes the max
> in a window of 5 seconds, and puts the max into the destination every second
> (so every time an incoming value arrives). Of course, if for example the
> sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>
>
> Is it possible with Kafka Streams ? Or it's something to do at application
> level ?
>
>
> Thanks,
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
>


Re: Kafka Streams vs Spark Streaming

2017-03-01 Thread Matthias J. Sax
Steven,

I guess my last answer was not completely correct. You might start with
a new store, if the task gets moved to a different machine. Otherwise,
we don't explicitly wipe out the store, but just reuse it in whatever
state it is on restart.

-Matthias

On 2/28/17 2:19 PM, Matthias J. Sax wrote:
> Sorry. Misunderstood your question.
> 
> For a non-logged store, in case of failure, we wipe out the entire state
> (IIRC) -- thus, you will start with an empty state after recovery.
> 
> 
> -Matthias
> 
> 
> On 2/28/17 1:36 PM, Steven Schlansker wrote:
>> Thanks Matthias for this information.  But it seems you are talking about a
>> logged store, since you mention the changelog topic and replaying it and 
>> whatnot.
>>
>> But my question specifically was about *unlogged* state stores, where there 
>> is no
>> such changelog topic available.  Sorry if that wasn't clear before.  Or am I 
>> misunderstanding?
>>
>>> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax  wrote:
>>>
>>> If a store is backed by a changelog topic, the changelog topic is
>>> responsible for holding the latest state of the store. Thus, the topic must
>>> store the latest value per key. For this, we use a compacted topic.
>>>
>>> In case of restore, the local RocksDB store is cleared so it is empty,
>>> and we read the complete changelog topic and apply those updates to the
>>> store.
>>>
>>> This allows a fast recovery, because no source topic rewind and no
>>> reprocessing is required. Furthermore, because the changelog topic is
>>> compacted, it is roughly the size of the number of distinct keys in the
>>> store -- this also reduces recovery time as you don't need to replay
>>> every update to the store.
>>>
>>> We are currently working on an optimization that allows us to only
>>> replay the tail of the changelog topic in certain cases to get the store
>>> back into a valid state: See
>>> https://issues.apache.org/jira/browse/KAFKA-4317
>>>
>>> Furthermore, changelog topics allow us to maintain StandbyTasks -- those
>>> tasks only apply the updates from the changelog topic (that are written by
>>> the main task maintaining the store) to a local copy of the store. Thus,
>>> in case of fail-over those StandbyTasks can replace a failed task and,
>>> because they have a copy of the state, they can take over even more
>>> quickly than a newly created task that needs to replay the changelog to
>>> rebuild the state first.
>>>
>>>
>>>
>>> -Matthias
>>>
>>> On 2/28/17 8:17 AM, Steven Schlansker wrote:

> On Feb 28, 2017, at 12:17 AM, Michael Noll  wrote:
>
> Sachin,
>
> disabling (change)logging for state stores disables the fault-tolerance of
> the state store -- i.e. changes to the state store will not be backed up 
> to
> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
> store, or something else

 One thing I've wanted is a more concrete description of this failure mode.
 What exactly is the process to recover from such a "failed" state store?

 Does Kafka Streams rewind the source topic and replay?  (Including any 
 Processors you may have wired up?)
 Does the state store remain faulted?  Can an administrator fix it by 
 resetting some offsets?

 I looked around both in the project and Confluent documentation and didn't 
 really find
 an answer to how non-logged state stores fail or recover.

 Thanks for any insight!

>
>
>> When disabling this in 0.10.2 what does this exactly means.
>
> See above.
>
>
>> Does this means no longer any rocksdb state store would get created?
>
> No, local state stores will still be created.  By default, the storage
> engine is RocksDB, so if you disable changelogging then you will still 
> have
> local RocksDB stores (as usual) but those stores will not be backed up to
> Kafka behind the scenes.  If, in this situation, you lose a machine that
> has local RocksDB stores, then this state data is lost, too.
>
> So there are two different things at play here:
>
> 1. Whether you want to enable or disable (change)logging of state store,
> and thus to enable/disable fault-tolerant state stores.
>
> 2. Which storage engine you want to use for the state stores.  The default
> is RocksDB.
>
> If, for (2), you do not want to have RocksDB state stores, you can switch
> the storage engine to e.g. the in-memory store.  However, when you do
> switch from RocksDB to in-memory then all your state store's data must fit
> into memory (obviously), otherwise you'll run OOM.
>
> In summary, you can have either of the following:
>
> a. RocksDB state stores with changelogging enabled (= fault-tolerant
> stores).
>
> b. RocksDB state stores with changelogging disabled (= stores are not
> fault-tolerant, you may suffer from data loss 

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Matthias J. Sax
Sorry, I misunderstood your question.

For a non-logged store, in case of failure, we wipe out the entire state
(IIRC) -- thus, you will start with an empty state after recovery.


-Matthias


On 2/28/17 1:36 PM, Steven Schlansker wrote:
> Thanks Matthias for this information.  But it seems you are talking about a
> logged store, since you mention the changelog topic and replaying it and 
> whatnot.
> 
> But my question specifically was about *unlogged* state stores, where there 
> is no
> such changelog topic available.  Sorry if that wasn't clear before.  Or am I 
> misunderstanding?
> 
>> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax  wrote:
>>
>> If a store is backed by a changelog topic, the changelog topic is
>> responsible to hold the latest state of the store. Thus, the topic must
>> store the latest value per key. For this, we use a compacted topic.
>>
>> If case of restore, the local RocksDB store is cleared so it is empty,
>> and we read the complete changelog topic an apply those updates to the
>> store.
>>
>> This allows a fast recovery, because no source topic rewind and not
>> reprocessing is required. Furthermore, because the changelog topic is
>> compacted, it is roughly the size of the number of distinct keys in the
>> store -- this also reduced recovery time as you don't need to replay
>> every update to the store.
>>
>> We are currently working on an optimization, that allows us to only
>> reply the tail to the changelog topic in certain cases to get the store
>> back into a valid state: See
>> https://issues.apache.org/jira/browse/KAFKA-4317
>>
>> Furthermore, changelog topic allow to maintain StandbyTask -- those
>> tasks only apply all updates to the changelog topic (that are written by
>> the main task maintaining the store) to a local copy of the store. Thus,
>> in case of fail-over those StandbyTasks can replace a failed task and
>> because they have a copy of the state, they can take over even more
>> quickly than a newly created tasks that needs to reply the changelog to
>> rebuild the state first.
>>
>>
>>
>> -Matthias
>>
>> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>>>
 On Feb 28, 2017, at 12:17 AM, Michael Noll  wrote:

 Sachin,

 disabling (change)logging for state stores disables the fault-tolerance of
 the state store -- i.e. changes to the state store will not be backed up to
 Kafka, regardless of whether the store uses a RocksDB store, an in-memory
 store, or something else
>>>
>>> One thing I've wanted is a more concrete description of this failure mode.
>>> What exactly is the process to recover from such a "failed" state store?
>>>
>>> Does Kafka Streams rewind the source topic and replay?  (Including any 
>>> Processors you may have wired up?)
>>> Does the state store remain faulted?  Can an administrator fix it by 
>>> resetting some offsets?
>>>
>>> I looked around both in the project and Confluent documentation and didn't 
>>> really find
>>> an answer to how non-logged state stores fail or recover.
>>>
>>> Thanks for any insight!
>>>


> When disabling this in 0.10.2 what does this exactly means.

 See above.


> Does this means no longer any rocksdb state store would get created?

 No, local state stores will still be created.  By default, the storage
 engine is RocksDB, so if you disable changelogging then you will still have
 local RocksDB stores (as usual) but those stores will not be backed up to
 Kafka behind the scenes.  If, in this situation, you lose a machine that
 has local RocksDB stores, then this state data is lost, too.

 So there are two different things at play here:

 1. Whether you want to enable or disable (change)logging of state store,
 and thus to enable/disable fault-tolerant state stores.

 2. Which storage engine you want to use for the state stores.  The default
 is RocksDB.

 If, for (2), you do not want to have RocksDB state stores, you can switch
 the storage engine to e.g. the in-memory store.  However, when you do
 switch from RocksDB to in-memory then all your state store's data must fit
 into memory (obviously), otherwise you'll run OOM.

 In summary, you can have either of the following:

 a. RocksDB state stores with changelogging enabled (= fault-tolerant
 stores).

 b. RocksDB state stores with changelogging disabled (= stores are not
 fault-tolerant, you may suffer from data loss during e.g. machine 
 failures).

 c. In-memory state stores with changelogging enabled (= fault-tolerant
 stores). But careful: you may run OOM if the state data does not fit into
 the available memory.

 d. In-memory state stores with changelogging disabled (= stores are not
 fault-tolerant, you may suffer from data loss during e.g. machine
 failures). But careful: you may run OOM if the state 

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Steven Schlansker
Thanks Matthias for this information.  But it seems you are talking about a
logged store, since you mention the changelog topic and replaying it and
whatnot.

But my question specifically was about *unlogged* state stores, where there is
no such changelog topic available.  Sorry if that wasn't clear before.  Or am I
misunderstanding?

> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax  wrote:
> 
> If a store is backed by a changelog topic, the changelog topic is
> responsible to hold the latest state of the store. Thus, the topic must
> store the latest value per key. For this, we use a compacted topic.
> 
> If case of restore, the local RocksDB store is cleared so it is empty,
> and we read the complete changelog topic an apply those updates to the
> store.
> 
> This allows a fast recovery, because no source topic rewind and not
> reprocessing is required. Furthermore, because the changelog topic is
> compacted, it is roughly the size of the number of distinct keys in the
> store -- this also reduced recovery time as you don't need to replay
> every update to the store.
> 
> We are currently working on an optimization, that allows us to only
> reply the tail to the changelog topic in certain cases to get the store
> back into a valid state: See
> https://issues.apache.org/jira/browse/KAFKA-4317
> 
> Furthermore, changelog topic allow to maintain StandbyTask -- those
> tasks only apply all updates to the changelog topic (that are written by
> the main task maintaining the store) to a local copy of the store. Thus,
> in case of fail-over those StandbyTasks can replace a failed task and
> because they have a copy of the state, they can take over even more
> quickly than a newly created tasks that needs to reply the changelog to
> rebuild the state first.
> 
> 
> 
> -Matthias
> 
> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>> 
>>> On Feb 28, 2017, at 12:17 AM, Michael Noll  wrote:
>>> 
>>> Sachin,
>>> 
>>> disabling (change)logging for state stores disables the fault-tolerance of
>>> the state store -- i.e. changes to the state store will not be backed up to
>>> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
>>> store, or something else
>> 
>> One thing I've wanted is a more concrete description of this failure mode.
>> What exactly is the process to recover from such a "failed" state store?
>> 
>> Does Kafka Streams rewind the source topic and replay?  (Including any 
>> Processors you may have wired up?)
>> Does the state store remain faulted?  Can an administrator fix it by 
>> resetting some offsets?
>> 
>> I looked around both in the project and Confluent documentation and didn't 
>> really find
>> an answer to how non-logged state stores fail or recover.
>> 
>> Thanks for any insight!
>> 
>>> 
>>> 
 When disabling this in 0.10.2 what does this exactly means.
>>> 
>>> See above.
>>> 
>>> 
 Does this means no longer any rocksdb state store would get created?
>>> 
>>> No, local state stores will still be created.  By default, the storage
>>> engine is RocksDB, so if you disable changelogging then you will still have
>>> local RocksDB stores (as usual) but those stores will not be backed up to
>>> Kafka behind the scenes.  If, in this situation, you lose a machine that
>>> has local RocksDB stores, then this state data is lost, too.
>>> 
>>> So there are two different things at play here:
>>> 
>>> 1. Whether you want to enable or disable (change)logging of state store,
>>> and thus to enable/disable fault-tolerant state stores.
>>> 
>>> 2. Which storage engine you want to use for the state stores.  The default
>>> is RocksDB.
>>> 
>>> If, for (2), you do not want to have RocksDB state stores, you can switch
>>> the storage engine to e.g. the in-memory store.  However, when you do
>>> switch from RocksDB to in-memory then all your state store's data must fit
>>> into memory (obviously), otherwise you'll run OOM.
>>> 
>>> In summary, you can have either of the following:
>>> 
>>> a. RocksDB state stores with changelogging enabled (= fault-tolerant
>>> stores).
>>> 
>>> b. RocksDB state stores with changelogging disabled (= stores are not
>>> fault-tolerant, you may suffer from data loss during e.g. machine failures).
>>> 
>>> c. In-memory state stores with changelogging enabled (= fault-tolerant
>>> stores). But careful: you may run OOM if the state data does not fit into
>>> the available memory.
>>> 
>>> d. In-memory state stores with changelogging disabled (= stores are not
>>> fault-tolerant, you may suffer from data loss during e.g. machine
>>> failures). But careful: you may run OOM if the state data does not fit into
>>> the available memory.
>>> 
>>> 
>>> Hope this helps,
>>> Michael
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal  wrote:
>>> 
 I had a question regarding
 http://docs.confluent.io/3.1.2/streams/developer-guide.
 

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Matthias J. Sax
Tianji,

Streams provides at-least-once processing guarantees. Thus, all
flush/commits must be aligned -- otherwise, this guarantee might break.


-Matthias

On 2/28/17 6:40 AM, Damian Guy wrote:
> Hi Tainji,
> 
> The changelogs are flushed on the commit interval.  It isn't currently
> possible to change this.
> 
> Thanks,
> Damian
> 
> On Tue, 28 Feb 2017 at 14:00 Tianji Li  wrote:
> 
>> Hi Guys,
>>
>> Thanks very much for your help.
>>
>> A final question, is it possible to use different commit intervals for
>> state-store change-logs topics and for sink topics?
>>
>> Thanks
>> Tianji
>>
> 





Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Matthias J. Sax
If a store is backed by a changelog topic, the changelog topic is
responsible for holding the latest state of the store. Thus, the topic must
store the latest value per key. For this, we use a compacted topic.

In case of a restore, the local RocksDB store is cleared so it is empty,
and we read the complete changelog topic and apply those updates to the
store.

This allows a fast recovery, because no source-topic rewind and no
reprocessing is required. Furthermore, because the changelog topic is
compacted, it is roughly the size of the number of distinct keys in the
store -- this also reduces recovery time, as you don't need to replay
every update to the store.

We are currently working on an optimization that allows us to only
replay the tail of the changelog topic in certain cases to get the store
back into a valid state; see
https://issues.apache.org/jira/browse/KAFKA-4317

Furthermore, changelog topics allow us to maintain StandbyTasks -- those
tasks simply apply all updates written to the changelog topic (by the
main task maintaining the store) to a local copy of the store. Thus,
in case of fail-over those StandbyTasks can replace a failed task and,
because they have a copy of the state, they can take over even more
quickly than a newly created task that needs to replay the changelog to
rebuild the state first.
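
As an illustration, a minimal sketch of enabling such standby replicas via
StreamsConfig (application id, broker address, and class name below are
made-up placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Keep one standby copy of each local state store on another instance, so a
        // failed task can be taken over without a full changelog restore.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        System.out.println(props);
    }
}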



-Matthias

On 2/28/17 8:17 AM, Steven Schlansker wrote:
> 
>> On Feb 28, 2017, at 12:17 AM, Michael Noll  wrote:
>>
>> Sachin,
>>
>> disabling (change)logging for state stores disables the fault-tolerance of
>> the state store -- i.e. changes to the state store will not be backed up to
>> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
>> store, or something else
> 
> One thing I've wanted is a more concrete description of this failure mode.
> What exactly is the process to recover from such a "failed" state store?
> 
> Does Kafka Streams rewind the source topic and replay?  (Including any 
> Processors you may have wired up?)
> Does the state store remain faulted?  Can an administrator fix it by 
> resetting some offsets?
> 
> I looked around both in the project and Confluent documentation and didn't 
> really find
> an answer to how non-logged state stores fail or recover.
> 
> Thanks for any insight!
> 
>>
>>
>>> When disabling this in 0.10.2 what does this exactly means.
>>
>> See above.
>>
>>
>>> Does this means no longer any rocksdb state store would get created?
>>
>> No, local state stores will still be created.  By default, the storage
>> engine is RocksDB, so if you disable changelogging then you will still have
>> local RocksDB stores (as usual) but those stores will not be backed up to
>> Kafka behind the scenes.  If, in this situation, you lose a machine that
>> has local RocksDB stores, then this state data is lost, too.
>>
>> So there are two different things at play here:
>>
>> 1. Whether you want to enable or disable (change)logging of state store,
>> and thus to enable/disable fault-tolerant state stores.
>>
>> 2. Which storage engine you want to use for the state stores.  The default
>> is RocksDB.
>>
>> If, for (2), you do not want to have RocksDB state stores, you can switch
>> the storage engine to e.g. the in-memory store.  However, when you do
>> switch from RocksDB to in-memory then all your state store's data must fit
>> into memory (obviously), otherwise you'll run OOM.
>>
>> In summary, you can have either of the following:
>>
>> a. RocksDB state stores with changelogging enabled (= fault-tolerant
>> stores).
>>
>> b. RocksDB state stores with changelogging disabled (= stores are not
>> fault-tolerant, you may suffer from data loss during e.g. machine failures).
>>
>> c. In-memory state stores with changelogging enabled (= fault-tolerant
>> stores). But careful: you may run OOM if the state data does not fit into
>> the available memory.
>>
>> d. In-memory state stores with changelogging disabled (= stores are not
>> fault-tolerant, you may suffer from data loss during e.g. machine
>> failures). But careful: you may run OOM if the state data does not fit into
>> the available memory.
>>
>>
>> Hope this helps,
>> Michael
>>
>>
>>
>>
>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal  wrote:
>>
>>> I had a question regarding
>>> http://docs.confluent.io/3.1.2/streams/developer-guide.
>>> html#enable-disable-state-store-changelogs
>>>
>>> When disabling this in 0.10.2 what does this exactly means.
>>> Dos this means no longer any rocksdb state store would get created?
>>>
>>> On this subject we had started with spark streaming, but we ran into memory
>>> issues and the hardware we have got is not so fantastic to support spark
>>> streaming.
>>>
>>> So we switched to high level DSL kafka streaming .
>>>
>>> I think if your source is kafka queues, kafka streaming is good and simple
>>> to use. However you need to plan ahead as anticipate the (max) load and
>>> create adequate partitions based on some key on 

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Steven Schlansker

> On Feb 28, 2017, at 12:17 AM, Michael Noll  wrote:
> 
> Sachin,
> 
> disabling (change)logging for state stores disables the fault-tolerance of
> the state store -- i.e. changes to the state store will not be backed up to
> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
> store, or something else

One thing I've wanted is a more concrete description of this failure mode.
What exactly is the process to recover from such a "failed" state store?

Does Kafka Streams rewind the source topic and replay?  (Including any 
Processors you may have wired up?)
Does the state store remain faulted?  Can an administrator fix it by resetting 
some offsets?

I looked around both in the project and Confluent documentation and didn't 
really find
an answer to how non-logged state stores fail or recover.

Thanks for any insight!

> 
> 
>> When disabling this in 0.10.2 what does this exactly means.
> 
> See above.
> 
> 
>> Does this means no longer any rocksdb state store would get created?
> 
> No, local state stores will still be created.  By default, the storage
> engine is RocksDB, so if you disable changelogging then you will still have
> local RocksDB stores (as usual) but those stores will not be backed up to
> Kafka behind the scenes.  If, in this situation, you lose a machine that
> has local RocksDB stores, then this state data is lost, too.
> 
> So there are two different things at play here:
> 
> 1. Whether you want to enable or disable (change)logging of state store,
> and thus to enable/disable fault-tolerant state stores.
> 
> 2. Which storage engine you want to use for the state stores.  The default
> is RocksDB.
> 
> If, for (2), you do not want to have RocksDB state stores, you can switch
> the storage engine to e.g. the in-memory store.  However, when you do
> switch from RocksDB to in-memory then all your state store's data must fit
> into memory (obviously), otherwise you'll run OOM.
> 
> In summary, you can have either of the following:
> 
> a. RocksDB state stores with changelogging enabled (= fault-tolerant
> stores).
> 
> b. RocksDB state stores with changelogging disabled (= stores are not
> fault-tolerant, you may suffer from data loss during e.g. machine failures).
> 
> c. In-memory state stores with changelogging enabled (= fault-tolerant
> stores). But careful: you may run OOM if the state data does not fit into
> the available memory.
> 
> d. In-memory state stores with changelogging disabled (= stores are not
> fault-tolerant, you may suffer from data loss during e.g. machine
> failures). But careful: you may run OOM if the state data does not fit into
> the available memory.
> 
> 
> Hope this helps,
> Michael
> 
> 
> 
> 
> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal  wrote:
> 
>> I had a question regarding
>> http://docs.confluent.io/3.1.2/streams/developer-guide.
>> html#enable-disable-state-store-changelogs
>> 
>> When disabling this in 0.10.2 what does this exactly means.
>> Dos this means no longer any rocksdb state store would get created?
>> 
>> On this subject we had started with spark streaming, but we ran into memory
>> issues and the hardware we have got is not so fantastic to support spark
>> streaming.
>> 
>> So we switched to high level DSL kafka streaming .
>> 
>> I think if your source is kafka queues, kafka streaming is good and simple
>> to use. However you need to plan ahead as anticipate the (max) load and
>> create adequate partitions based on some key on which aggregations can be
>> performed independently.
>> 
>> Then you can run cluster of stream threads (same and multiple machines),
>> each processing a partition.
>> 
>> Having said this, we however run into lot of issues of frequent stream
>> re-balance, especially when we have multiple instances of rocks db running
>> on a single machine.
>> Now we don't know if this is some bad VM configuration issue or some
>> problem with kafka streams/rocks db integration, we are still working on
>> that.
>> 
>> So I would suggest if you partition your data well enough and have single
>> streams thread consuming only one partition and not many instances of
>> rocksdb created on a single machine, the overall applications runs fine.
>> Also make sure not to create big time windows and set a not so long
>> retention time, so that state stores size is limited.
>> 
>> We use a sliding 5 minutes window of size 10 minutes and retention of 30
>> minutes and see overall performance much better than say 30 minutes sliding
>> of size 1 hour and retention of 3 hours.
>> 
>> So to conclude if you can manage rocks db, then kafka streams is good to
>> start with, its simple and very intuitive to use.
>> 
>> Again on rocksdb side, is there a way to eliminate that and is
>> 
>> disableLogging
>> 
>> for that?
>> 
>> Thanks
>> Sachin
>> 
>> 
>> 
>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll 
>> wrote:
>> 
 Also, is it possible to stop the syncing 

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Damian Guy
Hi Tianji,

The changelogs are flushed on the commit interval.  It isn't currently
possible to change this.

Thanks,
Damian
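
To make this concrete, a minimal sketch of that single, application-wide
commit interval (names and the 60-second value below are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // One interval for the whole application: it controls how often offsets are
        // committed and state/changelogs are flushed; there is no separate knob for
        // changelog topics vs. sink topics.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000);
        System.out.println(props);
    }
}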

On Tue, 28 Feb 2017 at 14:00 Tianji Li  wrote:

> Hi Guys,
>
> Thanks very much for your help.
>
> A final question, is it possible to use different commit intervals for
> state-store change-logs topics and for sink topics?
>
> Thanks
> Tianji
>


Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Tianji Li
Hi Guys,

Thanks very much for your help. 

A final question, is it possible to use different commit intervals for 
state-store change-logs topics and for sink topics? 

Thanks
Tianji


Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Michael Noll
Sachin,

disabling (change)logging for state stores disables the fault-tolerance of
the state store -- i.e. changes to the state store will not be backed up to
Kafka, regardless of whether the store uses a RocksDB store, an in-memory
store, or something else.


> When disabling this in 0.10.2 what does this exactly means.

See above.


> Does this means no longer any rocksdb state store would get created?

No, local state stores will still be created.  By default, the storage
engine is RocksDB, so if you disable changelogging then you will still have
local RocksDB stores (as usual) but those stores will not be backed up to
Kafka behind the scenes.  If, in this situation, you lose a machine that
has local RocksDB stores, then this state data is lost, too.

So there are two different things at play here:

1. Whether you want to enable or disable (change)logging of state store,
and thus to enable/disable fault-tolerant state stores.

2. Which storage engine you want to use for the state stores.  The default
is RocksDB.

If, for (2), you do not want to have RocksDB state stores, you can switch
the storage engine to e.g. the in-memory store.  However, when you do
switch from RocksDB to in-memory then all your state store's data must fit
into memory (obviously), otherwise you'll run OOM.

In summary, you can have one of the following:

a. RocksDB state stores with changelogging enabled (= fault-tolerant
stores).

b. RocksDB state stores with changelogging disabled (= stores are not
fault-tolerant, you may suffer from data loss during e.g. machine failures).

c. In-memory state stores with changelogging enabled (= fault-tolerant
stores). But careful: you may run OOM if the state data does not fit into
the available memory.

d. In-memory state stores with changelogging disabled (= stores are not
fault-tolerant, you may suffer from data loss during e.g. machine
failures). But careful: you may run OOM if the state data does not fit into
the available memory.
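
For illustration, a minimal sketch of option (b), written against a newer
Kafka Streams API than the 0.10.2 release discussed here (topic and store
names are made up):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class UnloggedStoreSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Option (b): a RocksDB-backed count store with its changelog disabled --
        // fast local state, but the data is lost if the machine holding it dies.
        builder.<String, String>stream("events")   // hypothetical input topic
               .groupByKey()
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                                  .withLoggingDisabled());
        System.out.println(builder.build().describe());
    }
}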


Hope this helps,
Michael




On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal  wrote:

> I had a question regarding
> http://docs.confluent.io/3.1.2/streams/developer-guide.
> html#enable-disable-state-store-changelogs
>
> When disabling this in 0.10.2 what does this exactly means.
> Dos this means no longer any rocksdb state store would get created?
>
> On this subject we had started with spark streaming, but we ran into memory
> issues and the hardware we have got is not so fantastic to support spark
> streaming.
>
> So we switched to high level DSL kafka streaming .
>
> I think if your source is kafka queues, kafka streaming is good and simple
> to use. However you need to plan ahead as anticipate the (max) load and
> create adequate partitions based on some key on which aggregations can be
> performed independently.
>
> Then you can run cluster of stream threads (same and multiple machines),
> each processing a partition.
>
> Having said this, we however run into lot of issues of frequent stream
> re-balance, especially when we have multiple instances of rocks db running
> on a single machine.
> Now we don't know if this is some bad VM configuration issue or some
> problem with kafka streams/rocks db integration, we are still working on
> that.
>
> So I would suggest if you partition your data well enough and have single
> streams thread consuming only one partition and not many instances of
> rocksdb created on a single machine, the overall applications runs fine.
> Also make sure not to create big time windows and set a not so long
> retention time, so that state stores size is limited.
>
> We use a sliding 5 minutes window of size 10 minutes and retention of 30
> minutes and see overall performance much better than say 30 minutes sliding
> of size 1 hour and retention of 3 hours.
>
> So to conclude if you can manage rocks db, then kafka streams is good to
> start with, its simple and very intuitive to use.
>
> Again on rocksdb side, is there a way to eliminate that and is
>
> disableLogging
>
> for that?
>
> Thanks
> Sachin
>
>
>
> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll 
> wrote:
>
> > > Also, is it possible to stop the syncing between state stores to
> brokers,
> > if I am fine with failures?
> >
> > Yes, you can disable the syncing (or the "changelog" feature) of state
> > stores:
> > http://docs.confluent.io/current/streams/developer-
> > guide.html#enable-disable-state-store-changelogs
> >
> > > I do have a Spark Cluster, but I am not convince how Spark Streaming
> can
> > do this differently.
> > > Guozhang, could you comment anything regarding Kafka Streams vs Spark
> > Streaming, especially
> > > in terms of aggregations/groupbys/joins implementation logic?
> >
> > As you are hinting at yourself, if you want fault-tolerant state, then
> this
> > fault tolerance comes at a price (in Kafka Streams, this is achieved by
> > changelog-ing state stores).  Other tools such as Flink or Spark work in
> a
> 

Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Sachin Mittal
I had a question regarding
http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs

When disabling this in 0.10.2, what does this exactly mean?
Does this mean that no RocksDB state store would get created any longer?

On this subject: we had started with Spark Streaming, but we ran into memory
issues, and the hardware we have is not good enough to support Spark
Streaming.

So we switched to the high-level Kafka Streams DSL.

I think if your source is Kafka topics, Kafka Streams is good and simple
to use. However, you need to plan ahead: anticipate the (max) load and
create adequate partitions based on some key on which aggregations can be
performed independently.

Then you can run a cluster of stream threads (on the same or multiple
machines), each processing a partition.

Having said this, we ran into a lot of issues with frequent stream
rebalances, especially when we have multiple instances of RocksDB running
on a single machine.
We don't know yet whether this is some bad VM configuration issue or some
problem with the Kafka Streams / RocksDB integration; we are still working
on that.

So I would suggest: if you partition your data well enough, have a single
streams thread consume only one partition, and avoid creating many RocksDB
instances on a single machine, the overall application runs fine.
Also make sure not to create big time windows, and set a fairly short
retention time, so that the state store size stays limited.

We use a 10-minute window sliding every 5 minutes with a retention of 30
minutes, and see much better overall performance than with, say, a 1-hour
window sliding every 30 minutes with a retention of 3 hours.
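
As a sketch of such a hopping window, written against a newer Kafka Streams
API than the 0.10.x release in use here (topic and store names are made up):

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class HoppingWindowSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")   // hypothetical input topic
               .groupByKey()
               // 10-minute windows advancing every 5 minutes
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
                                      .advanceBy(Duration.ofMinutes(5)))
               // keep window state for only 30 minutes to bound the store size
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("per-10min-counts")
                                  .withRetention(Duration.ofMinutes(30)));
        builder.build();
    }
}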

So, to conclude: if you can manage RocksDB, then Kafka Streams is good to
start with; it is simple and very intuitive to use.

Again, on the RocksDB side, is there a way to eliminate it, and is

disableLogging

the way to do that?

Thanks
Sachin



On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll  wrote:

> > Also, is it possible to stop the syncing between state stores to brokers,
> if I am fine with failures?
>
> Yes, you can disable the syncing (or the "changelog" feature) of state
> stores:
> http://docs.confluent.io/current/streams/developer-
> guide.html#enable-disable-state-store-changelogs
>
> > I do have a Spark Cluster, but I am not convince how Spark Streaming can
> do this differently.
> > Guozhang, could you comment anything regarding Kafka Streams vs Spark
> Streaming, especially
> > in terms of aggregations/groupbys/joins implementation logic?
>
> As you are hinting at yourself, if you want fault-tolerant state, then this
> fault tolerance comes at a price (in Kafka Streams, this is achieved by
> changelog-ing state stores).  Other tools such as Flink or Spark work in a
> similar fashion, there's no free lunch.
>
> One option, which you brought up above, is to disable the fault tolerance
> functionality for state by disabling the changelogs of state stores (see
> above).  Another option is to leverage Kafka's record caching for Kafka
> Streams, which does lower the amount of data that is sent across the
> network (from your app's state store changelogs to the Kafka cluster and
> vice versa), though you may need to tune some parameters in your situation
> because your key space has high cardinality and message volume per key is
> relatively low (= you don't benefit as much from record caching as most
> other users/use cases).
>
>
> -Michael
>
>
>
>
> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li  wrote:
>
> > Hi Guozhang and Kohki,
> >
> > Thanks for your replies.
> >
> > I think I know how to deal with partitioning now, but I am still not sure
> > how to deal with the traffic between the hidden state store sizes and
> Kafka
> > Brokers (same as Kohki).
> >
> > I feel like the easiest thing to do is to set a larger commit window, so
> > that the state stores are synced to brokers slower than default.
> >
> > I do have a Spark Cluster, but I am not convince how Spark Streaming can
> > do this differently. Guozhang, could you comment anything regarding Kafka
> > Streams vs Spark Streaming, especially in terms of
> > aggregations/groupbys/joins implementation logic?
> >
> > Also, is it possible to stop the syncing between state stores to brokers,
> > if I am fine with failures?
> >
> > Thanks
> > Tianji
> >
> >
> > On 2017-02-26 23:52 (-0500), Guozhang Wang  wrote:
> > > Hello Tianji,
> > >
> > > As Kohki mentioned, in Streams joins and aggregations are always done
> > > pre-partitioned, and hence locally. So there won't be any inter-node
> > > communications needed to execute the join / aggregations. Also they can
> > be
> > > hosted as persistent local state stores so you don't need to keep them
> in
> > > memory. So for example if you partition your data with K1 / K2, then
> data
> > > with the same values in combo (K1, K2) will always goes to the same
> > > partition, and hence good for aggregations / joins on either 

Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Guozhang Wang
Kohki,

Thanks for the explanation, it's very helpful.

As we discussed in another email thread you started, originally I thought
the motivation for using "explicit triggers" (i.e. what your watermark
achieves) was application logic: whenever you have received a record that
triggers an action (e.g. an alarm) downstream, you do not want to update
this key's value any more.

But from your description here, it seems the motivation is not only
application logic but also operational concerns about downstream traffic;
by using explicit triggers you can reduce the number of update records sent
downstream as well, which I agree with.

I just want to point out that, in your real monitoring app, if you want
each alarm-triggered record to be queryable later, you can consider
branching such changelog streams to keep these updates, so that they can be
backed by a state store even if a later record with the same key resets it
to normal.
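
A rough sketch of that branching idea using the newer split()/branch() API;
the topic names and the value-based alarm check are made up:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

public class AlarmBranchSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> updates = builder.stream("metric-updates"); // hypothetical topic
        updates.split()
               // route alarm-triggering updates to their own topic so they stay
               // queryable even after a later "normal" update for the same key
               .branch((key, value) -> value.contains("ALARM"),             // made-up alarm check
                       Branched.withConsumer(alarms -> alarms.to("alarm-events")))
               .defaultBranch(Branched.withConsumer(rest -> rest.to("normal-events")));
        builder.build();
    }
}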


Guozhang



On Mon, Feb 27, 2017 at 10:08 AM, Kohki Nishio  wrote:

> Guozhang,
> It's a bit difficult to explain, but let me try ... the basic idea is that
> we can assume most of messages have the same clock (per partition at
> least), then if an offset has information about metadata about the target
> time of the offset, fail-over works.
>
> Offset = 1
> Metadata Time = 2/12/2017 12:00:00
>
> After this offset (1), I only process messages later than the metadata
> time. And from the application, it commits only when it's safe to move the
> time bucket forward for all keys, it picks the earliest time of the time
> bucket from each key. But that 'move forward' is not per key decision, it's
> per partition decision. So I need to define the maximum time range in which
> I need to keep data.
>
> But it's not always that simple, there are outliers(network hiccup) and
> delayed arrivals(wrong ntp), my plan is to mark those keys as 'best effort'
> group when it happens. For those keys, as long as JVM keeps running, I can
> handle those but those won't be a part of 'per partition decision'..
>
> Hope this gives some idea about what I'm trying to do .. I'm planning to
> use Processor API to do this.
>
> Thanks
> -Kohki
>
>
>
>
>
>
> On Sun, Feb 26, 2017 at 8:56 PM, Guozhang Wang  wrote:
>
> > Hello Kohki,
> >
> > Given your data traffic and the state volume I cannot think of a better
> > solution but suggest using large number of partitioned local states.
> >
> > I'm wondering how would "per partition watermark" can help with your
> > traffic?
> >
> > Guozhang
> >
> > On Sun, Feb 26, 2017 at 10:45 AM, Kohki Nishio 
> wrote:
> >
> > > Guozhang,
> > >
> > > Let me explain what I'm trying to do. The message volume is large (TB
> per
> > > Day) and that is coming to a topic. Now I want to do per minute
> > > aggregation(Windowed) and send the output to the downstream (a topic)
> > >
> > > (Topic1 - Large Volume) -> [Stream App] -> (Topic2 - Large Volume)
> > >
> > > I assume the internal topic would have the same amount of data as
> topic1
> > > and the same goes to the local store, I know we can tweak retention
> > period
> > > but network traffic would be same (or even more)
> > >
> > > The key point here is that most of incoming stream will end up a single
> > > data point per a minute (aggregation window), but the variance of the
> key
> > > is huge (high cardinality), then buffering wouldn't really help reduce
> > the
> > > data/traffic size.
> > >
> > > I'm going to do something like per partition watermarking with offset
> > > metadata. It wouldn't increase the traffic that much
> > > thanks
> > > -Kohki
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Kohki Nishio
>



-- 
-- Guozhang


Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Guozhang Wang
Tianji,

To add to what Michael already mentioned: setting the commit interval higher
can help the producer batch the changelog records sent to the broker-side
changelog topic and hence make better use of bandwidth. However, the traffic
would still be the same in your case: as you mentioned, each key's average
update rate is around 1, and hence caching would not help much.

So if you are really fine with failures, i.e. it is affordable to just
restart from scratch whenever that happens, you can disable changelogging
on the state stores as Michael mentioned.

Guozhang


On Mon, Feb 27, 2017 at 6:17 AM, Michael Noll  wrote:

> > Also, is it possible to stop the syncing between state stores to brokers,
> if I am fine with failures?
>
> Yes, you can disable the syncing (or the "changelog" feature) of state
> stores:
> http://docs.confluent.io/current/streams/developer-
> guide.html#enable-disable-state-store-changelogs
>
> > I do have a Spark Cluster, but I am not convince how Spark Streaming can
> do this differently.
> > Guozhang, could you comment anything regarding Kafka Streams vs Spark
> Streaming, especially
> > in terms of aggregations/groupbys/joins implementation logic?
>
> As you are hinting at yourself, if you want fault-tolerant state, then this
> fault tolerance comes at a price (in Kafka Streams, this is achieved by
> changelog-ing state stores).  Other tools such as Flink or Spark work in a
> similar fashion, there's no free lunch.
>
> One option, which you brought up above, is to disable the fault tolerance
> functionality for state by disabling the changelogs of state stores (see
> above).  Another option is to leverage Kafka's record caching for Kafka
> Streams, which does lower the amount of data that is sent across the
> network (from your app's state store changelogs to the Kafka cluster and
> vice versa), though you may need to tune some parameters in your situation
> because your key space has high cardinality and message volume per key is
> relatively low (= you don't benefit as much from record caching as most
> other users/use cases).
>
>
> -Michael
>
>
>
>
> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li  wrote:
>
> > Hi Guozhang and Kohki,
> >
> > Thanks for your replies.
> >
> > I think I know how to deal with partitioning now, but I am still not sure
> > how to deal with the traffic between the hidden state store sizes and
> Kafka
> > Brokers (same as Kohki).
> >
> > I feel like the easiest thing to do is to set a larger commit window, so
> > that the state stores are synced to brokers slower than default.
> >
> > I do have a Spark Cluster, but I am not convince how Spark Streaming can
> > do this differently. Guozhang, could you comment anything regarding Kafka
> > Streams vs Spark Streaming, especially in terms of
> > aggregations/groupbys/joins implementation logic?
> >
> > Also, is it possible to stop the syncing between state stores to brokers,
> > if I am fine with failures?
> >
> > Thanks
> > Tianji
> >
> >
> > On 2017-02-26 23:52 (-0500), Guozhang Wang  wrote:
> > > Hello Tianji,
> > >
> > > As Kohki mentioned, in Streams joins and aggregations are always done
> > > pre-partitioned, and hence locally. So there won't be any inter-node
> > > communications needed to execute the join / aggregations. Also they can
> > be
> > > hosted as persistent local state stores so you don't need to keep them
> in
> > > memory. So for example if you partition your data with K1 / K2, then
> data
> > > with the same values in combo (K1, K2) will always goes to the same
> > > partition, and hence good for aggregations / joins on either K1, K2, or
> > > combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data
> with
> > > the same values of K3 / K4 might still goes to different partitions
> > > processed by different Streams instances.
> > >
> > > So what you want is really to partition based on the "maximum superset"
> > of
> > > all the involved keys. Note that with the superset of all the keys one
> > > thing to watch out is the even distribution of the partitions. If it is
> > not
> > > evenly distributed, then some instance might become hot points. This
> can
> > be
> > > tackled by customizing the "PartitionGrouper" interface in Streams,
> which
> > > indicates which set of partitions will be assigned to each of the tasks
> > (by
> > > default each one partition from the source topics will form a task, and
> > > task is the unit of parallelism in Streams).
> > >
> > > Hope this helps.
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio 
> > wrote:
> > >
> > > > Tianji,
> > > > KStream is indeed Append mode as long as I do stateless processing,
> but
> > > > when you do aggregation that is a stateful operation and it turns to
> > KTable
> > > > and that does Update mode.
> > > >
> > > > In regard to your aggregation, I believe Kafka's aggregation works
> for
> > a

Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Kohki Nishio
Guozhang,
It's a bit difficult to explain, but let me try ... The basic idea is that
we can assume most messages have the same clock (per partition at least);
then, if an offset carries metadata about the target time of that offset,
fail-over works.

Offset = 1
Metadata Time = 2/12/2017 12:00:00

After this offset (1), I only process messages later than the metadata
time. The application commits only when it's safe to move the time bucket
forward for all keys; it picks the earliest time of the time bucket from
each key. But that 'move forward' is not a per-key decision, it's a
per-partition decision. So I need to define the maximum time range in which
I need to keep data.

But it's not always that simple: there are outliers (network hiccups) and
delayed arrivals (wrong NTP). My plan is to mark those keys as a 'best
effort' group when it happens. For those keys, as long as the JVM keeps
running, I can handle them, but they won't be part of the 'per partition'
decision.

Hope this gives some idea of what I'm trying to do. I'm planning to use the
Processor API to do this.
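
As far as I know, the Streams Processor API does not let you attach metadata
to commits, so here is a rough sketch of the offset-plus-metadata idea using
the plain consumer API instead (topic, group, and values below are examples
only):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetMetadataSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // placeholder
        props.put("group.id", "per-minute-aggregator");          // placeholder
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("events", 0);  // hypothetical topic
            consumer.assign(Collections.singletonList(tp));
            // Commit an offset together with the "watermark" time it is safe to resume
            // from; after a fail-over, records older than this time can be skipped.
            long safeOffset = 1L;                                 // example values only
            String watermark = "2017-02-12T12:00:00Z";
            consumer.commitSync(Collections.singletonMap(
                    tp, new OffsetAndMetadata(safeOffset, watermark)));
            System.out.println(consumer.committed(Collections.singleton(tp)));
        }
    }
}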

Thanks
-Kohki






On Sun, Feb 26, 2017 at 8:56 PM, Guozhang Wang  wrote:

> Hello Kohki,
>
> Given your data traffic and the state volume I cannot think of a better
> solution but suggest using large number of partitioned local states.
>
> I'm wondering how would "per partition watermark" can help with your
> traffic?
>
> Guozhang
>
> On Sun, Feb 26, 2017 at 10:45 AM, Kohki Nishio  wrote:
>
> > Guozhang,
> >
> > Let me explain what I'm trying to do. The message volume is large (TB per
> > Day) and that is coming to a topic. Now I want to do per minute
> > aggregation(Windowed) and send the output to the downstream (a topic)
> >
> > (Topic1 - Large Volume) -> [Stream App] -> (Topic2 - Large Volume)
> >
> > I assume the internal topic would have the same amount of data as topic1
> > and the same goes to the local store, I know we can tweak retention
> period
> > but network traffic would be same (or even more)
> >
> > The key point here is that most of incoming stream will end up a single
> > data point per a minute (aggregation window), but the variance of the key
> > is huge (high cardinality), then buffering wouldn't really help reduce
> the
> > data/traffic size.
> >
> > I'm going to do something like per partition watermarking with offset
> > metadata. It wouldn't increase the traffic that much
> > thanks
> > -Kohki
> >
>
>
>
> --
> -- Guozhang
>



-- 
Kohki Nishio


Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Michael Noll
> Also, is it possible to stop the syncing between state stores to brokers,
if I am fine with failures?

Yes, you can disable the syncing (or the "changelog" feature) of state
stores:
http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs

> I do have a Spark Cluster, but I am not convince how Spark Streaming can
do this differently.
> Guozhang, could you comment anything regarding Kafka Streams vs Spark
Streaming, especially
> in terms of aggregations/groupbys/joins implementation logic?

As you are hinting at yourself, if you want fault-tolerant state, then this
fault tolerance comes at a price (in Kafka Streams, this is achieved by
changelog-ing state stores).  Other tools such as Flink or Spark work in a
similar fashion, there's no free lunch.

One option, which you brought up above, is to disable the fault tolerance
functionality for state by disabling the changelogs of state stores (see
above).  Another option is to leverage Kafka's record caching for Kafka
Streams, which does lower the amount of data that is sent across the
network (from your app's state store changelogs to the Kafka cluster and
vice versa), though you may need to tune some parameters in your situation
because your key space has high cardinality and message volume per key is
relatively low (= you don't benefit as much from record caching as most
other users/use cases).
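
For reference, a minimal sketch of the record-caching knob referred to here
(names and the cache size are placeholder values):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RecordCacheSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Total bytes (across all threads) used to buffer and deduplicate records
        // before they are forwarded downstream and written to the changelog. With a
        // high-cardinality key space and ~1 update per key, a bigger cache buys
        // little deduplication.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);
        System.out.println(props);
    }
}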


-Michael




On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li  wrote:

> Hi Guozhang and Kohki,
>
> Thanks for your replies.
>
> I think I know how to deal with partitioning now, but I am still not sure
> how to deal with the traffic between the hidden state store sizes and Kafka
> Brokers (same as Kohki).
>
> I feel like the easiest thing to do is to set a larger commit window, so
> that the state stores are synced to brokers slower than default.
>
> I do have a Spark Cluster, but I am not convince how Spark Streaming can
> do this differently. Guozhang, could you comment anything regarding Kafka
> Streams vs Spark Streaming, especially in terms of
> aggregations/groupbys/joins implementation logic?
>
> Also, is it possible to stop the syncing between state stores to brokers,
> if I am fine with failures?
>
> Thanks
> Tianji
>
>
> On 2017-02-26 23:52 (-0500), Guozhang Wang  wrote:
> > Hello Tianji,
> >
> > As Kohki mentioned, in Streams joins and aggregations are always done
> > pre-partitioned, and hence locally. So there won't be any inter-node
> > communications needed to execute the join / aggregations. Also they can
> be
> > hosted as persistent local state stores so you don't need to keep them in
> > memory. So for example if you partition your data with K1 / K2, then data
> > with the same values in combo (K1, K2) will always goes to the same
> > partition, and hence good for aggregations / joins on either K1, K2, or
> > combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data with
> > the same values of K3 / K4 might still goes to different partitions
> > processed by different Streams instances.
> >
> > So what you want is really to partition based on the "maximum superset"
> of
> > all the involved keys. Note that with the superset of all the keys one
> > thing to watch out is the even distribution of the partitions. If it is
> not
> > evenly distributed, then some instance might become hot points. This can
> be
> > tackled by customizing the "PartitionGrouper" interface in Streams, which
> > indicates which set of partitions will be assigned to each of the tasks
> (by
> > default each one partition from the source topics will form a task, and
> > task is the unit of parallelism in Streams).
> >
> > Hope this helps.
> >
> > Guozhang
> >
> >
> > On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio 
> wrote:
> >
> > > Tianji,
> > > KStream is indeed Append mode as long as I do stateless processing, but
> > > when you do aggregation that is a stateful operation and it turns to
> KTable
> > > and that does Update mode.
> > >
> > > In regard to your aggregation, I believe Kafka's aggregation works for
> a
> > > single partition not over multiple partitions, are you doing 100
> > > different aggregation against record key ? Then you should have a
> single
> > > data object for those 100 values, anyway it sounds like we have similar
> > > problem ..
> > >
> > > -Kohki
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li  wrote:
> > >
> > > > Hi Kohki,
> > > >
> > > > Thanks very much for providing your investigation results. Regarding
> > > > 'append' mode with Kafka Streams, isn't KStream the thing you want?
> > > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks for the pointers to the two blogs. I read one of them before
> and
> > > > just had a look at the other one.
> > > >
> > > > What I am hoping to do is below, can you help me decide if Kafka
> Stream
> > > is
> > > > a good fit?
> > > >
> > > > We have a few 

Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Tianji Li
Hi Guozhang and Kohki,

Thanks for your replies.

I think I know how to deal with partitioning now, but I am still not sure how
to deal with the traffic between the hidden state stores and the Kafka brokers
(same concern as Kohki).

I feel like the easiest thing to do is to set a larger commit interval, so that
the state stores are synced to the brokers less often than by default.

I do have a Spark cluster, but I am not convinced Spark Streaming can do this
differently. Guozhang, could you comment on Kafka Streams vs Spark Streaming,
especially in terms of the implementation logic of aggregations/groupBys/joins?

Also, is it possible to stop the syncing between the state stores and the
brokers, if I am fine with failures?

Thanks
Tianji


On 2017-02-26 23:52 (-0500), Guozhang Wang  wrote: 
> Hello Tianji,
> 
> As Kohki mentioned, in Streams joins and aggregations are always done
> pre-partitioned, and hence locally. So there won't be any inter-node
> communications needed to execute the join / aggregations. Also they can be
> hosted as persistent local state stores so you don't need to keep them in
> memory. So for example if you partition your data with K1 / K2, then data
> with the same values in combo (K1, K2) will always goes to the same
> partition, and hence good for aggregations / joins on either K1, K2, or
> combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data with
> the same values of K3 / K4 might still goes to different partitions
> processed by different Streams instances.
> 
> So what you want is really to partition based on the "maximum superset" of
> all the involved keys. Note that with the superset of all the keys one
> thing to watch out is the even distribution of the partitions. If it is not
> evenly distributed, then some instance might become hot points. This can be
> tackled by customizing the "PartitionGrouper" interface in Streams, which
> indicates which set of partitions will be assigned to each of the tasks (by
> default each one partition from the source topics will form a task, and
> task is the unit of parallelism in Streams).
> 
> Hope this helps.
> 
> Guozhang
> 
> 
> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio  wrote:
> 
> > Tianji,
> > KStream is indeed Append mode as long as I do stateless processing, but
> > when you do aggregation that is a stateful operation and it turns to KTable
> > and that does Update mode.
> >
> > In regard to your aggregation, I believe Kafka's aggregation works for a
> > single partition not over multiple partitions, are you doing 100
> > different aggregation against record key ? Then you should have a single
> > data object for those 100 values, anyway it sounds like we have similar
> > problem ..
> >
> > -Kohki
> >
> >
> >
> >
> >
> > On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li  wrote:
> >
> > > Hi Kohki,
> > >
> > > Thanks very much for providing your investigation results. Regarding
> > > 'append' mode with Kafka Streams, isn't KStream the thing you want?
> > >
> > > Hi Guozhang,
> > >
> > > Thanks for the pointers to the two blogs. I read one of them before and
> > > just had a look at the other one.
> > >
> > > What I am hoping to do is below, can you help me decide if Kafka Stream
> > is
> > > a good fit?
> > >
> > > We have a few data sources, and we are hoping to correlate these sources,
> > > and then do aggregations, as *a stream in real-time*.
> > >
> > > The number of aggregations is around 100 which means, if using Kafka
> > > Streams, we need to maintain around 100 state stores with 100 change-log
> > > topics behind
> > > the scene when joining and aggregations.
> > >
> > > The number of unique entries in each of these state stores is expected to
> > > be at the level of < 100M. The size of each record is around 1K bytes and
> > > so,
> > > each state is expected to be ~100G bytes in size. The total number of
> > > bytes in all these state stores is thus around 10T bytes.
> > >
> > > If keeping all these stores in memory, this translates into around 50
> > > machines with 256Gbytes for this purpose alone.
> > >
> > > Plus, the incoming raw data rate could reach 10M records per second in
> > > peak hours. So, during aggregation, data movement between Kafka Streams
> > > instances
> > > will be heavy, i.e., 10M records per second in the cluster for joining
> > and
> > > aggregations.
> > >
> > > Is Kafka Streams good for this? My gut feeling is Kafka Streams is fine.
> > > But I'd like to run this by you.
> > >
> > > And, I am hoping to minimize data movement (to saving bandwidth) during
> > > joins/groupBys. If I partition the raw data with the minimum subset of
> > > aggregation keys (say K1 and K2),  then I wonder if the following
> > > joins/groupBys (say on keys K1, K2, K3, K4) happen on local data, if
> > using
> > > DSL?
> > >
> > > Thanks
> > > Tianji
> > >
> > >
> > > On 2017-02-25 13:49 (-0500), Guozhang Wang  wrote:
> > > > Hello 

Re: Kafka Streams vs Spark Streaming

2017-02-26 Thread Guozhang Wang
Hello Kohki,

Given your data traffic and the state volume, I cannot think of a better
solution than suggesting a large number of partitioned local states.

I'm wondering how a "per partition watermark" would help with your
traffic?

Guozhang

On Sun, Feb 26, 2017 at 10:45 AM, Kohki Nishio  wrote:

> Guozhang,
>
> Let me explain what I'm trying to do. The message volume is large (TB per
> Day) and that is coming to a topic. Now I want to do per minute
> aggregation(Windowed) and send the output to the downstream (a topic)
>
> (Topic1 - Large Volume) -> [Stream App] -> (Topic2 - Large Volume)
>
> I assume the internal topic would have the same amount of data as topic1
> and the same goes to the local store, I know we can tweak retention period
> but network traffic would be same (or even more)
>
> The key point here is that most of incoming stream will end up a single
> data point per a minute (aggregation window), but the variance of the key
> is huge (high cardinality), then buffering wouldn't really help reduce the
> data/traffic size.
>
> I'm going to do something like per partition watermarking with offset
> metadata. It wouldn't increase the traffic that much
> thanks
> -Kohki
>



-- 
-- Guozhang


Re: Kafka Streams vs Spark Streaming

2017-02-26 Thread Kohki Nishio
Tianji,
KStream is indeed Append mode as long as I do stateless processing; but when
you do an aggregation, that is a stateful operation, it turns into a KTable,
and that uses Update mode.

In regard to your aggregation, I believe Kafka's aggregation works within a
single partition, not across multiple partitions. Are you doing 100 different
aggregations against the record key? Then you should have a single data
object for those 100 values. Anyway, it sounds like we have a similar
problem.
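
A minimal sketch of keeping a multi-key aggregation within a single partition
by re-keying onto the composite key first (topic name and value layout are
invented):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;

public class CompositeKeySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("raw-events")   // hypothetical topic, value = "k1,k2,..."
               // re-key on the combo (k1, k2); Streams repartitions on the new key, so
               // each (k1, k2) group ends up entirely within one partition
               .selectKey((oldKey, value) -> value.split(",")[0] + "|" + value.split(",")[1])
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count();
        builder.build();
    }
}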

-Kohki





On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li  wrote:

> Hi Kohki,
>
> Thanks very much for providing your investigation results. Regarding
> 'append' mode with Kafka Streams, isn't KStream the thing you want?
>
> Hi Guozhang,
>
> Thanks for the pointers to the two blogs. I read one of them before and
> just had a look at the other one.
>
> What I am hoping to do is below, can you help me decide if Kafka Stream is
> a good fit?
>
> We have a few data sources, and we are hoping to correlate these sources,
> and then do aggregations, as *a stream in real-time*.
>
> The number of aggregations is around 100 which means, if using Kafka
> Streams, we need to maintain around 100 state stores with 100 change-log
> topics behind
> the scene when joining and aggregations.
>
> The number of unique entries in each of these state stores is expected to
> be at the level of < 100M. The size of each record is around 1K bytes and
> so,
> each state is expected to be ~100G bytes in size. The total number of
> bytes in all these state stores is thus around 10T bytes.
>
> If keeping all these stores in memory, this translates into around 50
> machines with 256Gbytes for this purpose alone.
>
> Plus, the incoming raw data rate could reach 10M records per second in
> peak hours. So, during aggregation, data movement between Kafka Streams
> instances
> will be heavy, i.e., 10M records per second in the cluster for joining and
> aggregations.
>
> Is Kafka Streams good for this? My gut feeling is Kafka Streams is fine.
> But I'd like to run this by you.
>
> And, I am hoping to minimize data movement (to saving bandwidth) during
> joins/groupBys. If I partition the raw data with the minimum subset of
> aggregation keys (say K1 and K2),  then I wonder if the following
> joins/groupBys (say on keys K1, K2, K3, K4) happen on local data, if using
> DSL?
>
> Thanks
> Tianji
>
>

Re: Kafka Streams vs Spark Streaming

2017-02-26 Thread Kohki Nishio
Guozhang,

Let me explain what I'm trying to do. The message volume is large (TBs per
day) coming into a topic. I want to do a per-minute (windowed) aggregation
and send the output downstream (to another topic):

(Topic1 - Large Volume) -> [Stream App] -> (Topic2 - Large Volume)
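
In the DSL that topology is roughly the sketch below (a per-key count per minute
is assumed; serdes, the bootstrap address, and the string encoding of the windowed
key are placeholders, and this uses a newer version of the Streams API than
existed when this thread was written):

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class PerMinuteAggregation {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
               .count()
               // without suppression, every input record forwards an updated count
               // for its window (the update model); flatten the windowed key for output
               .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
               .to("Topic2", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "per-minute-aggregation");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        new KafkaStreams(builder.build(), props).start();
    }
}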

I assume the internal topic would carry about the same amount of data as Topic1,
and the same goes for the local store. I know we can tweak the retention period,
but the network traffic would be the same (or even more).

The key point here is that most of the incoming stream ends up as a single
data point per minute (the aggregation window), but the key cardinality is
huge, so buffering wouldn't really help reduce the data/traffic size.

I'm going to do something like per-partition watermarking with offset
metadata. It wouldn't increase the traffic that much.
thanks
-Kohki


Re: Kafka Streams vs Spark Streaming

2017-02-25 Thread Tianji Li

Hi Kohki,

Thanks very much for providing your investigation results. Regarding 
'append' mode with Kafka Streams, isn't KStream the thing you want?


Hi Guozhang,

Thanks for the pointers to the two blogs. I read one of them before and 
just had a look at the other one.


What I am hoping to do is below; can you help me decide if Kafka Streams
is a good fit?


We have a few data sources, and we are hoping to correlate these
sources and then do aggregations, as *a stream, in real time*.


The number of aggregations is around 100, which means, if using Kafka
Streams, we need to maintain around 100 state stores with 100 changelog
topics behind the scenes for the joins and aggregations.

The number of unique entries in each of these state stores is expected
to be at the level of < 100M. Each record is around 1 KB, so each store is
expected to be ~100 GB in size. The total size of all these state stores is
thus around 10 TB.


If we keep all these stores in memory, this translates into around 50
machines with 256 GB of memory each, for this purpose alone.


Plus, the incoming raw data rate could reach 10M records per second during
peak hours. So, during aggregation, data movement between Kafka Streams
instances will be heavy, i.e., 10M records per second moving within the
cluster for the joins and aggregations.


Is Kafka Streams good for this? My gut feeling is Kafka Streams is fine. 
But I'd like to run this by you.


Also, I am hoping to minimize data movement (to save bandwidth) during
joins/groupBys. If I partition the raw data by the minimum subset of
aggregation keys (say K1 and K2), I wonder whether the subsequent
joins/groupBys (say on keys K1, K2, K3, K4) happen on local data when
using the DSL?


Thanks
Tianji
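
For reference, a minimal sketch of how the DSL behaves in that case; the Event
class, its field accessors, the topic name, and the externally supplied serde are
hypothetical. groupByKey() on a stream that already carries the right key stays
local, while a key-changing groupBy() always inserts a repartition topic, because
the DSL cannot know that partitioning by K1 and K2 would already co-locate the
K1..K4 groups.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class RepartitionSketch {
    // Hypothetical value type carrying the aggregation fields.
    public record Event(String k1, String k2, String k3, String k4, long value) {}

    public static void build(StreamsBuilder builder, Serde<Event> eventSerde) {
        // Raw data produced with "K1|K2" as the record key (the minimum key subset).
        KStream<String, Event> raw =
                builder.stream("raw-events", Consumed.with(Serdes.String(), eventSerde));

        // Same key as the input: no repartition topic, the aggregation runs on local data.
        KTable<String, Long> byK1K2 = raw.groupByKey().count();

        // Key-changing groupBy: Streams inserts a repartition topic here, so these
        // records are shuffled over the network before the aggregation.
        KTable<String, Long> byAllKeys =
                raw.groupBy((key, e) -> e.k1() + "|" + e.k2() + "|" + e.k3() + "|" + e.k4(),
                            Grouped.with(Serdes.String(), eventSerde))
                   .count();
    }
}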




Re: Kafka Streams vs Spark Streaming

2017-02-25 Thread Guozhang Wang
Hello Kohki,

Thanks for the email. I'd like to understand your concern about the size of
the state store. From your description it's a bit hard to figure out, but
I'd guess you have lots of state stores while each of them is relatively
small?

Hello Tianji,

Regarding your question about the maturity and users of Streams, you can take a
look at a number of blog posts written about Streams usage in
production, for example:

http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/

http://developers.linecorp.com/blog/?p=3960

Guozhang


On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio  wrote:

> I did a bit of research on that matter recently, the comparison is between
> Spark Structured Streaming(SSS) and Kafka Streams,
>
> Both are relatively new (~1y) and trying to solve similar problems, however
> if you go with Spark, you have to go with a cluster, if your environment
> already have a cluster, then it's good. However our team doesn't do any
> Spark, so the initial cost would be very high. On the other hand, Kafka
> Streams is a java library, since we have a service framework, doing stream
> inside a service is super easy.
>
> However for some reason, people see SSS is more mature and Kafka Streams is
> not so mature (like Beta). But old fashion stream is both mature enough (in
> my opinion), I didn't see any difference in DStream(Spark) and
> KStream(Kafka)
>
> DataFrame (Structured Streaming) and KTable, I found it quite different.
> Kafka's model is more like a change log, that means you need to see the
> latest entry to make a final decision. I would call this as 'Update' model,
> whereas Spark does 'Append' model and it doesn't support 'Update' model
> yet. (it's coming to 2.2)
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
>
> I wanted to have 'Append' model with Kafka, but it seems it's not easy
> thing to do, also Kafka Streams uses an internal topic to keep state
> changes for fail-over scenario, but I'm dealing with a lots of tiny
> information and I have a big concern about the size of the state store /
> topic, so my decision is that I'm going with my own handling of Kafka API
> ..
>
> If you do stateless operation and don't have a spark cluster, yeah Kafka
> Streams is perfect.
> If you do stateful complicated operation and happen to have a spark
> cluster, give Spark a try
> else you have to write a code which is optimized for your use case
>
>
> thanks
> -Kohki
>
>
>
>
> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li  wrote:
>
> > Hi there,
> >
> > Can anyone give a good explanation in what cases Kafka Streams is
> > preferred, and in what cases Sparking Streaming is better?
> >
> > Thanks
> > Tianji
> >
>
>
>
> --
> Kohki Nishio
>



-- 
-- Guozhang


Re: Kafka Streams vs Spark Streaming

2017-02-25 Thread Kohki Nishio
I did a bit of research on this recently; the comparison is between
Spark Structured Streaming (SSS) and Kafka Streams.

Both are relatively new (~1 year) and try to solve similar problems. However,
if you go with Spark, you have to go with a cluster; if your environment
already has a cluster, that's fine, but our team doesn't do any Spark, so the
initial cost would be very high. On the other hand, Kafka Streams is a Java
library, and since we already have a service framework, doing streaming
inside a service is super easy.

However, for some reason people see SSS as more mature and Kafka Streams as
less mature (like a beta). Plain old streaming is mature enough in both (in
my opinion); I didn't see much difference between DStream (Spark) and
KStream (Kafka).

DataFrame (Structured Streaming) and KTable, though, I found quite different.
Kafka's model is more like a changelog, which means you need to see the
latest entry to make a final decision. I would call this the 'Update' model,
whereas Spark uses the 'Append' model and doesn't support the 'Update' model
yet (it's coming in 2.2).

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

I wanted the 'Append' model with Kafka, but it doesn't seem easy to do. Also,
Kafka Streams uses an internal topic to keep state changes for the failover
scenario, but I'm dealing with lots of tiny records and have a big concern
about the size of the state store / changelog topic, so my decision is to go
with my own handling of the Kafka API.
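
For what it's worth, in the current DSL (newer than what was available when this
thread was written) a windowed store and its changelog topic can be bounded
explicitly by a retention period, which is one way to keep the state size under
control; a rough sketch with placeholder topic and store names:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class BoundedWindowStore {
    public static void build(StreamsBuilder builder) {
        builder.stream("raw-events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("minute-counts")
                                  .withKeySerde(Serdes.String())
                                  .withValueSerde(Serdes.Long())
                                  // expired windows are dropped from the local store and
                                  // the changelog topic, bounding disk and topic size
                                  .withRetention(Duration.ofHours(2)));
    }
}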

If you do stateless operations and don't have a Spark cluster, Kafka
Streams is perfect.
If you do complicated stateful operations and happen to have a Spark
cluster, give Spark a try.
Otherwise, you have to write code that is optimized for your use case.


thanks
-Kohki




On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li  wrote:

> Hi there,
>
> Can anyone give a good explanation in what cases Kafka Streams is
> preferred, and in what cases Sparking Streaming is better?
>
> Thanks
> Tianji
>



-- 
Kohki Nishio