Re: sliding ktable?

2016-11-12 Thread Matthias J. Sax
> (2) Once I determine a key can be deleted, can I tombstone the record
> programmatically in the same process?  I am not clear on how.  My
> input topic is created by a Kafka Connect configuration (no code).
> 
> 
> 
> If interested here is the main part of code so far…
> 
> 
> 
> KStream<GenericRecord, GenericRecord> txnStream =
> builder.stream("LANE_TRANSACTIONS");
> 
> 
> 
> // Create new ktable from original.
> 
> KTable<GenericRecord, Long> countTableStream =  txnStream
> 
> .map((dummy, value) -> {
> 
> return new KeyValue<>(value, value);
> 
> })
> 
> .countByKey(valueAvroSerde, "CountTable");
> 
> 
> 
> // aggregate ktable
> 
> KTable<String, ArrayList> lookup = countTableStream
> 
> .groupBy((key, value) -> {
> 
> return new KeyValue<>(key.get("LANE_ID").toString(), key);
> 
> },
> 
> stringSerdeKey,
> 
> valueAvroSerde)
> 
> .aggregate(
> 
> () -> { return new ArrayList<>(); },
> 
> (key, record, list) ->
> 
> {
> 
> list.add(record);
> 
> return list;
> 
> },
> 
> (key, record, list) ->
> 
> {
> 
> // list.remove(record);  // this is always executed, not sure why
> 
> return list;
> 
> },
> 
> new ArrayListSerde<>(keyAvroSerde,valueAvroSerde),
> "TxnMapByTagPlaza");
> 
> 
> 
> ArrayListSerde arrayListSerde = new
> ArrayListSerde<>(keyAvroSerde,valueAvroSerde);
> 
> lookup.through(stringSerdeKey, arrayListSerde, "DUPLICATE_LOOKUPS_STREAM");
>
> 
> 
> 
> 
> 
> // Create new stream from original.
> 
> KStream<String, GenericRecord> txnStreamFull =
> 
> txnStream.map((key, record) -> {
> 
> return new KeyValue<>(record.get("LANE_ID").toString(),record);
> 
> })
> 
> .through(stringSerdeKey, valueAvroSerde, "RekeyedIntermediateTopic19");
> 
> 
> 
> // Join stream to lookup ktable.
> 
> // Right now just evaluating results.
> 
> KStream<String, GenericRecord> duplicatesStream =
> 
> txnStreamFull.leftJoin(lookup,(vTxnStream, vLookupTable) -> {
> 
> 
> 
> // this is where the vLookupTable list will be traversed to determine
> // any duplicates that match the vTxnStream value.
>
> // from inspecting the JOINED objects I can see the vLookupTable values
> // are late compared to the txnStreamFull stream values.
> 
> 
> 
> return vTxnStream;
> 
> });
> 
> 
> 
> Know that I have spent time reading the docs / google searches, but
> still just not all clear to me, and perhaps there are other docs I
> am not finding.
> 
> 
> 
> (3) Just wanted to get opinion also, is this the type of processing
> I should be doing with kafka streams and ktables or is this
> something that might be better solved using a traditional
> database?
> 
> 
> 
> Thanks again for any insight,
> 
> 
> 
> John
> 
> 
> 
> 
> 
> 
> 
> -Original Message-
> 
> From: R Krishna [mailto:krishna...@gmail.com]
> 
> Sent: Tuesday, November 08, 2016 1:44 AM
> 
> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> 
> Subject: Re: sliding ktable?
> 
> 
> 
> There is a problem with tombstoning old entries based on a new
> entry: keys that receive no new entries will remain there
> forever.
> 
> 
> 
> On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax
> <matth...@confluent.io<mailto:matth...@confluent.io>>
> 
> wrote:
> 
> 
> 
> Hash: SHA512
> 
> 
> 
> John,
> 
> 
> 
> your thinking is on the right track!
> 
> 
> 
> About infinitely growing KTable: It seems you are extending each
> lane
> 
> with a list of all txnId -- so your view needs infinite memory as
> you
> 
> expend your values... A quick fix might be, to delete older txnID
> for
> 
> this list, each time you update the list (as you mentioned you
> only
> 
> need data for the last two weeks -- you might need to add a
> timestamp
> 
> for each txnID in the list to do the pruning each time you append
> or
> 
> lookup the list).
> 
> 
> 
>>>> Ideally if the topic is set to two weeks retention, then once
>>>> an
> 
>>>> item is 'popped off' I would like to do an aggregate
>>>> subtraction for
> 
>>>> it's value.  But I don't think this is how kafka works.  Is
> 
>>>> this possible?  Any other feedback/suggestion?Perhaps a
>>

RE: sliding ktable?

2016-11-11 Thread John Hayles


 return vTxnStream;

});



Know that I have spent time reading the docs / google searches, but still just 
not all clear to me, and perhaps there are other docs I am not finding.



(3) Just wanted to get opinion also, is this the type of processing I should be 
doing with kafka streams and ktables or is this something that might be better 
solved using a traditional database?



Thanks again for any insight,



John







-----Original Message-----

From: R Krishna [mailto:krishna...@gmail.com]

Sent: Tuesday, November 08, 2016 1:44 AM

To: users@kafka.apache.org<mailto:users@kafka.apache.org>

Subject: Re: sliding ktable?



There is a problem with tombstoning old entries based on a new entry: keys
that receive no new entries will remain there forever.



On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax 
<matth...@confluent.io<mailto:matth...@confluent.io>>

wrote:



>

> John,

>

> your thinking is on the right track!

>

> About the infinitely growing KTable: it seems you are extending each lane
> with a list of all txnIds -- so your view needs unbounded memory as you
> extend your values... A quick fix might be to delete older txnIds from
> this list each time you update it (as you mentioned, you only
> need data for the last two weeks -- you might need to add a timestamp
> for each txnId in the list to do the pruning each time you append to or
> look up the list).
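
The pruning idea quoted above can be sketched in plain Java. This is only a simulation of the per-lane list, not Kafka Streams code: the class PrunedLaneHistory, its Entry type, the millisecond timestamps, and the 14-day constant are all assumptions made for illustration, and it assumes records for a lane arrive roughly in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Plain-Java simulation of the pruning idea; no Kafka Streams API is used.
final class PrunedLaneHistory {

    // Assumed retention: the two weeks mentioned in the thread.
    static final long RETENTION_MS = TimeUnit.DAYS.toMillis(14);

    // Hypothetical list element: a txnId plus the timestamp used for pruning.
    static final class Entry {
        final String txnId;
        final long timestampMs;

        Entry(String txnId, long timestampMs) {
            this.txnId = txnId;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<String, Deque<Entry>> byLane = new HashMap<>();

    // Drops expired entries for the lane, appends the new txnId, and returns
    // the most recent txnId still inside the window (null if there is none).
    String append(String lane, String txnId, long timestampMs) {
        Deque<Entry> history = byLane.computeIfAbsent(lane, k -> new ArrayDeque<>());
        while (!history.isEmpty()
                && timestampMs - history.peekFirst().timestampMs > RETENTION_MS) {
            history.removeFirst();
        }
        String previous = history.isEmpty() ? null : history.peekLast().txnId;
        history.addLast(new Entry(txnId, timestampMs));
        return previous;
    }

    // Number of entries currently kept for the lane.
    int size(String lane) {
        Deque<Entry> history = byLane.get(lane);
        return history == null ? 0 : history.size();
    }

    public static void main(String[] args) {
        PrunedLaneHistory view = new PrunedLaneHistory();
        long day = TimeUnit.DAYS.toMillis(1);
        System.out.println(view.append("c", "03", 0L));        // no earlier entry
        System.out.println(view.append("c", "09", day));       // previous entry is 03
        System.out.println(view.append("c", "11", 20 * day));  // older entries have expired
    }
}
```

Note that a lane is pruned only when a new record arrives for it, so a lane that never sees another record keeps its last entries. That is the same limitation raised elsewhere in this thread for tombstoning based on new entries.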

>

> > Ideally if the topic is set to two weeks retention, then once an

> > item is 'popped off' I would like to do an aggregate subtraction for

> > it's value.  But I don't think this is how kafka works.  Is

> > this possible?  Any other feedback/suggestion?Perhaps a better

> > approach?

>

> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers is
> for compacted topics via a tombstone message (i.e., a message with
> <key:null> format; value == null). However, a tombstone deletes the
> whole record with this key, thus I doubt it is useful for your case.

>

> However, reading through your email, I am wondering why you need
> all the old txnIds. You mentioned that you want to get the previous
> txnId for each duplicate (and your example result verifies this).
> Thus, it would be sufficient to store only the latest txnId for each
> "lane" IMHO. Furthermore, for this deduplication it seems sufficient
> to use only a KTable without a join.

>

> The idea would be as follows: you consume your stream as a
> changelog (i.e., a KTable). For each record, you check if there is an
> entry in the view. If not, just put the record itself as the result
> because there is no duplicate. If you do find an entry, the current
> record is a duplicate of the record found. The record found
> contains its txnId, so you can use this as the "previous txnId". As
> the result, you store the current record. Your data format would be
> <lane:(txnId,txnDate)> (for input) and
> <lane:(txnId,txnDate,previousTxnId)> (for output).

>

> Your stream and view would be like:
>
> {'c',('03','11/07/2016')} plus state: EMPTY
>   => {'c',('03','11/07/2016',null)} // this is output and state update at the same time
>
> {'c',('09','11/07/2016')} plus state: {'c',('03','11/07/2016',null)}
>   => {'c',('09','11/07/2016','03')} // this is output and state update at the same time
>
> {'c',('11','11/08/2016')} plus state: {'c',('09','11/07/2016','03')}
>   => {'c',('11','11/08/2016','09')} // this is output and state update at the same time
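
The state transitions quoted above can be mimicked with a small plain-Java sketch. It is not Kafka Streams code: a HashMap stands in for the materialized view, and the names LatestPerLaneDedup, Result, and process are hypothetical. It only shows that keeping one entry per lane is enough to recover the previous txnId.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of "store only the latest txnId per lane".
final class LatestPerLaneDedup {

    // Hypothetical output/state value: (txnId, txnDate, previousTxnId).
    static final class Result {
        final String txnId;
        final String txnDate;
        final String previousTxnId;   // null when the lane was not seen before

        Result(String txnId, String txnDate, String previousTxnId) {
            this.txnId = txnId;
            this.txnDate = txnDate;
            this.previousTxnId = previousTxnId;
        }
    }

    // Stand-in for the view: exactly one entry per lane.
    private final Map<String, Result> state = new HashMap<>();

    // Looks up the lane, builds the output record, and stores it as the new state.
    Result process(String lane, String txnId, String txnDate) {
        Result found = state.get(lane);
        String previousTxnId = (found == null) ? null : found.txnId;
        Result current = new Result(txnId, txnDate, previousTxnId);
        state.put(lane, current);   // output and state update are the same record
        return current;
    }

    int stateSize() {
        return state.size();
    }

    public static void main(String[] args) {
        LatestPerLaneDedup dedup = new LatestPerLaneDedup();
        System.out.println(dedup.process("c", "03", "11/07/2016").previousTxnId);
        System.out.println(dedup.process("c", "09", "11/07/2016").previousTxnId);
        System.out.println(dedup.process("c", "11", "11/08/2016").previousTxnId);
    }
}
```

With this shape the state grows with the number of distinct lanes rather than with the number of transactions, which is the point of the suggestion. It does not by itself remove lanes that have gone quiet.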

>

>

> -Matthias

>

> On 11/7/16 8:22 AM, John Hayles wrote:

> > Thanks for the reply.  I really appreciate the insight. Again newbie

> > here.  I want to expand on what I am struggling with.  It may be

> > that I just need to get my mind thinking more in a streaming mode.

> > Please let me know you thoughts.  Just having problem ‘getting it’

> > on my own.

> >

> >

> >

> > Below is a simple topic I want to identify where the 'lane'

> > duplicates, and when it does get the 'txnId' of the duplicate

> > record.  The txnId is distinct and will never be duplicate.  The

> > lane will seldom have a duplicate.

> >

> >

> >

> >

> >

> > Topic payload {txnId,lane,txnDate}  Notice lane 'c' is dulplicated

> > 3 times.

> >

> >

> >

> > {'01','wfasd','11/07/2016'}

> >

> > {'02','bas','11/07/2016'}

> >

> > {

Re: sliding ktable?

2016-11-08 Thread R Krishna
Yes, thanks.


Re: sliding ktable?

2016-11-08 Thread Matthias J. Sax
;> {'06','drdd','11/07/2016'}
>>>> 
>>>> {'07','tasd','11/07/2016'}
>>>> 
>>>> {'08','ywq','11/07/2016'}
>>>> 
>>>> {'09','c','11/07/2016'}
>>>> 
>>>> {'10','jda','11/07/2016'}
>>>> 
>>>> {'11','c','11/08/2016'}
>>>> 
>>>> {'12','ozs','11/09/2016'}
>>>> 
>>>> . . .
>>>> 
>>>> Note txnId and lane keep getting more distinct values.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> My thought is to join the data to itself,  one as kstream the
>>>> other as ktable for lookups.
>>>> 
>>>> 
>>>> 
>>>> kstream as
>>>> 
>>>> 
>>>> 
>>>> {lane:(txnId,txnDate)}
>>>> 
>>>> 
>>>> 
>>>> so I visualize like ...
>>>> 
>>>> 
>>>> 
>>>> ('wfasd':('01','11/07/2016')),
>>>> 
>>>> ('bas'  :('02','11/07/2016')),
>>>> 
>>>> ('c':('03','11/07/2016')), ...
>>>> 
>>>> 
>>>> 
>>>> The ktable (lookup table) is an aggregate view I built to
>>>> hold historic data by lane:
>>>> 
>>>> 
>>>> 
>>>> (lane:{(txnId1,txnDate1),
>>>> 
>>>> (txnId2,txnDate2),
>>>> 
>>>> . . .})
>>>> 
>>>> 
>>>> 
>>>> I visualize the materialized view as below.
>>>> 
>>>> 'c' being the important key/value for this example...
>>>> 
>>>> Also note this materialized view will keep growing without
>>>> bound.
>>>> 
>>>> There will always be new keys and txnIds.
>>>> 
>>>> 
>>>> 
>>>> ('wfasd':{('01','11/07/2016')}),
>>>> 
>>>> ('bas'  :{('02','11/07/2016')}),
>>>> 
>>>> ('c':{('03','11/07/2016'),
>>>> 
>>>> ('09','11/07/2016'),
>>>> 
>>>> ('11','11/09/2016')})
>>>> 
>>>> . . .
>>>> 
>>>> 
>>>> 
>>>> Now I can join a kstream to ktable on lane, and duplicates
>>>> are easy to identify.  I can traverse list from value found
>>>> in materialized view to get previous txnId I need.
>>>> 
>>>> 
>>>> 
>>>> So I can build resulting stream / topic like…
>>>> 
>>>> 
>>>> 
>>>> {txnId,lane,txnDate,duplicateTxnId}
>>>> 
>>>> 
>>>> 
>>>> note where c duplicates there is a duplicate txnId...
>>>> 
>>>> 
>>>> 
>>>> {'01','wfasd','11/07/2016',''}
>>>> 
>>>> {'02','bas','11/07/2016',''}
>>>> 
>>>> {'03','c','11/07/2016',''}
>>>> 
>>>> {'04','xxwq','11/07/2016',''}
>>>> 
>>>> {'05','dasf','11/07/2016',''}
>>>> 
>>>> {'06','drdd','11/07/2016',''}
>>>> 
>>>> {'07','tasd','11/07/2016',''}
>>>> 
>>>> {'08','ywq','11/07/2016',''}
>>>> 
>>>> {'09','c','11/07/2016','03'}
>>>> 
>>>> {'10','jda','11/07/2016',''}
>>>> 
>>>> {'11','c','11/08/2016','09'}
>>>> 
>>>> {'12','ozs','11/09/2016',''}
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> The issue is the materialized view of the ktable keeps
>>>> growing without bound, however by business rule I only need
>>>> past 2 weeks, so I think over time there is performance
>>>> impact that is not needed regarding the materialized view,
>>>> one, the size of materialized view keeps growing, and two,
>>>> traversing ever increasing larger value lists.
>>>> 
>>>> 
>>>> 
>>>> Ideally if the topic is set to two weeks retention, then once
>>>> an item is 'popped off' I would like to do an aggregate
>>>> subtraction for it's value.  But I don't think this is how
>>>> kafka works.  Is this possible?  Any other
>>>> feedback/suggestion?Perhaps a better approach?
>>>> 
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> John
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>

Re: sliding ktable?

2016-11-07 Thread R Krishna
 visualize like ...
> >
> >
> >
> > ('wfasd':('01','11/07/2016')),
> >
> > ('bas'  :('02','11/07/2016')),
> >
> > ('c':('03','11/07/2016')), ...
> >
> >
> >
> > The ktable (lookup table) is an aggregate view I built to hold
> > historic data by lane:
> >
> >
> >
> > (lane:{(txnId1,txnDate1),
> >
> > (txnId2,txnDate2),
> >
> > . . .})
> >
> >
> >
> > I visualize the materialized view as below.
> >
> > 'c' being the important key/value for this example...
> >
> > Also note this materialized view will keep growing without bound.
> >
> > There will always be new keys and txnIds.
> >
> >
> >
> > ('wfasd':{('01','11/07/2016')}),
> >
> > ('bas'  :{('02','11/07/2016')}),
> >
> > ('c':{('03','11/07/2016'),
> >
> > ('09','11/07/2016'),
> >
> > ('11','11/09/2016')})
> >
> > . . .
> >
> >
> >
> > Now I can join a kstream to ktable on lane, and duplicates are easy
> > to identify.  I can traverse list from value found in materialized
> > view to get previous txnId I need.
> >
> >
> >
> > So I can build resulting stream / topic like…
> >
> >
> >
> > {txnId,lane,txnDate,duplicateTxnId}
> >
> >
> >
> > note where c duplicates there is a duplicate txnId...
> >
> >
> >
> > {'01','wfasd','11/07/2016',''}
> >
> > {'02','bas','11/07/2016',''}
> >
> > {'03','c','11/07/2016',''}
> >
> > {'04','xxwq','11/07/2016',''}
> >
> > {'05','dasf','11/07/2016',''}
> >
> > {'06','drdd','11/07/2016',''}
> >
> > {'07','tasd','11/07/2016',''}
> >
> > {'08','ywq','11/07/2016',''}
> >
> > {'09','c','11/07/2016','03'}
> >
> > {'10','jda','11/07/2016',''}
> >
> > {'11','c','11/08/2016','09'}
> >
> > {'12','ozs','11/09/2016',''}
> >
> >
> >
> >
> >
> > The issue is the materialized view of the ktable keeps growing
> > without bound, however by business rule I only need past 2 weeks,
> > so I think over time there is performance impact that is not needed
> > regarding the materialized view, one, the size of materialized view
> > keeps growing, and two, traversing ever increasing larger value
> > lists.
> >
> >
> >
> > Ideally if the topic is set to two weeks retention, then once an
> > item is 'popped off' I would like to do an aggregate subtraction
> > for it's value.  But I don't think this is how kafka works.  Is
> > this possible?  Any other feedback/suggestion?Perhaps a better
> > approach?
> >
> >
> >
> > Thanks
> >
> > John
> >
> >
> >
> >
> >
> >
> >
> > -Original Message-
> >
> > From: Matthias J. Sax [mailto:matth...@confluent.io]
> >
> > Sent: Thursday, November 03, 2016 4:29 PM
> >
> > To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> >
> > Subject: Re: sliding ktable?
> >
> >
> >
> > Hash: SHA512
> >
> >
> >
> > Hi John,
> >
> >
> >
> > first of all, a KTable is a (changelog) stream; thus, by definition
> > it is infinite.
> >
> >
> >
> > However, I assume you are worried about the internal materialized
> > view, of the changelog stream (ie, a table state). This view only
> > contains the latest value for each key, ie, a single entry for each
> > key. Thus, it's size is bound by the number of key and does not
> > change as long as you number of distinct keys does not change.
> >
> >
> >
> >> At any given time I need at least 2 weeks data in my ktable
> >
> >
> >
> > There is no such think as "data of the last 2 weeks":
> >
> >
> >
> > Using a KTable for a KStream-KTable join to do lookups, each lookup
> > will be done on the current state if the KTable and thus only
> > return a single value for each key. There is no old data in the
> > materialized view with this regard. Of course, if a key does not
> > get any update for a long time, you can consider the corresponding
> > value as old, but it is still the latest (ie, current) value for
> > the key.
> >
> >
> >
> >> ktable.foreach
> >
> >
> >
> > #foreach() is applied to the changelog stream and not the
> > internally materialized view. Thus, it does not scan over the key
&

Re: sliding ktable?

2016-11-07 Thread Matthias J. Sax
t; 
> . . .
> 
> 
> 
> Now I can join a kstream to ktable on lane, and duplicates are easy
> to identify.  I can traverse list from value found in materialized
> view to get previous txnId I need.
> 
> 
> 
> So I can build resulting stream / topic like…
> 
> 
> 
> {txnId,lane,txnDate,duplicateTxnId}
> 
> 
> 
> note where c duplicates there is a duplicate txnId...
> 
> 
> 
> {'01','wfasd','11/07/2016',''}
> 
> {'02','bas','11/07/2016',''}
> 
> {'03','c','11/07/2016',''}
> 
> {'04','xxwq','11/07/2016',''}
> 
> {'05','dasf','11/07/2016',''}
> 
> {'06','drdd','11/07/2016',''}
> 
> {'07','tasd','11/07/2016',''}
> 
> {'08','ywq','11/07/2016',''}
> 
> {'09','c','11/07/2016','03'}
> 
> {'10','jda','11/07/2016',''}
> 
> {'11','c','11/08/2016','09'}
> 
> {'12','ozs','11/09/2016',''}
> 
> 
> 
> 
> 
> The issue is the materialized view of the ktable keeps growing
> without bound, however by business rule I only need past 2 weeks,
> so I think over time there is performance impact that is not needed
> regarding the materialized view, one, the size of materialized view
> keeps growing, and two, traversing ever increasing larger value
> lists.
> 
> 
> 
> Ideally if the topic is set to two weeks retention, then once an
> item is 'popped off' I would like to do an aggregate subtraction
> for it's value.  But I don't think this is how kafka works.  Is
> this possible?  Any other feedback/suggestion?Perhaps a better
> approach?
> 
> 
> 
> Thanks
> 
> John
> 
> 
> 
> 
> 
> 
> 
> -Original Message-
> 
> From: Matthias J. Sax [mailto:matth...@confluent.io]
> 
> Sent: Thursday, November 03, 2016 4:29 PM
> 
> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> 
> Subject: Re: sliding ktable?
> 
> 
> 
> Hash: SHA512
> 
> 
> 
> Hi John,
> 
> 
> 
> first of all, a KTable is a (changelog) stream; thus, by definition
> it is infinite.
> 
> 
> 
> However, I assume you are worried about the internal materialized
> view, of the changelog stream (ie, a table state). This view only
> contains the latest value for each key, ie, a single entry for each
> key. Thus, it's size is bound by the number of key and does not
> change as long as you number of distinct keys does not change.
> 
> 
> 
>> At any given time I need at least 2 weeks data in my ktable
> 
> 
> 
> There is no such think as "data of the last 2 weeks":
> 
> 
> 
> Using a KTable for a KStream-KTable join to do lookups, each lookup
> will be done on the current state if the KTable and thus only
> return a single value for each key. There is no old data in the
> materialized view with this regard. Of course, if a key does not
> get any update for a long time, you can consider the corresponding
> value as old, but it is still the latest (ie, current) value for
> the key.
> 
> 
> 
>> ktable.foreach
> 
> 
> 
> #foreach() is applied to the changelog stream and not the
> internally materialized view. Thus, it does not scan over the key
> space or is applied to each currently stored key in the view. It is
> rather called for each update record that is in the changelog
> stream.
> 
> 
> 
>> not sure keys can be removed this way
> 
> 
> 
> The only way to delete a key-value entry in the materialized view
> is to send a so-called tombstone record with format  (ie,
> value is null). By "send" I mean that this tombstone record must be
> in the input of the KTable.
> 
> 
> 
> 
> 
> 
> 
> -Matthias
> 
> 
> 
> 
> 
> On 11/3/16 12:39 PM, John Hayles wrote:
> 
>> Newbie here, I am working with Kafka Streams with java 1.8.
> 
> 
> 
> 
> 
> 
> 
>> I want to use the ktable as a lookup table in a join to a
>> kstream.
> 
>> I had no issue implementing this.  However, I do not want the
>> ktable
> 
>> to grow without bounds, I want to limit the ktable to the past 2
>> weeks
> 
>> data, more of a 'sliding' window ktable.  At any given time I
>> need at
> 
>> least 2 weeks data in my ktable, so I don’t think solution like
> 
>> tumbling table will work since it starts over every time it
>> hops.
> 
> 
> 
> 
> 
> 
> 
>> A little simplified example. . .
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> KStream<String, GenericRecord> txnStream =
>> builder.stream("TXN_DATA");
> 
> 
> 
> 
> 
> 
> 
>> KStream<String, GenericRecord> txnStreamFull = txnStream
> 
> 
> 

RE: sliding ktable?

2016-11-07 Thread John Hayles
Thanks for the reply.  I really appreciate the insight. Again, newbie here.  I 
want to expand on what I am struggling with.  It may be that I just need to get 
my mind thinking more in a streaming mode.  Please let me know your thoughts.  
I am just having a problem ‘getting it’ on my own.



Below is a simple topic in which I want to identify where the 'lane' 
duplicates and, when it does, get the 'txnId' of the duplicate record.  The 
txnId is distinct and will never be duplicated.  The lane will seldom have a 
duplicate.





Topic payload {txnId,lane,txnDate}.  Notice lane 'c' is duplicated 3 times.



{'01','wfasd','11/07/2016'}

{'02','bas','11/07/2016'}

{'03','c','11/07/2016'}

{'04','xxwq','11/07/2016'}

{'05','dasf','11/07/2016'}

{'06','drdd','11/07/2016'}

{'07','tasd','11/07/2016'}

{'08','ywq','11/07/2016'}

{'09','c','11/07/2016'}

{'10','jda','11/07/2016'}

{'11','c','11/08/2016'}

{'12','ozs','11/09/2016'}

. . .

Note txnId and lane keep getting more distinct values.





My thought is to join the data to itself,  one as kstream the other as ktable 
for lookups.



kstream as



  {lane:(txnId,txnDate)}



  so I visualize like ...



 ('wfasd':('01','11/07/2016')),

  ('bas'  :('02','11/07/2016')),

 ('c':('03','11/07/2016')), ...



The ktable (lookup table) is an aggregate view I built to hold historic data by 
lane:



 (lane:{(txnId1,txnDate1),

   (txnId2,txnDate2),

   . . .})



  I visualize the materialized view as below.

  'c' being the important key/value for this example...

  Also note this materialized view will keep growing without bound.

  There will always be new keys and txnIds.



  ('wfasd':{('01','11/07/2016')}),

  ('bas'  :{('02','11/07/2016')}),

  ('c':{('03','11/07/2016'),

  ('09','11/07/2016'),

  ('11','11/08/2016')})

  . . .



Now I can join a kstream to the ktable on lane, and duplicates are easy to 
identify.  I can traverse the list in the value found in the materialized view 
to get the previous txnId I need.



So I can build resulting stream / topic like…



{txnId,lane,txnDate,duplicateTxnId}



note where c duplicates there is a duplicate txnId...



{'01','wfasd','11/07/2016',''}

{'02','bas','11/07/2016',''}

{'03','c','11/07/2016',''}

{'04','xxwq','11/07/2016',''}

{'05','dasf','11/07/2016',''}

{'06','drdd','11/07/2016',''}

{'07','tasd','11/07/2016',''}

{'08','ywq','11/07/2016',''}

{'09','c','11/07/2016','03'}

{'10','jda','11/07/2016',''}

{'11','c','11/08/2016','09'}

{'12','ozs','11/09/2016',''}





The issue is that the materialized view of the ktable keeps growing without 
bound; however, by business rule I only need the past 2 weeks. So I think over 
time there is an unnecessary performance impact from the materialized view: 
one, its size keeps growing, and two, lookups traverse ever larger value 
lists.



Ideally, if the topic is set to two weeks retention, then once an item is 
'popped off' I would like to do an aggregate subtraction for its value.  But I 
don't think this is how Kafka works.  Is this possible?  Any other 
feedback/suggestion?  Perhaps a better approach?



Thanks

John







-----Original Message-----

From: Matthias J. Sax [mailto:matth...@confluent.io]

Sent: Thursday, November 03, 2016 4:29 PM

To: users@kafka.apache.org<mailto:users@kafka.apache.org>

Subject: Re: sliding ktable?





Hi John,



first of all, a KTable is a (changelog) stream; thus, by definition it is 
infinite.



However, I assume you are worried about the internal materialized view of the 
changelog stream (i.e., the table state). This view only contains the latest 
value for each key, i.e., a single entry for each key. Thus, its size is 
bounded by the number of keys and does not change as long as your number of 
distinct keys does not change.



> At any given time I need at least 2 weeks data in my ktable



There is no such thing as "data of the last 2 weeks":



When using a KTable in a KStream-KTable join to do lookups, each lookup is 
done on the current state of the KTable and thus returns only a single value 
for each key. There is no old data in the materialized view in this regard. Of 
course, if a key does not get any update for a long time, you can consider the 
corresponding value as old, but it is still the latest (i.e., current) value 
for the key.



> ktable.foreach



#foreach() is applied to the changelog stream and not to the internally 
materialized view. Thus, it does not scan over the key space, nor is it applied 
to each currently stored key in the view. Rather, it is called for each update 
record in the changelog stream.



> not sure keys can be removed this way



The only way to delete a key-value entry in the materialized view is to send a 
so-called tombstone record with format <key:null> (i.e., the value is null). By 
"send" I mean that this tombstone record must be in the input of the KTable.







-Matthias





On 

Re: sliding ktable?

2016-11-03 Thread Matthias J. Sax

Hi John,

first of all, a KTable is a (changelog) stream; thus, by definition it
is infinite.

However, I assume you are worried about the internal materialized
view of the changelog stream (i.e., the table state). This view only
contains the latest value for each key, i.e., a single entry for each
key. Thus, its size is bounded by the number of keys and does not change
as long as your number of distinct keys does not change.

> At any given time I need at least 2 weeks data in my ktable

There is no such thing as "data of the last 2 weeks":

When using a KTable in a KStream-KTable join to do lookups, each lookup
is done on the current state of the KTable and thus returns only a
single value for each key. There is no old data in the materialized
view in this regard. Of course, if a key does not get any update for
a long time, you can consider the corresponding value as old, but it
is still the latest (i.e., current) value for the key.

> ktable.foreach

#foreach() is applied to the changelog stream and not to the internally
materialized view. Thus, it does not scan over the key space, nor is it
applied to each currently stored key in the view. Rather, it is called
for each update record in the changelog stream.

> not sure keys can be removed this way

The only way to delete a key-value entry in the materialized view is
to send a so-called tombstone record with format <key:null> (i.e., the
value is null). By "send" I mean that this tombstone record must be in
the input of the KTable.
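
To make the distinction between the changelog and the materialized view concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: the class MaterializedViewSketch and its methods are hypothetical stand-ins, with a HashMap playing the role of the view and a null value playing the role of a tombstone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a changelog feeding a "latest value per key" view.
final class MaterializedViewSketch {

    // Stand-in for the materialized view: one current value per key.
    private final Map<String, String> view = new HashMap<>();

    // Stand-in for a per-update callback: it sees every changelog record,
    // including tombstones, and never scans the keys stored in the view.
    private final List<String> observedUpdates = new ArrayList<>();

    // Applies one changelog record; a null value is treated as a tombstone.
    void apply(String key, String value) {
        observedUpdates.add(key + "=" + value);
        if (value == null) {
            view.remove(key);      // tombstone: the key leaves the view
        } else {
            view.put(key, value);  // regular update: overwrite the old value
        }
    }

    // A lookup only ever sees the current value, or null if the key is absent.
    String lookup(String key) {
        return view.get(key);
    }

    int viewSize() {
        return view.size();
    }

    int updateCount() {
        return observedUpdates.size();
    }

    public static void main(String[] args) {
        MaterializedViewSketch table = new MaterializedViewSketch();
        table.apply("a", "1");
        table.apply("a", "2");
        table.apply("b", "1");
        table.apply("a", null);   // tombstone for key "a"
        System.out.println(table.lookup("a") + " " + table.viewSize() + " " + table.updateCount());
    }
}
```

In this sketch the view size follows the number of live keys, while the update count follows the number of changelog records, which mirrors the difference described above between the view and the stream that feeds it.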



-Matthias


On 11/3/16 12:39 PM, John Hayles wrote:
> Newbie here, I am working with Kafka Streams with Java 1.8.
> 
> 
> 
> I want to use the ktable as a lookup table in a join to a kstream.
> I had no issue implementing this.  However, I do not want the
> ktable to grow without bound; I want to limit the ktable to the
> past 2 weeks of data, more of a 'sliding' window ktable.  At any given
> time I need at least 2 weeks of data in my ktable, so I don’t think a
> solution like a tumbling window will work, since it starts over every
> time it hops.
> 
> 
> 
> A little simplified example. . .
> 
> 
> 
> 
> 
> KStream<String, GenericRecord> txnStream =
>     builder.stream("TXN_DATA");
>
> KStream<String, GenericRecord> txnStreamFull = txnStream
>     .map((key, record) -> {
>         return new KeyValue<>(record.get("TXN").toString(), record);
>     })
>     .through("RekeyedIntermediateTopic1");
>
> // do not want this table to grow indefinitely.
> KTable<String, Long> countTableStream = txnStream
>     .map((key, record) -> {
>         return new KeyValue<>(record.get("TXN").toString(), record);
>     })
>     .through("RekeyedIntermediateTopic2")
>     .countByKey(stringSerdeKey, "DupCountKTable10");
>
> KStream<String, GenericRecord> duplicatesStream =
>     txnStreamFull.leftJoin(countTableStream,
>         (vTxnStream, vCountTableStream) -> {
>             vTxnStream.put("count",
>                 Long.toString(vCountTableStream.longValue()));
>             return vTxnStream;
>         });
>
> duplicatesStream.to("DUP_TXNS");
> 
> 
> 
> I thought perhaps I could schedule ktable.foreach to inspect and clean,
> but I am not sure keys can be removed this way.
> 
> 
> 
> I may be missing a basic concept here.  I have spent some time
> searching but have not found a good answer. Thanks for any tips.
> 
> 
> 
> Thanks,
> 
> John