> And before we discuss deeper, a follow up question: if you map from
> <k,v> to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?

Yes, this has been central to my exploration so far. Some fields are
unique and others are not, but there's never ambiguity: unique fields are
known to be unique (e.g. user email address), and non-unique fields are
known to be non-unique (e.g. user country).

Right now I am laying out my topology such that uniqueness is enforced
before indexing. So the issue I am concerned with here specifically is just
the indexing. Unless, of course, the two can somehow be tied together in
some elegant manner? But anyway, my question is based on the assumption
that uniqueness in indexing input is not a concern.
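
To make the unique case concrete, here is a minimal plain-Java model of what
I mean by indexing once uniqueness is already guaranteed. It is not Kafka
Streams code, and the class and method names (UniqueFieldIndex, apply,
lookup) are made up for illustration. The point is only that the index has
to remember the previous field value per primary key, so that a changed
value or a tombstone can retract the stale index entry:

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a 1-to-1 secondary index; not Kafka Streams code. */
public class UniqueFieldIndex {
    // Last indexed field value per primary key, needed to retract stale entries.
    private final Map<String, String> fieldById = new HashMap<>();
    // The index itself: unique field value -> primary key.
    private final Map<String, String> idByField = new HashMap<>();

    /** Applies one changelog record; a null fieldValue models a tombstone. */
    public void apply(String id, String fieldValue) {
        String old = (fieldValue == null)
                ? fieldById.remove(id)
                : fieldById.put(id, fieldValue);
        if (old != null && !old.equals(fieldValue)) {
            idByField.remove(old); // value changed or record deleted
        }
        if (fieldValue != null) {
            idByField.put(fieldValue, id);
        }
    }

    /** Returns the primary key indexed under fieldValue, or null if none. */
    public String lookup(String fieldValue) {
        return idByField.get(fieldValue);
    }
}
```

This assumes, as stated above, that the input never contains two primary
keys with the same field value at the same time.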

And yes, I've noticed that if you have a unique field the result is a
1-to-1 key-to-entity mapping, whereas for non-unique fields each index key
may have a list of entities it maps to. For non-unique fields where an
index key may map to thousands of entities, it is not practical to maintain
them in a single aggregation.
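
One alternative I have been considering for the non-unique case, sketched
here in plain Java rather than Kafka Streams: instead of one large list per
index key, store one small entry per (field value, primary key) pair under a
composite key in a sorted store, and answer lookups with a prefix range
scan. Everything below is hypothetical (CompositeKeyIndex, the NUL
separator, String ids), and it assumes the separator never occurs in field
values or ids:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a non-unique secondary index; not Kafka Streams code. */
public class CompositeKeyIndex {
    // Hypothetical separator; assumed never to occur in field values or ids.
    private static final char SEP = '\u0000';

    // Last indexed field value per primary key, needed to retract stale entries.
    private final Map<String, String> fieldById = new HashMap<>();
    // One entry per (field value, primary key) pair, sorted by composite key.
    private final TreeMap<String, String> index = new TreeMap<>();

    /** Applies one changelog record; a null fieldValue models a tombstone. */
    public void apply(String id, String fieldValue) {
        String old = (fieldValue == null)
                ? fieldById.remove(id)
                : fieldById.put(id, fieldValue);
        if (old != null) {
            index.remove(old + SEP + id); // retract the previous index entry
        }
        if (fieldValue != null) {
            index.put(fieldValue + SEP + id, id);
        }
    }

    /** Returns the primary keys currently indexed under fieldValue. */
    public List<String> lookup(String fieldValue) {
        String from = fieldValue + SEP;              // inclusive lower bound
        String to = fieldValue + (char) (SEP + 1);   // exclusive upper bound
        return new ArrayList<>(index.subMap(from, true, to, false).values());
    }
}
```

Each update then touches at most two small entries, no matter how many
entities share the same field value.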

Any further guidance would be greatly appreciated. Thank you!





On Wed, Feb 8, 2017 at 3:56 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> It's a difficult problem.
>
> And before we discuss deeper, a follow up question: if you map from
> <k,v> to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?
>
> If there are overlaps, you end up with a different problem than if there
> are no overlaps, because you would get updates for the new_key from
> two different places and new_key would actually not be a primary key
> anymore (while for a "unique" mapping the new_key would still be a
> primary key).
>
>
> Hope this makes sense. It's a little hard to explain in an email.
>
>
> -Matthias
>
> On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
> > I have a changelog that I'd like to index by some other key. So, something
> > like this:
> >
> >         class Item {
> >             byte[] id;
> >             String name;
> >         }
> >
> >         KStreamBuilder topology = new KStreamBuilder();
> >
> >         KTable<byte[], Item> items = topology
> >           .table("items-changelog", "items");
> >
> > To index the Items by name:
> >
> >         items
> >           .toStream()
> >           .selectKey((k, v) -> v.name)
> >           .to("items-by-name-changelog");
> >
> >         KTable<String, Item> itemsByName = topology
> >           .table("items-by-name-changelog", "items-by-name");
> >
> >
> > However, if the topic "items-changelog" receives a tombstone for some
> > particular Item (val: null), the key selector indexing function yields an
> > NPE because that value is null.
> >
> > My solution to this problem has been aggregations:
> >
> >         KTable<String, Item> itemsByName = items
> >           .groupBy((k, v) -> new KeyValue<>(v.name, v))
> >           .aggregate(() -> null,
> >                      (k, v, a) -> v,
> >                      (k, v, a) -> null,
> >                      "items-by-name");
> >
> > This works because the grouping/aggregation maintains state on the incoming
> > topic, and when an item tombstone arrives it can calculate the key and
> > propagate the tombstone to the index.
> >
> > However, this doesn't work flexibly for all cases (I'll omit them for now)
> > and I find myself drawn to the lower-level Processor API which is fine
> > except I'd rather just use the DSL for its expressiveness.
> >
> > What is the correct way to do this kind of indexing? I need the index to
> > update correctly when the key value changes or when a tombstone is received.
> >
> >
> >
> > Thank you,
> > Dmitry
> >
>
>
