Re: Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Matthias J. Sax
If the mapping is 1-to-1, then you can get it done. That's good. As you
observed yourself, with a non-unique mapping it's much harder (or maybe
even impossible) to get this to work.

Also, your KTable#groupBy(...)#aggregate(...) approach is a good solution.

So now I am just wondering what you mean by:

> However, this doesn't work flexibly for all cases (I'll omit them for now)


So let's get down to the actual problem :)


-Matthias







Re: Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Dmitry Minkovsky
Actually... I've got the 1-to-1 variant doing wonders for me. I replaced
the #aggregate() with #reduce((k, v) -> v, (k, v) -> null) and things are
just lovely. Combining these indices with the various join operations, I am
able to build up deeply nested structures, or eh, materialized views,
from flat changelog feeds. As the changelog entities change or tombstone,
so do the views.
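For the 1-to-1 case, the mechanics can be modeled outside of Kafka Streams with plain maps (a sketch only; `Item` is from the thread, the `apply` harness and String ids are hypothetical). On an update, the grouping emits a subtract for the item's old index key and an add for its new key; on a tombstone, only the subtract fires:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of KTable#groupBy((k, v) -> new KeyValue<>(v.name, v))
// followed by #reduce((k, v) -> v, (k, v) -> null): an update retracts the
// item's old index key and adds its new key; a tombstone only retracts.
class NameIndex {
    // Minimal stand-in for the Item class from the thread (String id for brevity).
    static final class Item {
        final String id;
        final String name;
        Item(String id, String name) { this.id = id; this.name = name; }
    }

    private final Map<String, Item> items = new HashMap<>();       // primary table, keyed by id
    private final Map<String, Item> itemsByName = new HashMap<>(); // the index

    // value == null models a tombstone arriving on "items-changelog".
    void apply(String id, Item value) {
        Item old = items.get(id);
        if (old != null) itemsByName.remove(old.name); // subtractor: (k, v) -> null
        if (value == null) {
            items.remove(id);
        } else {
            items.put(id, value);
            itemsByName.put(value.name, value);        // adder: (k, v) -> v
        }
    }

    Item lookup(String name) { return itemsByName.get(name); }
}
```

This only holds while names are unique: with duplicates, the last writer would clobber the index entry.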

The 1-to-many case I still have to play more with. I will update here if I
discover anything good.

Thank you.



Re: Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Dmitry Minkovsky
> And before we discuss deeper, a follow up question: if you map from
> <key, value> to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?

Yes, this has been central in my exploration so far. Some fields are
unique, others are not. There's never ambiguity: unique fields are known
to be unique (e.g. user email address), and non-unique fields are known to
be non-unique (e.g. user country).

Right now I am laying out my topology such that uniqueness is enforced
before indexing, so the issue I am concerned with here is just the
indexing itself. Unless, of course, the two can somehow be tied together in
some elegant manner? But anyway, my question is based on the assumption
that uniqueness of the indexing input is not a concern.

And yes, I've noticed that with a unique field the result is a 1-to-1
key-to-entity mapping, whereas with non-unique fields each index key may
map to a list of entities. For non-unique fields where an index key may
map to thousands of entities, it is not practical to maintain them in a
single aggregation.
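One way to model the 1-to-many shape (a hypothetical sketch, not something from the thread; a `country` field on the entity is assumed): the aggregate per index key is the set of entity ids sharing that key, with an adder that inserts an id and a subtractor that removes it. As noted above, this only stays practical while each set remains small:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Models KTable#groupBy((k, v) -> new KeyValue<>(v.country, k))#aggregate(...)
// where each aggregate is the set of entity ids sharing the index key.
class CountryIndex {
    private final Map<String, Set<String>> idsByCountry = new HashMap<>();

    // adder: insert the entity id into the aggregate for its index key
    void add(String country, String id) {
        idsByCountry.computeIfAbsent(country, c -> new HashSet<>()).add(id);
    }

    // subtractor: remove the id; drop the aggregate entirely once it empties,
    // which models a tombstone for that index key
    void subtract(String country, String id) {
        Set<String> ids = idsByCountry.get(country);
        if (ids == null) return;
        ids.remove(id);
        if (ids.isEmpty()) idsByCountry.remove(country);
    }

    Set<String> lookup(String country) {
        return idsByCountry.getOrDefault(country, Collections.emptySet());
    }
}
```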

Any further guidance would be greatly appreciated. Thank you!







Re: Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Matthias J. Sax
It's a difficult problem.

And before we discuss deeper, a follow-up question: if you map from
<key, value> to new_key, is this mapping "unique", or could it be that two
different k/v-pairs map to the same new_key?

If there are overlaps, you end up with a different problem than if there
are no overlaps, because you would get updates for the same new_key from
two different places, and new_key would no longer be a primary key
(while for a "unique" mapping, new_key would still be a primary key).
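To make the overlap concrete (a hypothetical sketch, not code from the thread): if a naive index simply re-keys last-writer-wins, two items that map to the same new_key clobber each other, so new_key stops behaving like a primary key:

```java
import java.util.HashMap;
import java.util.Map;

// Naive re-keyed index that treats new_key as a primary key:
// the second writer silently overwrites the first.
class OverlapDemo {
    static String lastWriterWins() {
        Map<String, String> index = new HashMap<>();  // new_key -> item id
        index.put("shared-key", "item-1");  // item-1 maps to shared-key
        index.put("shared-key", "item-2");  // item-2 maps to the same key and overwrites
        return index.get("shared-key");     // item-1's entry is gone, yet item-1 still exists
    }
}
```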


Hope this makes sense. It's a little hard to explain in an email.


-Matthias






Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Dmitry Minkovsky
I have a changelog that I'd like to index by some other key. So, something
like this:

class Item {
  byte[] id;
  String name;
}

KStreamBuilder topology = new KStreamBuilder();

KTable<byte[], Item> items = topology
  .table("items-changelog", "items");

To index the Items by name:

items
  .toStream()
  .selectKey((k, v) -> v.name)
  .to("items-by-name-changelog");

KTable<String, Item> itemsByName = topology
  .table("items-by-name-changelog", "items-by-name");


However, if the topic "items-changelog" receives a tombstone for some
particular Item (a null value), the key-selector indexing function throws
an NPE because the value is null.

My solution to this problem has been aggregations:

KTable<String, Item> itemsByName = items
  .groupBy((k, v) -> new KeyValue<>(v.name, v))
  .aggregate(() -> null,
     (k, v, a) -> v,
     (k, v, a) -> null,
     "items-by-name");

This works because the grouping/aggregation maintains state on the incoming
topic, and when an item tombstone arrives it can calculate the key and
propagate the tombstone to the index.

However, this doesn't work flexibly for all cases (I'll omit them for now),
and I find myself drawn to the lower-level Processor API, which is fine,
except that I'd rather just use the DSL for its expressiveness.

What is the correct way to do this kind of indexing? I need the index to
update correctly when the key value changes or when a tombstone is received.



Thank you,
Dmitry