I was thinking about the source stream/table idea once more and it seems
it would not be too hard to implement:

We add two new classes

  SourceKStream extends KStream

and

  SourceKTable extend KTable

and return both from StreamsBuilder#stream and StreamsBuilder#table

As both are sub-classes, this change is backward compatible. We change
the return type for any single-record transform to this new types, too,
and use KStream/KTable as return type for any multi-record operation.

The new RecordContext API is added to both new classes. For old classes,
we only implement KIP-149 to get access to the key.


WDYT?


-Matthias

On 11/9/17 9:13 PM, Jan Filipiak wrote:
> Okay,
> 
> looks like it would _at least work_ for Cached KTableSources .
> But we make it harder to the user to make mistakes by putting
> features into places where they don't make sense and don't
> help anyone.
> 
> I once again think that my suggestion is easier to implement and
> more correct. I will use this email to express my disagreement with the
> proposed KIP (-1 non binding of course) state that I am open for any
> questions
> regarding this. I will also do the usual thing and point out that the
> friends
> over at Hive got it correct aswell.
> One can not user their
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+VirtualColumns
> 
> in any place where its not read from the Sources.
> 
> With KSQl in mind it makes me sad how this is evolving here.
> 
> Best Jan
> 
> 
> 
> 
> 
> On 10.11.2017 01:06, Guozhang Wang wrote:
>> Hello Jan,
>>
>> Regarding your question about caching: today we keep the record context
>> with the cached entry already so when we flush the cache which may
>> generate
>> new records forwarding we will set the record context appropriately; and
>> then after the flush is completed we will reset the context to the record
>> before the flush happens. But I think when Jeyhun did the PR it is a good
>> time to double check on such stages to make sure we are not
>> introducing any
>> regressions.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Nov 6, 2017 at 8:54 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> I Aggree completely.
>>>
>>> Exposing this information in a place where it has no _natural_ belonging
>>> might really be a bad blocker in the long run.
>>>
>>> Concerning your first point. I would argue its not to hard to have a
>>> user
>>> keep track of these. If we still don't want the user
>>> to keep track of these I would argue that all > projection only <
>>> transformations on a Source-backed KTable/KStream
>>> could also return a Ktable/KStream instance of the type we return
>>> from the
>>> topology builder.
>>> Only after any operation that exceeds projection or filter one would
>>> return a KTable not granting access to this any longer.
>>>
>>> Even then its difficult already: I never ran a topology with caching
>>> but I
>>> am not even 100% sure what the record Context means behind
>>> a materialized KTable with Caching? Topic and Partition are probably
>>> with
>>> some reasoning but offset is probably only the offset causing the flush?
>>> So one might aswell think to drop offsets from this RecordContext.
>>>
>>> Best Jan
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 07.11.2017 03:18, Guozhang Wang wrote:
>>>
>>>> Regarding the API design (the proposed set of overloads v.s. one
>>>> overload
>>>> on #map to enrich the record), I think what we have represents a good
>>>> trade-off between API succinctness and user convenience: on one hand we
>>>> definitely want to keep as fewer overloaded functions as possible.
>>>> But on
>>>> the other hand if we only do that in, say, the #map() function then
>>>> this
>>>> enrichment could be an overkill: think of a topology that has 7
>>>> operators
>>>> in a chain, where users want to access the record context on
>>>> operator #2
>>>> and #6 only, with the "enrichment" manner they need to do the
>>>> enrichment
>>>> on
>>>> operator #2 and keep it that way until #6. In addition, the
>>>> RecordContext
>>>> fields (topic, offset, etc) are really orthogonal to the key-value
>>>> payloads
>>>> themselves, so I think separating them into this object is a cleaner
>>>> way.
>>>>
>>>> Regarding the RecordContext inheritance, this is actually a good point
>>>> that
>>>> have not been discussed thoroughly before. Here are my my two cents:
>>>> one
>>>> natural way would be to inherit the record context from the
>>>> "triggering"
>>>> record, for example in a join operator, if the record from stream A
>>>> triggers the join then the record context is inherited from with that
>>>> record. This is also aligned with the lower-level PAPI interface. A
>>>> counter
>>>> argument, though, would be that this is sort of leaking the internal
>>>> implementations of the DSL, so that moving forward if we did some
>>>> refactoring to our join implementations so that the triggering
>>>> record can
>>>> change, the RecordContext would also be different. I do not know how
>>>> much
>>>> it would really affect end users, but would like to hear your opinions.
>>>>
>>> Agreed to 100% exposing this information
>>>
>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Mon, Nov 6, 2017 at 1:00 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Jan,
>>>>> Sorry for late reply.
>>>>>
>>>>>
>>>>> The API Design doesn't look appealing
>>>>>
>>>>>
>>>>> In terms of API design we tried to preserve the java functional
>>>>> interfaces.
>>>>> We applied the same set of rich methods for KTable to make it
>>>>> compatible
>>>>> with the rest of overloaded APIs.
>>>>>
>>>>> It should be 100% sufficient to offer a KTable + KStream that is
>>>>> directly
>>>>>
>>>>>> feed from a topic with 1 additional overload for the #map()
>>>>>> methods to
>>>>>> cover every usecase while keeping the API in a way better state.
>>>>>>
>>>>> - IMO this seems a workaround, rather than a direct solution.
>>>>>
>>>>> Perhaps we should continue this discussion in DISCUSS thread.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Jeyhun
>>>>>
>>>>>
>>>>> On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>> Hi.
>>>>>> I do understand that it might come in Handy.
>>>>>>    From my POV in any relational algebra this is only a projection.
>>>>>> Currently we hide these "fields" that come with the input record.
>>>>>> It should be 100% sufficient to offer a KTable + KStream that is
>>>>>> directly
>>>>>> feed from a topic with 1 additional overload for the #map()
>>>>>> methods to
>>>>>> cover every usecase while keeping the API in a way better state.
>>>>>>
>>>>>> best Jan
>>>>>>
>>>>>> On 06.11.2017 17:52, Matthias J. Sax wrote:
>>>>>>
>>>>>>> Jan,
>>>>>>>
>>>>>>> I understand what you are saying. However, having a RecordContext is
>>>>>>> super useful for operations that are applied to input topic. Many
>>>>>>> users
>>>>>>> requested this feature -- it's much more convenient that falling
>>>>>>> back
>>>>>>>
>>>>>> to
>>>>>> transform() to implement a a filter() for example that want to access
>>>>>>> some meta data.
>>>>>>>
>>>>>>> Because we cannot distinguish different "origins" of a
>>>>>>> KStream/KTable,
>>>>>>>
>>>>>> I
>>>>>> am not sure if there would be a better way to do this. The only
>>>>>>> "workaround" I see, is to have two KStream/KTable interfaces each
>>>>>>> and
>>>>>>>
>>>>>> we
>>>>>> would use the first one for KStream/KTable with "proper"
>>>>>> RecordContext.
>>>>>>> But this does not seem to be a good solution either.
>>>>>>>
>>>>>>> Note, a KTable can also be read directly from a topic, I agree that
>>>>>>> using RecordContext on a KTable that is the result of an
>>>>>>> aggregation is
>>>>>>> questionable. But I don't see a reason to down vote the KIP for this
>>>>>>>
>>>>>> reason.
>>>>>>
>>>>>>> WDYT about this?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 11/1/17 10:19 PM, Jan Filipiak wrote:
>>>>>>>
>>>>>>>> -1 non binding
>>>>>>>>
>>>>>>>> I don't get the motivation.
>>>>>>>> In 80% of my DSL processors there is no such thing as a reasonable
>>>>>>>> RecordContext.
>>>>>>>> After a join  the record I am processing belongs to at least 2
>>>>>>>> topics.
>>>>>>>> After a Group by the record I am processing was created from
>>>>>>>> multiple
>>>>>>>> offsets.
>>>>>>>>
>>>>>>>> The API Design doesn't look appealing
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 01.11.2017 22:02, Jeyhun Karimov wrote:
>>>>>>>>
>>>>>>>>> Dear community,
>>>>>>>>>
>>>>>>>>> It seems the discussion for KIP-159 [1] converged finally. I would
>>>>>>>>> like to
>>>>>>>>> initiate voting for the particular KIP.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>
>>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>>
>>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to