Thanks for the comment Matthias. After all the discussion (thanks to all
participants), I think this (single method that passes in a RecordContext
object) is the best alternative.
Just a side note: I think KAFKA-3907 [1] can also be integrated into the
KIP by adding related method inside RecordContext interface.


[1] https://issues.apache.org/jira/browse/KAFKA-3907


Cheers,
Jeyhun

On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi,
>
> I would like to push this discussion further. It seems we got nice
> alternatives (thanks for the summary Jeyhun!).
>
> With respect to RichFunctions and allowing them to be stateful, I have
> my doubt as expressed already. From my understanding, the idea was to
> give access to record metadata information only. If you want to do a
> stateful computation you should rather use #transform().
>
> Furthermore, as pointed out, we would need to switch to a
> supplier-pattern introducing many more overloads.
>
> For those reason, I advocate for a simple interface with a single method
> that passes in a RecordContext object.
>
>
> -Matthias
>
>
> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > Thanks for the comprehensive summary!
> >
> > Personally I'd prefer the option of passing RecordContext as an
> additional
> > parameter into he overloaded function. But I'm also open to other
> arguments
> > if there are sth. that I have overlooked.
> >
> > Guozhang
> >
> >
> > On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> Thanks for your comments Matthias and Guozhang.
> >>
> >> Below I mention the quick summary of the main alternatives we looked at
> to
> >> introduce the Rich functions (I will refer to it as Rich functions
> until we
> >> find better/another name). Initially the proposed alternatives was not
> >> backwards-compatible, so I will not mention them.
> >> The related discussions are spread in KIP-149 and in this KIP (KIP-159)
> >> discussion threads.
> >>
> >>
> >>
> >> 1. The idea of rich functions came into the stage with KIP-149, in
> >> discussion thread. As a result we extended KIP-149 to support Rich
> >> functions as well.
> >>
> >> 2.  To as part of the Rich functions, we provided init
> (ProcessorContext)
> >> method. Afterwards, Dammian suggested that we should not provide
> >> ProcessorContext to users. As a result, we separated the two problems
> into
> >> two separate KIPs, as it seems they can be solved in parallel.
> >>
> >> - One approach we considered was :
> >>
> >> public interface ValueMapperWithKey<K, V, VR> {
> >>     VR apply(final K key, final V value);
> >> }
> >>
> >> public interface RichValueMapper<K, V, VR> extends RichFunction{
> >> }
> >>
> >> public interface RichFunction {
> >>     void init(RecordContext recordContext);
> >>     void close();
> >> }
> >>
> >> public interface RecordContext {
> >>     String applicationId();
> >>     TaskId taskId();
> >>     StreamsMetrics metrics();
> >>     String topic();
> >>     int partition();
> >>     long offset();
> >>     long timestamp();
> >>     Map<String, Object> appConfigs();
> >>     Map<String, Object> appConfigsWithPrefix(String prefix);
> >> }
> >>
> >>
> >> public interface ProcessorContext extends RecordContext {
> >>    // all methods but the ones in RecordContext
> >> }
> >>
> >> As a result:
> >> * . All "withKey" and "withoutKey" interfaces can be converted to their
> >> Rich counterparts (with empty init() and close() methods)
> >> *. All related Processors will accept Rich interfaces in their
> >> constructors.
> >> *. So, we convert the related "withKey" or "withoutKey" interfaces to
> Rich
> >> interface while building the topology and initialize the related
> processors
> >> with Rich interfaces only.
> >> *. We will not need to overloaded methods for rich functions as Rich
> >> interfaces extend withKey interfaces. We will just check the object type
> >> and act accordingly.
> >>
> >>
> >>
> >>
> >> 3. There was some thoughts that the above approach does not support
> lambdas
> >> so we should support only one method, only init(RecordContext), as part
> of
> >> Rich interfaces.
> >> This is still in discussion. Personally I think Rich interfaces are by
> >> definition lambda-free and we should not care much about it.
> >>
> >>
> >> 4. Thanks to Matthias's discussion, an alternative we considered was to
> >> pass in the RecordContext as method parameter.  This might even allow to
> >> use Lambdas and we could keep the name RichFunction as we preserve the
> >> nature of being a function.
> >> "If you go with `init()` and `close()` we basically
> >> allow users to have an in-memory state for a function. Thus, we cannot
> >> share a single instance of RichValueMapper (etc) over multiple tasks and
> >> we would need a supplier pattern similar to #transform(). And this would
> >> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
> >> inherit from ValueMapper and thus we would need many new overload for
> >> KStream/KTable classes". (Copy paste from Matthias's email)
> >>
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >>
> >> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Yes, we did consider this, and there is no consensus yet what the best
> >>> alternative is.
> >>>
> >>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
> >>> summary of the current state of the discussion?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> >>>> Thanks for the explanation Jeyhun and Matthias.
> >>>>
> >>>> I have just read through both KIP-149 and KIP-159 and am wondering if
> >> you
> >>>> guys have considered a slight different approach for rich function,
> >> that
> >>> is
> >>>> to add the `RecordContext` into the apply functions as an additional
> >>>> parameter. For example:
> >>>>
> >>>> ---------------------------
> >>>>
> >>>> interface RichValueMapper<V, VR> {
> >>>>
> >>>> VR apply(final V value, final RecordContext context);
> >>>>
> >>>> }
> >>>>
> >>>> ...
> >>>>
> >>>> // then in KStreams
> >>>>
> >>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR>
> >>> mapper);
> >>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper <? super V, ?
> >>>> extends VR> mapper);
> >>>>
> >>>> -------------------------------
> >>>>
> >>>> The caveat is that it will introduces more overloads; but I think the
> >>>> #.overloads are mainly introduced by 1) serde overrides and 2)
> >>>> state-store-supplier overides, both of which can be reduced in the
> near
> >>>> future, and I felt this overloading is still worthwhile, as it has the
> >>>> following benefits:
> >>>>
> >>>> 1) still allow lambda expressions.
> >>>> 2) clearer code path (do not need to "convert" from non-rich functions
> >> to
> >>>> rich functions)
> >>>>
> >>>>
> >>>> Maybe this approach has already been discussed and I may have
> >> overlooked
> >>> in
> >>>> the email thread; anyways, lmk.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> I agree with Jeyhun. As already mention, the overall API improvement
> >>>>> ideas are overlapping and/or contradicting each other. For this
> >> reason,
> >>>>> not all ideas can be accomplished and some Jira might just be closed
> >> as
> >>>>> "won't fix".
> >>>>>
> >>>>> For this reason, we try to do those KIP discussion with are large
> >> scope
> >>>>> to get an overall picture to converge to an overall consisted API.
> >>>>>
> >>>>>
> >>>>> @Jeyhun: about the overloads. Yes, we might get more overload. It
> >> might
> >>>>> be sufficient though, to do a single xxxWithContext() overload that
> >> will
> >>>>> provide key+value+context. Otherwise, if might get too messy having
> >>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
> >>>>> ValueMapperWithKeyWithContext.
> >>>>>
> >>>>> On the other hand, we also have the "builder pattern" idea as an API
> >>>>> change and this might mitigate the overload problem. Not for simple
> >>>>> function like map/flatMap etc but for joins and aggregations.
> >>>>>
> >>>>>
> >>>>> On the other hand, as I mentioned in an older email, I am personally
> >>>>> fine to break the pure functional interface, and add
> >>>>>
> >>>>>   - interface WithRecordContext with method `open(RecordContext)` (or
> >>>>> `init(...)`, or any better name) -- but not `close()`)
> >>>>>
> >>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
> >>>>> WithRecordContext
> >>>>>
> >>>>> This would allow us to avoid any overload. Of course, we don't get a
> >>>>> "pure function" interface and also sacrifices Lambdas.
> >>>>>
> >>>>>
> >>>>>
> >>>>> I am personally a little bit undecided what the better option might
> >> be.
> >>>>> Curious to hear what other think about this trade off.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> >>>>>> Hi Guozhang,
> >>>>>>
> >>>>>> It subsumes partially. Initially the idea was to support
> >> RichFunctions
> >>>>> as a
> >>>>>> separate interface. Throughout the discussion, however, we
> considered
> >>>>> maybe
> >>>>>> overloading the related methods (with RecodContext param) is better
> >>>>>> approach than providing a separate RichFunction interface.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Jeyhun
> >>>>>>
> >>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>>> Does this KIP subsume this ticket as well?
> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> >>>>>>>
> >>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <
> >> je.kari...@gmail.com
> >>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Dear community,
> >>>>>>>>
> >>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to
> >>>>> initiate
> >>>>>>>> KIP for rich functions (interfaces) [2].
> >>>>>>>> I would like to get your comments.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=
> >>>>>>>> Re+DISCUSS+KIP+149+Enabling+key+access+in+
> >>>>> ValueTransformer+ValueMapper+
> >>>>>>>> and+ValueJoiner
> >>>>>>>> [2]
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Jeyhun
> >>>>>>>> --
> >>>>>>>> -Cheers
> >>>>>>>>
> >>>>>>>> Jeyhun
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>> --
> >> -Cheers
> >>
> >> Jeyhun
> >>
> >
> >
> >
>
> --
-Cheers

Jeyhun

Reply via email to