> Note both kstream and ktable source are from the same topic.  When
> executed (see code below), the joined ktable is 'behind' the
> kstream so lookup will sometimes fail.  I think the ktable is just
> slower to process because it is doing more processing than
> kstream.
> I need to synchronize such that a record must be added to the
> ktable before the join operation occurs for the same record in
> kstream.
> (1) Please give hint if any options to resolve?

Synchronizing KTable and KStream for joins is a known issue. Streams
only provides a best-effort approach right now.

> (2) Once I determine a key can be deleted, can I tombstone record
> programmatically in the same process?  I am not clear on how.  My
> input topic is created by a Kafka Connect configuration (no code).

First, as far as I understand your use case, you do not need to send a
tombstone to the input topic, but to your aggregation-result topic.
However, this is not possible programmatically within Streams, as you
want to delete after 2 weeks. Instead, you can achieve your desired
behavior by manually reconfiguring the changelog topic of your
list-aggregation. For each aggregation, Streams creates a topic named
"<application.id>-<store-name>-changelog", where the store name is the
name you pass to the aggregation (e.g. "TxnMapByTagPlaza"). By default,
this topic is configured with cleanup policy "compact". You can
reconfigure it using bin/kafka-topics.sh and set
"cleanup.policy=compact,delete" to also enable time-based retention.
Furthermore, set the topic-level "retention.ms" to 2 weeks
(1209600000 ms).
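A minimal plain-Java sketch of the values involved, assuming a hypothetical application.id of "lane-dedup-app" and the store name "TxnMapByTagPlaza" from the code further down in this thread. It only builds the changelog topic name and the two topic-level overrides; it does not talk to a broker. As far as I know, these overrides bound only the changelog topic, not the local copy of the state store, so it is worth checking whether that is enough for your case.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

public class ChangelogRetentionConfig {

    // Changelog topics of state stores follow "<application.id>-<storeName>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Topic-level overrides: keep compaction, but also let segments older than
    // `retention` be deleted.
    static Map<String, String> retentionOverrides(Duration retention) {
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("cleanup.policy", "compact,delete");
        overrides.put("retention.ms", Long.toString(retention.toMillis()));
        return overrides;
    }

    public static void main(String[] args) {
        // "lane-dedup-app" is a hypothetical application.id.
        String topic = changelogTopic("lane-dedup-app", "TxnMapByTagPlaza");
        Map<String, String> overrides = retentionOverrides(Duration.ofDays(14));
        System.out.println(topic);     // lane-dedup-app-TxnMapByTagPlaza-changelog
        System.out.println(overrides); // {cleanup.policy=compact,delete, retention.ms=1209600000}
    }
}
```

These values could then be applied with the topic-config tool, for example `bin/kafka-configs.sh --alter --entity-type topics --entity-name <topic> --add-config 'cleanup.policy=[compact,delete],retention.ms=1209600000'` plus whatever connection option your Kafka version requires; the square brackets are how that tool accepts a list value.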

> (3) Just wanted to get opinion also, is this the type of processing
> I should be doing with kafka streams and ktables or is this
> something that might be better solved using a traditional
> database?

For sure, you can handle your use case with Streams. If the DSL does not
work for you, you can always use the Processor API, which gives you much
more flexibility to express your logic -- however, it also requires
more coding. You can also mix-and-match the DSL and the Processor API.

-Matthias

On 11/11/16 5:42 PM, John Hayles wrote:
> Thanks again for the replies, and the time to suggest alternate
> solution.  I know your time is valuable.
> The reason for the List in the aggregate, which I left out to keep
> it simple, is that the order of records added to the topic is not
> guaranteed to be chronological. Keeping the list allows me to traverse
> it and apply more complex business rules to determine the output
> values.
> Topic payload {txnId,lane,txnDate}. Notice lane 'c' is duplicated 3
> times.
> . . .
> {'03','c','11/07/2016 04:00:00'}    // first entry for lane 'c'; no duplicate
> . . .
> {'11','c','11/08/2016 15:30:00'}    // second entry for lane 'c'; no duplicate
> . . .
> {'09','c','11/07/2016 04:00:01'}    // third entry for lane 'c': added to the
>                                     // topic out of chronological order; a duplicate
>                                     // of txnId 03 even though the time is slightly different
> . . .
> Resulting materialized view:
> 'c':{('03','11/07/2016 04:00:00'),
>      ('11','11/08/2016 15:30:00'),
>      ('09','11/07/2016 04:00:01')}
> When the kstream is joined to the lookup ktable defined above, txnId
> '09' should be identified as a duplicate of '03'. So the output topic
> looks like...
> Topic payload {txnId,lane,txnDate,duplicateTxnId}
> . . .
> {'03','c','11/07/2016 04:00:00',''}
> . . .
> {'11','c','11/08/2016 15:30:00',''}
> . . .
> {'09','c','11/07/2016 04:00:01','03'}    // note the duplicate value '03' is
>                                          // not the most recent txnId for lane 'c'
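To make the list traversal concrete, here is a plain-Java sketch with no Kafka dependency. The duplicate rule is purely hypothetical (two entries of the same lane whose timestamps are at most one minute apart); the thread does not state the actual business rules. Records are processed in arrival order, and each one is looked up before it is appended, which reproduces the output above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class LaneListDuplicates {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss");

    // HYPOTHETICAL rule: entries of the same lane this close in time are duplicates.
    static final Duration DUPLICATE_WINDOW = Duration.ofMinutes(1);

    // One (txnId, txnDate) entry of a lane's list.
    static final class Txn {
        final String txnId;
        final LocalDateTime txnDate;

        Txn(String txnId, String txnDate) {
            this.txnId = txnId;
            this.txnDate = LocalDateTime.parse(txnDate, FMT);
        }
    }

    // Scan the lane's list (possibly out of chronological order) for an entry
    // close in time to `current`; return its txnId, or "" if there is none.
    static String findDuplicate(List<Txn> history, Txn current) {
        for (Txn t : history) {
            if (t.txnId.equals(current.txnId)) {
                continue; // never report a record as a duplicate of itself
            }
            Duration gap = Duration.between(t.txnDate, current.txnDate).abs();
            if (gap.compareTo(DUPLICATE_WINDOW) <= 0) {
                return t.txnId;
            }
        }
        return "";
    }

    public static void main(String[] args) {
        List<Txn> laneC = new ArrayList<>();
        Txn[] arrivals = {
            new Txn("03", "11/07/2016 04:00:00"),
            new Txn("11", "11/08/2016 15:30:00"),
            new Txn("09", "11/07/2016 04:00:01"),
        };
        for (Txn t : arrivals) {
            String dup = findDuplicate(laneC, t); // look up first ...
            laneC.add(t);                         // ... then append
            System.out.println(t.txnId + " -> '" + dup + "'");
        }
        // prints:
        // 03 -> ''
        // 11 -> ''
        // 09 -> '03'
    }
}
```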
> Note both kstream and ktable source are from the same topic.  When
> executed (see code below), the joined ktable is 'behind' the
> kstream so lookup will sometimes fail.  I think the ktable is just
> slower to process because it is doing more processing than
> kstream.
> I need to synchronize such that a record must be added to the
> ktable before the join operation occurs for the same record in
> kstream.
> (1) Please give hint if any options to resolve?
> Also, from below comments...
>> About infinitely growing KTable: It seems you are extending each lane
>> with a list of all txnIds -- so your view needs infinite memory as you
>> extend your values... A quick fix might be to delete older txnIds from
>> this list each time you update the list (as you mentioned, you only
>> need data for the last two weeks -- you might need to add a timestamp
>> for each txnId in the list to do the pruning each time you append to
>> or look up the list).
> (2) Once I determine a key can be deleted, can I tombstone record
> programmatically in the same process?  I am not clear on how.  My
> input topic is created by a Kafka Connect configuration (no code).
> If interested, here is the main part of the code so far…
> KStream<GenericRecord, GenericRecord> txnStream =
>     builder.stream("LANE_TRANSACTIONS");
> // Create new ktable from original.
> KTable<GenericRecord, Long> countTableStream = txnStream
>     .map((dummy, value) -> new KeyValue<>(value, value))
>     .countByKey(valueAvroSerde, "CountTable");
> // aggregate ktable
> KTable<String, ArrayList<GenericRecord>> lookup = countTableStream
>     .groupBy((key, value) -> new KeyValue<>(key.get("LANE_ID").toString(), key),
>              stringSerdeKey,
>              valueAvroSerde)
>     .aggregate(
>         () -> new ArrayList<>(),
>         (key, record, list) -> {
>             list.add(record);
>             return list;
>         },
>         (key, record, list) -> {
>             // list.remove(record);  // this is always executed, not sure why
>             return list;
>         },
>         new ArrayListSerde<>(keyAvroSerde, valueAvroSerde),
>         "TxnMapByTagPlaza");
> ArrayListSerde<GenericRecord> arrayListSerde =
>     new ArrayListSerde<>(keyAvroSerde, valueAvroSerde);
> lookup.through(stringSerdeKey,arrayListSerde,"DUPLICATE_LOOKUPS_STREAM
> // Create new stream from original.
> KStream<String, GenericRecord> txnStreamFull = txnStream
>     .map((key, record) -> new KeyValue<>(record.get("LANE_ID").toString(), record))
>     .through(stringSerdeKey, valueAvroSerde, "RekeyedIntermediateTopic19");
> // Join stream to lookup ktable.
> // Right now just evaluating results.
> KStream<String, GenericRecord> duplicatesStream =
>     txnStreamFull.leftJoin(lookup, (vTxnStream, vLookupTable) -> {
>         // this is where the vLookupTable list will be traversed to determine
>         // any duplicates that match the vTxnStream value.
>         // from inspecting JOINED objects I can see vLookupTable values
>         // are late compared to txnStreamFull stream values.
>         return vTxnStream;
>     });
> Know that I have spent time reading the docs / google searches, but
> it is still not all clear to me, and perhaps there are other docs I
> am not finding.
> (3) Just wanted to get opinion also, is this the type of processing
> I should be doing with kafka streams and ktables or is this
> something that might be better solved using a traditional
> database?
> Thanks again for any insight,
> John
> -----Original Message-----
> From: R Krishna [mailto:krishna...@gmail.com]
> Sent: Tuesday, November 08, 2016 1:44 AM
> To: users@kafka.apache.org
> Subject: Re: sliding ktable?
> There is a problem with tombstoning old entries based on a new
> entry: keys that receive no new entries will remain there forever.
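One way around this, sketched below in plain Java as an assumption rather than a Streams API, is a sweep that runs on a schedule instead of on new input (for example from a periodically scheduled callback in the Processor API). A TreeMap of last-update times stands in for the state store; the keys the sweep removes are the ones that would need a tombstone.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IdleKeySweep {

    // Remove every key whose last update is older than (now - retention) and
    // return the removed keys, independent of whether new records arrived.
    static List<String> sweep(Map<String, Instant> lastUpdate, Instant now, Duration retention) {
        Instant cutoff = now.minus(retention);
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Instant>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> e = it.next();
            if (e.getValue().isBefore(cutoff)) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<String, Instant> lastUpdate = new TreeMap<>();
        lastUpdate.put("c", Instant.parse("2016-11-08T15:30:00Z"));
        lastUpdate.put("wfasd", Instant.parse("2016-10-01T00:00:00Z")); // no new entries since
        Instant now = Instant.parse("2016-11-11T00:00:00Z");
        System.out.println(sweep(lastUpdate, now, Duration.ofDays(14))); // [wfasd]
        System.out.println(lastUpdate.keySet());                          // [c]
    }
}
```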
> On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> John,
> your thinking is on the right track!
> About infinitely growing KTable: It seems you are extending each lane
> with a list of all txnIds -- so your view needs infinite memory as you
> extend your values... A quick fix might be to delete older txnIds from
> this list each time you update the list (as you mentioned, you only
> need data for the last two weeks -- you might need to add a timestamp
> for each txnId in the list to do the pruning each time you append to
> or look up the list).
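The pruning could look roughly like this plain-Java aggregator step (no Kafka dependency; the Entry class and the extra txnId "20" with its date are hypothetical). It uses the incoming record's timestamp as "now", so a late, out-of-order record prunes less rather than more.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class PruneOnAppend {

    // One (txnId, txnDate) entry of a lane's list.
    static final class Entry {
        final String txnId;
        final Instant txnDate;

        Entry(String txnId, Instant txnDate) {
            this.txnId = txnId;
            this.txnDate = txnDate;
        }
    }

    // Aggregator-style update: keep only entries within `retention` of the
    // incoming record's timestamp, then append the incoming record.
    static List<Entry> appendAndPrune(List<Entry> list, Entry incoming, Duration retention) {
        Instant cutoff = incoming.txnDate.minus(retention);
        List<Entry> result = new ArrayList<>();
        for (Entry e : list) {
            if (!e.txnDate.isBefore(cutoff)) {
                result.add(e);
            }
        }
        result.add(incoming);
        return result;
    }

    public static void main(String[] args) {
        Duration twoWeeks = Duration.ofDays(14);
        List<Entry> laneC = new ArrayList<>();
        laneC = appendAndPrune(laneC, new Entry("03", Instant.parse("2016-11-07T04:00:00Z")), twoWeeks);
        laneC = appendAndPrune(laneC, new Entry("11", Instant.parse("2016-11-08T15:30:00Z")), twoWeeks);
        // Hypothetical later record: cutoff becomes 2016-11-08T00:00Z, so "03" is dropped.
        laneC = appendAndPrune(laneC, new Entry("20", Instant.parse("2016-11-22T00:00:00Z")), twoWeeks);
        for (Entry e : laneC) {
            System.out.println(e.txnId); // prints 11, then 20
        }
    }
}
```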
>>>> Ideally if the topic is set to two weeks retention, then once an
>>>> item is 'popped off' I would like to do an aggregate subtraction for
>>>> its value.  But I don't think this is how kafka works.  Is this
>>>> possible?  Any other feedback/suggestion?  Perhaps a better approach?
> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers is
> for compacted topics via tombstone messages (ie, a message in
> <key:null> format; value == null). However, a tombstone deletes the
> whole record with this key, thus I doubt they are useful for your case.
> However, reading through your email, I am wondering why you need all
> the old txnIds. You mentioned that you want to get the previous txnId
> for each duplicate (and your example results verify this). Thus, it
> would be sufficient to only store the latest txnId for each "lane"
> IMHO. Furthermore, for this deduplication it seems sufficient to only
> use a KTable without a join.
> The idea would be as follows: you consume your stream as a changelog
> (ie, a KTable). For each record, you check if there is an entry in the
> view. If not, just put the record itself as the result, because there
> is no duplicate. If you do find an entry, the current record is a
> duplicate of the record found. The record found contains its txnId, so
> you can use this as "previous txnId". As the result, you store the
> current record. Your data format would be like
> <lane:(txnId,txnDate)> (for input) and
> <lane:(txnId,txnDate,previousTxnId)> (for output).
> Your stream and view would be like:
> {'c',('03','11/07/2016')} plus state: EMPTY
> => {'c',('03','11/07/2016','')}     // this is output and state update at the same time
> {'c',('09','11/07/2016')} plus state: {'c',('03','11/07/2016','')}
> => {'c',('09','11/07/2016','03')}   // this is output and state update at the same time
> {'c',('11','11/08/2016')} plus state: {'c',('09','11/07/2016','03')}
> => {'c',('11','11/08/2016','09')}   // this is output and state update at the same time
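The same walk-through as a plain-Java sketch (no Kafka dependency): a HashMap stands in for the KTable's state store, and process() does the lookup, builds the output, and stores it as the new state in one step. In the DSL this would presumably be a grouped aggregation on the lane key; because lookup and update happen together for a key, there is no second input that can lag behind, unlike the stream-table join.

```java
import java.util.HashMap;
import java.util.Map;

public class LatestPerLaneDedup {

    // Value stored per lane and emitted as output: (txnId, txnDate, previousTxnId).
    static final class Result {
        final String txnId;
        final String txnDate;
        final String previousTxnId;

        Result(String txnId, String txnDate, String previousTxnId) {
            this.txnId = txnId;
            this.txnDate = txnDate;
            this.previousTxnId = previousTxnId;
        }

        @Override
        public String toString() {
            return "('" + txnId + "','" + txnDate + "','" + previousTxnId + "')";
        }
    }

    // Stand-in for the KTable's state store: only the latest Result per lane.
    private final Map<String, Result> view = new HashMap<>();

    // Look up the lane, emit (current, previous txnId or ""), and store the
    // emitted value as the new state: output and state update at the same time.
    Result process(String lane, String txnId, String txnDate) {
        Result previous = view.get(lane);
        Result out = new Result(txnId, txnDate, previous == null ? "" : previous.txnId);
        view.put(lane, out);
        return out;
    }

    public static void main(String[] args) {
        LatestPerLaneDedup dedup = new LatestPerLaneDedup();
        System.out.println(dedup.process("c", "03", "11/07/2016")); // ('03','11/07/2016','')
        System.out.println(dedup.process("c", "09", "11/07/2016")); // ('09','11/07/2016','03')
        System.out.println(dedup.process("c", "11", "11/08/2016")); // ('11','11/08/2016','09')
    }
}
```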
> -Matthias
> On 11/7/16 8:22 AM, John Hayles wrote:
>>>> Thanks for the reply.  I really appreciate the insight.  Again,
>>>> newbie here.  I want to expand on what I am struggling with.  It may
>>>> be that I just need to get my mind thinking more in a streaming mode.
>>>> Please let me know your thoughts.  Just having a problem ‘getting it’
>>>> on my own.
>>>> Below is a simple topic in which I want to identify where the 'lane'
>>>> duplicates and, when it does, get the 'txnId' of the duplicate
>>>> record.  The txnId is distinct and will never be duplicated.  The
>>>> lane will seldom have a duplicate.
>>>> Topic payload {txnId,lane,txnDate}  Notice lane 'c' is duplicated
>>>> 3 times.
>>>> {'01','wfasd','11/07/2016'}
>>>> {'02','bas','11/07/2016'}
>>>> {'03','c','11/07/2016'}
>>>> {'04','xxwq','11/07/2016'}
>>>> {'05','dasf','11/07/2016'}
>>>> {'06','drdd','11/07/2016'}
>>>> {'07','tasd','11/07/2016'}
>>>> {'08','ywq','11/07/2016'}
>>>> {'09','c','11/07/2016'}
>>>> {'10','jda','11/07/2016'}
>>>> {'11','c','11/08/2016'}
>>>> {'12','ozs','11/09/2016'}
>>>> . . .
>>>> Note txnId and lane keep getting more distinct values.
>>>> My thought is to join the data to itself, one as a kstream, the
>>>> other as a ktable for lookups.
>>>> kstream as
>>>> {lane:(txnId,txnDate)}
>>>> so I visualize it like ...
>>>> ('wfasd':('01','11/07/2016')),
>>>> ('bas'  :('02','11/07/2016')),
>>>> ('c'    :('03','11/07/2016')), ...
>>>> The ktable (lookup table) is an aggregate view I built to hold
>>>> historic data by lane:
>>>> (lane:{(txnId1,txnDate1),
>>>>        (txnId2,txnDate2),
>>>>        . . .})
>>>> I visualize the materialized view as below, 'c' being the important
>>>> key/value for this example...
>>>> Also note this materialized view will keep growing without bound.
>>>> There will always be new keys and txnIds.
>>>> ('wfasd':{('01','11/07/2016')}),
>>>> ('bas'  :{('02','11/07/2016')}),
>>>> ('c'    :{('03','11/07/2016'),
>>>>           ('09','11/07/2016'),
>>>>           ('11','11/08/2016')})
>>>> . . .
>>>> Now I can join a kstream to the ktable on lane, and duplicates are
>>>> easy to identify.  I can traverse the list from the value found in
>>>> the materialized view to get the previous txnId I need.
>>>> So I can build a resulting stream / topic like…
>>>> {txnId,lane,txnDate,duplicateTxnId}
>>>> Note where 'c' duplicates there is a duplicate txnId...
>>>> {'01','wfasd','11/07/2016',''}
>>>> {'02','bas','11/07/2016',''}
>>>> {'03','c','11/07/2016',''}
>>>> {'04','xxwq','11/07/2016',''}
>>>> {'05','dasf','11/07/2016',''}
>>>> {'06','drdd','11/07/2016',''}
>>>> {'07','tasd','11/07/2016',''}
>>>> {'08','ywq','11/07/2016',''}
>>>> {'09','c','11/07/2016','03'}
>>>> {'10','jda','11/07/2016',''}
>>>> {'11','c','11/08/2016','09'}
>>>> {'12','ozs','11/09/2016',''}
>>>> The issue is that the materialized view of the ktable keeps growing
>>>> without bound; however, by business rule I only need the past 2
>>>> weeks, so I think over time there is a performance impact that is
>>>> not needed regarding the materialized view: one, the size of the
>>>> materialized view keeps growing, and two, traversing ever larger
>>>> value lists.
>>>> Ideally if the topic is set to two weeks retention, then once an
>>>> item is 'popped off' I would like to do an aggregate subtraction for
>>>> its value.  But I don't think this is how kafka works.  Is this
>>>> possible?  Any other feedback/suggestion?  Perhaps a better approach?
>>>> Thanks
>>>> John
>>>> -----Original Message-----
>>>> From: Matthias J. Sax [mailto:matth...@confluent.io]
>>>> Sent: Thursday, November 03, 2016 4:29 PM
>>>> To: users@kafka.apache.org
>>>> Subject: Re: sliding ktable?
>>>> Hi John,
>>>> first of all, a KTable is a (changelog) stream; thus, by definition
>>>> it is infinite.
>>>> However, I assume you are worried about the internal materialized
>>>> view of the changelog stream (ie, a table state). This view only
>>>> contains the latest value for each key, ie, a single entry for each
>>>> key. Thus, its size is bounded by the number of keys and does not
>>>> change as long as your number of distinct keys does not change.
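A tiny plain-Java illustration of that bound (no Kafka dependency; a HashMap stands in for the materialized view): replaying four changelog records for two distinct keys leaves a view with two entries, each holding the latest value.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogToView {

    // Replay changelog records {key, value} into a view holding the latest value per key.
    static Map<String, String> materialize(String[][] changelog) {
        Map<String, String> view = new HashMap<>();
        for (String[] rec : changelog) {
            view.put(rec[0], rec[1]); // a later update for the same key overwrites the earlier one
        }
        return view;
    }

    public static void main(String[] args) {
        String[][] changelog = {
            {"c", "03"}, {"bas", "02"}, {"c", "09"}, {"c", "11"}
        };
        Map<String, String> view = materialize(changelog);
        System.out.println(changelog.length + " records, " + view.size() + " keys, c=" + view.get("c"));
        // prints: 4 records, 2 keys, c=11
    }
}
```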
>>>>> At any given time I need at least 2 weeks data in my ktable
>>>> There is no such thing as "data of the last 2 weeks":
>>>> Using a KTable for a KStream-KTable join to do lookups, each lookup
>>>> will be done on the current state of the KTable and thus only return
>>>> a single value for each key. There is no old data in the
>>>> materialized view in this regard. Of course, if a key does not get
>>>> any update for a long time, you can consider the corresponding value
>>>> as old, but it is still the latest (ie, current) value for the key.
>>>>> ktable.foreach
>>>> #foreach() is applied to the changelog stream and not to the
>>>> internally materialized view. Thus, it does not scan over the key
>>>> space, nor is it applied to each currently stored key in the view.
>>>> It is rather called for each update record that is in the changelog
>>>> stream.
>>>>> not sure keys can be removed this way
>>>> The only way to delete a key-value entry in the materialized view is
>>>> to send a so-called tombstone record with format <key:null> (ie,
>>>> value is null). By "send" I mean that this tombstone record must be
>>>> in the input of the KTable.
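A plain-Java sketch of how a tombstone affects the materialized view, using a TreeMap as a stand-in for the store (no Kafka dependency): a record with a null value removes the key entirely rather than storing null.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TombstoneReplay {

    // Apply changelog records to a view; a null value (tombstone) deletes the key.
    static Map<String, String> apply(List<Map.Entry<String, String>> changelog) {
        Map<String, String> view = new TreeMap<>();
        for (Map.Entry<String, String> rec : changelog) {
            if (rec.getValue() == null) {
                view.remove(rec.getKey());
            } else {
                view.put(rec.getKey(), rec.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("c", "03"));
        changelog.add(new SimpleEntry<>("wfasd", "01"));
        changelog.add(new SimpleEntry<String, String>("wfasd", null)); // tombstone for "wfasd"
        System.out.println(apply(changelog)); // {c=03}
    }
}
```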
>>>> -Matthias
>>>> On 11/3/16 12:39 PM, John Hayles wrote:
>>>>> Newbie here, I am working with Kafka Streams with Java 1.8.
>>>>> I want to use the ktable as a lookup table in a join to a kstream.
>>>>> I had no issue implementing this.  However, I do not want the
>>>>> ktable to grow without bounds; I want to limit the ktable to the
>>>>> past 2 weeks of data, more of a 'sliding' window ktable.  At any
>>>>> given time I need at least 2 weeks of data in my ktable, so I don’t
>>>>> think a solution like a tumbling window will work, since it starts
>>>>> over every time it hops.
>>>>> A little simplified example. . .
>>>>> KStream<String, GenericRecord> txnStream =
>>>>>     builder.stream("TXN_DATA");
>>>>> KStream<String, GenericRecord> txnStreamFull = txnStream
>>>>>     .map((key, record) ->
>>>>>         new KeyValue<>(record.get("TXN").toString(), record))
>>>>>     .through("RekeyedIntermediateTopic1");
>>>>> // do not want this table to grow indefinitely.
>>>>> KTable<String, Long> countTableStream = txnStream
>>>>>     .map((key, record) ->
>>>>>         new KeyValue<>(record.get("TXN").toString(), record))
>>>>>     .through("RekeyedIntermediateTopic2")
>>>>>     .countByKey(stringSerdeKey, "DupCountKTable10");
>>>>> KStream<String, GenericRecord> duplicatesStream =
>>>>>     txnStreamFull.leftJoin(countTableStream,
>>>>>         (vTxnStream, vCountTableStream) -> {
>>>>>             // leftJoin passes null when the table has no entry for the key yet
>>>>>             vTxnStream.put("count", vCountTableStream == null
>>>>>                 ? "0" : Long.toString(vCountTableStream));
>>>>>             return vTxnStream;
>>>>>         });
>>>>> duplicatesStream.to("DUP_TXNS");
>>>>> I thought perhaps I could schedule ktable.foreach to inspect and
>>>>> clean, but I am not sure keys can be removed this way.
>>>>> I may be missing a basic concept here.  I have spent some time
>>>>> searching but have not found a good answer; thanks for any tips.
>>>>> Thanks,
>>>>> John
> --
> Radha Krishna, Proddaturi
> 253-234-5657