Forwarding this thread to the users list too in case people would like to 
comment. It is also on the dev list.

Thanks
Eno

> Begin forwarded message:
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
> Date: 24 January 2017 at 19:30:10 GMT
> To: d...@kafka.apache.org
> Reply-To: d...@kafka.apache.org
> 
> That's not what I meant by "huge impact".
> 
> I am referring to the actions involved in materializing a KTable:
> creating a RocksDB store and a changelog topic -- users should be aware
> of the runtime implications, and this is better expressed by an explicit
> method call than implicitly triggered by using a different overload of
> a method.
> 
> 
> -Matthias
> 
> On 1/24/17 1:35 AM, Damian Guy wrote:
>> I think your definition of a huge impact and mine are rather different ;-P
>> Overloading a few methods is not really a huge impact IMO. It is also a
>> sacrifice worth making for the readability and usability of the API.
>> 
>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io> wrote:
>> 
>>> I understand your argument, but do not agree with it.
>>> 
>>> Your first version (even if the "flow" is not as nice) is more explicit
>>> than the second version. Adding a stateStoreName parameter is quite
>>> implicit but has a huge impact -- thus, I prefer the rather more verbose
>>> but explicit version.
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>> 
>>>> table.mapValues(..).materialize().join(..).materialize()
>>>> compared to:
>>>> table.mapValues(..).join(..)
>>>> 
>>>> I know which one I prefer.
>>>> My preference is still to provide overloaded methods where people can
>>>> specify the store names if they want, otherwise we just generate them.
>>>> 
>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>> 
>>>>> 1) I like Guozhang's proposal about removing the store name from all
>>>>> KTable methods and generating internal names (however, I would do this
>>>>> as overloads). Furthermore, I would not force users to call
>>>>> .materialize() if they want to query a store, but add one more method
>>>>> .stateStoreName() that returns the store name if the KTable is
>>>>> materialized. Thus, .materialize() also need not have a storeName
>>>>> parameter (ie, we should have some overloads here).
>>>>> 
>>>>> I would also not allow a null store name (to indicate no
>>>>> materialization if not necessary) but throw an exception instead.
>>>>> 
>>>>> This yields some simplification (see below).
>>>>> 
>>>>> 
>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>> 
>>>>> 
>>>>> 3)
>>>>>> 
>>>>>>>  3. What will happen when you call materialize on KTable that is
>>>>>>>  already materialized? Will it create another StateStore (providing
>>>>>>>  the name is different), throw an Exception?
>>>>>> 
>>>>>> Currently an exception is thrown, but see below.
>>>>>> 
>>>>>> 
>>>>> 
>>>>> If we follow approach (1) from Guozhang, there is no need to worry about
>>>>> a second materialization and no exception needs to be thrown either. A
>>>>> call to .materialize() basically sets a "materialized flag" (ie, an
>>>>> idempotent operation) and sets a new name.
>>>>> 
>>>>> 
>>>>> 4)
>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>> 
>>>>>> Not sure whether that is really required. We also use
>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>>>>> and don't care about the "K" prefix.
>>>>> 
>>>>> Eno's reply:
>>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>>> we are converting it to.
>>>>>> 
>>>>>> I'd say we should probably change the KStreamBuilder methods (but not
>>>>>> in this KIP).
>>>>> 
>>>>> I would keep #toStream(). (see below)
>>>>> 
>>>>> 
>>>>> 5) We should not remove any methods but only deprecate them.
>>>>> 
>>>>> 
>>>>> 
>>>>> A general note:
>>>>> 
>>>>> I do not understand your comments under "Rejected Alternatives". You say
>>>>> "Have the KTable be the materialized view" was rejected. But your KIP
>>>>> actually does exactly this -- the changelog abstraction of KTable is
>>>>> secondary after those changes and the "view" abstraction is what a KTable
>>>>> is. And just to be clear, I like this a lot:
>>>>> 
>>>>> - it aligns with the name KTable
>>>>> - it aligns with stream-table-duality
>>>>> - it aligns with IQ
>>>>> 
>>>>> I would say that a KTable is a "view abstraction" (as materialization is
>>>>> optional).
>>>>> 
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>>>>> comments:
>>>>>> 
>>>>>> 1. I like the materialize() function in general, but I would like to see
>>>>>> how other KTable functions should be updated accordingly. For example,
>>>>>> 1) KStreamBuilder.table(..) has a state store name parameter, and we will
>>>>>> always materialize the KTable unless its state store name is set to null;
>>>>>> 2) KTable.agg requires the result KTable to be materialized, and hence it
>>>>>> also has a state store name; 3) KTable.join requires the joining table to
>>>>>> be materialized. And today we do not actually have a mechanism to enforce
>>>>>> that, but will only throw an exception at runtime if it is not (e.g. if
>>>>>> you have "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>> 
>>>>>> I'd make an extended proposal just to kick off the discussion here: let's
>>>>>> remove all the state store params in other KTable functions, and if in
>>>>>> some cases a KTable has to be materialized (e.g. a KTable resulting from
>>>>>> KXX.agg) and users do not call materialize(), then we treat it as "users
>>>>>> are not interested in querying it at all" and hence use an internally
>>>>>> generated name for the materialized KTable; i.e. although it is
>>>>>> materialized the state store is not exposed to users. And if users call
>>>>>> materialize() afterwards but we have already decided to materialize it,
>>>>>> we can replace the internal name with the user's provided name. Then from
>>>>>> a user's point of view, if they ever want to query a KTable, they have to
>>>>>> call materialize() with a given state store name. This approach has one
>>>>>> awkwardness though: the serdes and state store name params are not
>>>>>> separated and could overlap (see detailed comment #2 below).
>>>>>> 
>>>>>> 
>>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>>> reference / future work: as we have discussed before, we may enforce
>>>>>> materializing KTable.join resulted KTables as well in the future. If we
>>>>>> do that, then:
>>>>>> 
>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>>>>> (otherwise we would not know the old value);
>>>>>> c) KTable.join resulted KTables are always materialized, and so are the
>>>>>> joining KTables;
>>>>>> d) the materialization of KTable.filter/mapValues resulted KTables
>>>>>> depends on their parent's materialization;
>>>>>> 
>>>>>> By recursive induction all KTables are actually always materialized, and
>>>>>> then the effect of "materialize()" is just to specify the state store
>>>>>> names. In this scenario, we do not need to send Change<V> in repartition
>>>>>> topics within joins any more, but only for repartition topics within
>>>>>> aggregations. Instead, we can just send a "tombstone" without the old
>>>>>> value and we do not need to calculate joins twice (one more time when
>>>>>> the old value is received).
>>>>>> 
>>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()"
>>>>>> function which is interpreted as a dummy aggregation where the new value
>>>>>> always replaces the old value. I have seen a couple of use cases of
>>>>>> this, for example, users want to read a changelog topic, apply some
>>>>>> filters, and then materialize it into a KTable with state stores without
>>>>>> creating duplicated changelog topics. With materialize() and toTable I'd
>>>>>> imagine users can specify something like:
>>>>>> 
>>>>>> "
>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>> KTable table = stream.toTable(..);
>>>>>> table.materialize("state1");
>>>>>> "
>>>>>> 
>>>>>> And the library in this case could set store "state1"'s changelog topic
>>>>>> to be "topic1", applying the filter on the fly while (re-)storing its
>>>>>> state by reading from this topic, instead of creating a second changelog
>>>>>> topic like "appID-state1-changelog" which is a semi-duplicate of
>>>>>> "topic1".
>>>>>> 
>>>>>> 
>>>>>> Detailed:
>>>>>> 
>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
>>>>>> about renaming to "#toChangeLog" but after thinking a bit more I think
>>>>>> #toStream is still better, and we can just mention in the javaDoc that
>>>>>> it is transforming its underlying changelog stream to a normal stream.
>>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>>>>> already specified in a previous operation whereas it is not known before
>>>>>> calling materialize, for example:
>>>>>> stream.groupByKey.agg(serde).materialize(serde) vs.
>>>>>> table.mapValues(/*no serde specified*/).materialize(serde). We need to
>>>>>> specify what the handling logic is here.
>>>>>> 3. We can remove the "KTable#to" call as well, and force users to call
>>>>>> "KTable.toStream.to" to be more clear.
>>>>>> 
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>>>> we are converting it to.
>>>>>>> 
>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not
>>>>>>> in this KIP).
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>> 
>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>> 
>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>>>>>>> and don't care about the "K" prefix.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>> 
>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Eno,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>> 
>>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>> 
>>>>>>>>> Ok.
>>>>>>>>> 
>>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism is
>>>>>>>>>> necessary for this KIP, i.e., the KIP is useful without it. Maybe
>>>>>>>>>> that should be a different KIP?
>>>>>>>>> 
>>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 3. What will happen when you call materialize on KTable that is
>>>>>>>>>> already materialized? Will it create another StateStore (providing
>>>>>>>>>> the name is different), throw an Exception?
>>>>>>>>> 
>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 4. Have you considered overloading the existing KTable operations to
>>>>>>>>>> add a state store name? So if a state store name is provided, then
>>>>>>>>>> materialize a state store? This would be my preferred approach as I
>>>>>>>>>> don't think materialize is always a valid operation.
>>>>>>>>> 
>>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll
>>>>>>>>> need to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>> 
>>>>>>>>>> 5. The materialize method will need a value Serde as some operations,
>>>>>>>>>> i.e., mapValues, join etc. can change the value types.
>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that
>>>>>>>>>> we always need to materialize the StateStore for KTable-KTable joins.
>>>>>>>>>> If that is the case, then the KTable Join operators will also need
>>>>>>>>>> Serde information.
>>>>>>>>> 
>>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Damian
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hello,
>>>>>>>>>>> 
>>>>>>>>>>> We created "KIP-114: KTable materialization and improved semantics"
>>>>>>>>>>> to solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>> 
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>> 
>>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
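
For anyone skimming the thread, the materialize() semantics discussed above
(internally generated store names, a user-provided name replacing the
internal one, an idempotent no-argument call, and rejecting a null name) can
be pictured with the toy model below. It is only a sketch and does not use
Kafka Streams: ToyTable, stateless(), requiringState(), isQueryable(), the
"internal-store-" prefix, and the Optional return type of stateStoreName()
are all assumptions made for illustration, not the KIP's API.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: none of these names come from Kafka Streams.
public class MaterializeSketch {

    static final class ToyTable {
        // Hypothetical prefix for internally generated store names.
        static final String INTERNAL_PREFIX = "internal-store-";
        private static final AtomicInteger NEXT_ID = new AtomicInteger();

        private String storeName;   // null while the table is not materialized
        private boolean userNamed;  // true once the user has chosen the name

        /** A table whose operator does not need a state store. */
        static ToyTable stateless() {
            return new ToyTable();
        }

        /** A table that must be materialized (e.g. an aggregation result). */
        static ToyTable requiringState() {
            return new ToyTable().materialize();
        }

        /** Idempotent: generates an internal name only if none is set yet. */
        ToyTable materialize() {
            if (storeName == null) {
                storeName = INTERNAL_PREFIX + NEXT_ID.getAndIncrement();
            }
            return this;
        }

        /** Materializes, or replaces the current name with the user's name. */
        ToyTable materialize(String name) {
            if (name == null) {
                // A null name is rejected rather than meaning "do not materialize".
                throw new IllegalArgumentException("store name must not be null");
            }
            storeName = name;
            userNamed = true;
            return this;
        }

        /** Empty if the table is not materialized. */
        Optional<String> stateStoreName() {
            return Optional.ofNullable(storeName);
        }

        /** Assumption: only user-named stores are exposed for querying. */
        boolean isQueryable() {
            return userNamed;
        }
    }

    public static void main(String[] args) {
        ToyTable agg = ToyTable.requiringState();
        System.out.println(agg.stateStoreName() + " queryable=" + agg.isQueryable());

        agg.materialize("state1"); // replaces the internal name
        System.out.println(agg.stateStoreName() + " queryable=" + agg.isQueryable());
    }
}
```

In this reading, calling materialize("state1") on an already materialized
table neither throws nor creates a second store; it only swaps the name. The
isQueryable() flag follows the variant in which internally named stores are
not exposed to users; with a stateStoreName() accessor as suggested in the
thread, that restriction could be dropped.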
