If the mapping is 1-to-1 than you can get it done. That's good. As you
observed by yourself, with non-unique mapping it's way harder (or maybe
even impossible) to get this.

Also your KTable#groupBy(...)#aggregate(...) is a good solution.

Thus, now I am just wondering, what you mean by:

> However, this doesn't work flexibly for all cases (I'll omit them for now)


So let's get down to the actual problem :)


-Matthias


On 2/8/17 1:14 PM, Dmitry Minkovsky wrote:
>> And before we discuss deeper, a follow up question: if you map from
> <k,v> to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?
> 
> Yes, this has been central in my exploration so far. For some fields the
> field is unique, for others it is not. There's never ambiguity. Unique
> fields are known to be unique (eg. user email address). Non-unique fields
> are know to be non-unique (e.g. user country).
> 
> Right now I am laying out my topology such that uniqueness is enforced
> before indexing. So the issue I am concerned with here specifically is just
> the indexing. Unless, of course, the two can somehow be tied together in
> some elegant manner? But anyway, my question is based on that assumption
> that uniqueness in indexing input is not a concern.
> 
> And yes, I've noticed that if you have a unique field the result is 1-to-1
> key-to-entity mapping, whereas for non-unique fields each index key may
> have a list of entities it maps to. For non-unique fields where an index
> key may map to thousands of entities, it is not practical maintaining them
> in a single aggregation.
> 
> Any further guidance would be greatly appreciated. Thank you!
> 
> 
> 
> 
> 
> On Wed, Feb 8, 2017 at 3:56 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> It's difficult problem.
>>
>> And before we discuss deeper, a follow up question: if you map from
>> <k,v> to new_key, is this mapping "unique", or could it be that two
>> different k/v-pairs map to the same new_key?
>>
>> If there are overlaps, you end up with a different problem as if there
>> are no overlaps, because you would get "updated" for the new_key from
>> two different places and new_key would actually not be a primary key
>> anymore (while for a "unique" mapping the new_key would still be a
>> primary key)
>>
>>
>> Hope this makes sense. It's a little hard to explain in an email.
>>
>>
>> -Matthias
>>
>> On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
>>> I have a changelog that I'd like to index by some other key. So,
>> something
>>> like this:
>>>
>>>         class Item {
>>>             byte[] id;
>>>             String name;
>>>         }
>>>
>>>         KStreamBuilder topology = new KStreamBuilder();
>>>
>>>         KTable<byte[], Item> items = topology
>>>           .table("items-changelog", "items");
>>>
>>> To index the Items by name:
>>>
>>>         items
>>>           .toStream()
>>>           .selectKey((k, v) -> v.name)
>>>           .to("items-by-name-changelog");
>>>
>>>         KTable<String, Item> itemsByName = topology
>>>           .table("items-by-name-changelog", "items-by-name");
>>>
>>>
>>> However, if the topic "items-changelog" receives a tombstone for some
>>> particular Item (val: null), the key selector indexing function yields an
>>> NPE because that value is null.
>>>
>>> My solution to this problem has been aggregations:
>>>
>>>         KTable<String, Item> itemsByName = items
>>>           .groupBy((k, v) -> new KeyValue<>(v.name, v))
>>>           .aggregate(() -> null,
>>>                      (k, v, a) -> v,
>>>                      (k, v, a) -> null,
>>>                      "items-by-name");
>>>
>>> This works because the grouping/aggregation maintains state on the
>> incoming
>>> topic, and when an item tombstone arrives it can calculate the key and
>>> propagate the tombstone to the index.
>>>
>>> However, this doesn't work flexibly for all cases (I'll omit them for
>> now)
>>> and I find myself drawn to the lower-level Processor API which is fine
>>> except I'd rather just use the DSL for its expressiveness.
>>>
>>> What is the correct way to do this kind of indexing? I need the index to
>>> update correctly when the key value changes or when a tombstone is
>> received.
>>>
>>>
>>>
>>> Thank you,
>>> Dmitry
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to