I see your point about "when to add the processor to the topology". That
is indeed an issue. Not sure if we could allow "updates" to the topology...

I don't see any problem with having all the withXX() in KTable interface
-- but this might be subjective.
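
To make the "updates" idea concrete, here is a rough sketch. All classes
below are made up for illustration and are not the actual Streams
internals. It assumes count() adds its node to the topology right away
and each withXX() then mutates that node in place:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not the real Kafka Streams API.
public class TopologyUpdateSketch {

    /** Mutable description of one processor node in the topology. */
    static final class ProcessorNode {
        final String operation;
        String storeName = "auto-generated";
        boolean cachingEnabled = true;

        ProcessorNode(String operation) {
            this.operation = operation;
        }

        @Override
        public String toString() {
            return operation + "[store=" + storeName + ", caching=" + cachingEnabled + "]";
        }
    }

    /** Minimal topology: just an ordered list of nodes. */
    static final class Topology {
        final List<ProcessorNode> nodes = new ArrayList<>();
    }

    /** Table handle whose withXX() calls update the node that produced it. */
    static final class Table {
        private final Topology topology;
        private final ProcessorNode node;

        Table(Topology topology, ProcessorNode node) {
            this.topology = topology;
            this.node = node;
        }

        Table withStoreName(String name) {
            node.storeName = name; // "update" of an already registered node
            return this;
        }

        Table withCachingEnabled(boolean enabled) {
            node.cachingEnabled = enabled;
            return this;
        }

        Table mapValues() {
            ProcessorNode next = new ProcessorNode("mapValues");
            topology.nodes.add(next);
            return new Table(topology, next);
        }
    }

    static final class GroupedStream {
        private final Topology topology;

        GroupedStream(Topology topology) {
            this.topology = topology;
        }

        Table count() {
            ProcessorNode node = new ProcessorNode("count");
            topology.nodes.add(node); // the node is added before any withXX() runs
            return new Table(topology, node);
        }
    }

    /** Builds groupedStream.count().withXX()...mapValues() and dumps the topology. */
    static String describe() {
        Topology topology = new Topology();
        new GroupedStream(topology)
            .count()
            .withStoreName("name")
            .withCachingEnabled(false)
            .mapValues();
        return topology.nodes.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

The downside shows up in the sketch as well: a withXX() called after
mapValues() would still compile, but it would no longer configure the
count.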


However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusing for developers.
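
A toy sketch of what I mean. Grouped, Aggregated and this groupBy() are
hypothetical helpers written only for this mail, not the proposed API.
The option is set on the grouped object, but the grouping never looks at
it; only the aggregation does:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; not the real Kafka Streams API.
public class GroupedOptionsSketch {

    /** Result of an aggregation: the values plus the store they ended up in. */
    static final class Aggregated<K, V> {
        final String storeName;
        final Map<K, V> values;

        Aggregated(String storeName, Map<K, V> values) {
            this.storeName = storeName;
            this.values = values;
        }
    }

    /** Grouped view of the input; withXX() lives here although grouping ignores it. */
    static final class Grouped<K, V> {
        private final List<V> input;
        private final Function<V, K> keyMapper;
        private String queryableName = "auto-generated";

        Grouped(List<V> input, Function<V, K> keyMapper) {
            this.input = input;
            this.keyMapper = keyMapper;
        }

        // Reads like an option of the grouping, but the grouping never uses it.
        Grouped<K, V> withQueryableName(String name) {
            this.queryableName = name;
            return this;
        }

        // The only consumer of queryableName.
        Aggregated<K, V> reduce(BinaryOperator<V> reducer) {
            Map<K, V> result = new LinkedHashMap<>();
            for (V value : input) {
                result.merge(keyMapper.apply(value), value, reducer);
            }
            return new Aggregated<>(queryableName, result);
        }
    }

    static <K, V> Grouped<K, V> groupBy(List<V> input, Function<V, K> keyMapper) {
        return new Grouped<>(input, keyMapper);
    }

    public static void main(String[] args) {
        Aggregated<Integer, String> byLength =
            groupBy(Arrays.asList("a", "bb", "cc"), String::length)
                .withQueryableName("my-store") // sits next to groupBy(), used by reduce()
                .reduce((left, right) -> left + right);
        System.out.println(byLength.storeName + " " + byLength.values);
    }
}
```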


-Matthias

On 6/28/17 2:55 AM, Damian Guy wrote:
>> I also think that mixing optional parameters with configs is a bad idea.
>> I have no proposal for this atm but just wanted to mention it. Hope to
>> find some time to come up with something.
>>
>>
> Yes, I don't like the mix of config either. But the only real config here
> is the logging config - which we don't really need as it can already be
> done via a custom StateStoreSupplier.
> 
> 
>> What I don't like in the current proposal is the
>> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
>> and .groupByKey() seems better. For clarity, we could rename to
>> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
>> some better names).
>>
>>
> it could be groupByKey(), groupBy() or something different bt
> 
> 
> 
>> The proposed pattern "chains" grouping and aggregation too close
>> together. I would rather separate both more than less, i.e., go in the
>> opposite direction.
>>
>> I am also wondering if we could do something more "fluent". The initial
>> proposal was like:
>>
>>>> groupedStream.count()
>>>>    .withStoreName("name")
>>>>    .withCachingEnabled(false)
>>>>    .withLoggingEnabled(config)
>>>>    .table()
>>
>> The .table() statement in the end was kinda alien.
>>
> 
> I agree, but then all of the withXXX methods need to be on KTable which is
> worse in my opinion. You also need something that is going to "build" the
> internal processors and add them to the topology.
> 
> 
>> The current proposal puts the count() at the end -- i.e., the optional
>> parameters for count() have to be specified on the .grouped() call -- this
>> does not seem to be the best way either.
>>
>>
> I actually prefer this method as you are building a grouped stream that you
> will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) etc
> seems natural to me.
> 
> 
>> I did not think this through in detail, but can't we just do the initial
>> proposal with the .table() ?
>>
>> groupedStream.count().withStoreName("name").mapValues(...)
>>
>> Each .withXXX(...) returns the current KTable and all the .withXXX() are
>> just added to the KTable interface. Or do I miss any reason why this won't
>> work, or any obvious disadvantage?
>>
>>
>>
> See above.
> 
> 
>>
>> -Matthias
>>
>> On 6/22/17 4:06 AM, Damian Guy wrote:
>>> Thanks everyone. My latest attempt is below. It builds on the fluent
>>> approach, but I think it is slightly nicer.
>>> I agree with some of what Eno said about mixing configy stuff in the DSL,
>>> but I think that enabling caching and enabling logging are things that
>>> aren't actually config. I'd probably not add withLogConfig(...) (even
>>> though it is below) as this is actually config and we already have a way
>>> of doing that, via the StateStoreSupplier. Arguably we could use the
>>> StateStoreSupplier for disabling caching etc, but as it stands that is a
>>> bit of a tedious process for someone that just wants to use the default
>>> storage engine, but not have caching enabled.
>>>
>>> There is also an orthogonal concern that Guozhang alluded to... If you
>>> want to plug in a custom storage engine and you want it to be logged etc,
>>> you would currently need to implement that yourself. Ideally we can
>>> provide a way where we will wrap the custom store with logging, metrics,
>>> etc. I need to think about where this fits; it is probably more
>>> appropriate on the Stores API.
>>>
>>> final KeyValueMapper<String, String, Long> keyMapper = null;
>>> // count with mapped key
>>> final KTable<Long, Long> count = stream.grouped()
>>>         .withKeyMapper(keyMapper)
>>>         .withKeySerde(Serdes.Long())
>>>         .withValueSerde(Serdes.String())
>>>         .withQueryableName("my-store")
>>>         .count();
>>>
>>> // windowed count
>>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>>>         .withQueryableName("my-window-store")
>>>         .windowed(TimeWindows.of(10L).until(10))
>>>         .count();
>>>
>>> // windowed reduce
>>> final Reducer<String> windowedReducer = null;
>>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
>>>         .withQueryableName("my-window-store")
>>>         .windowed(TimeWindows.of(10L).until(10))
>>>         .reduce(windowedReducer);
>>>
>>> final Aggregator<String, String, Long> aggregator = null;
>>> final Initializer<Long> init = null;
>>>
>>> // aggregate
>>> final KTable<String, Long> aggregate = stream.grouped()
>>>         .withQueryableName("my-aggregate-store")
>>>         .aggregate(aggregator, init, Serdes.Long());
>>>
>>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
>>>
>>> // aggregate with custom store
>>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>>>         .withStateStoreSupplier(stateStoreSupplier)
>>>         .aggregate(aggregator, init);
>>>
>>> // disable caching
>>> stream.grouped()
>>>         .withQueryableName("name")
>>>         .withCachingEnabled(false)
>>>         .count();
>>>
>>> // disable logging
>>> stream.grouped()
>>>         .withQueryableName("q")
>>>         .withLoggingEnabled(false)
>>>         .count();
>>>
>>> // override log config
>>> final Reducer<String> reducer = null;
>>> stream.grouped()
>>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
>>>         .reduce(reducer);
>>>
>>>
>>> If anyone wants to play around with this you can find the code here:
>>> https://github.com/dguy/kafka/tree/dsl-experiment
>>>
>>> Note: It won't actually work as most of the methods just return null.
>>>
>>> Thanks,
>>> Damian
>>>
>>>
>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
>>>
>>>> Thanks Damian. I think both options have pros and cons. And both are
>>>> better than overload abuse.
>>>>
>>>> The fluent API approach reads better, no mention of builder or build
>>>> anywhere. The main downside is that the method signatures are a little
>>>> less clear. By reading the method signature, one doesn't necessarily
>>>> know what it returns. Also, one needs to figure out the special method
>>>> (`table()` in this case) that gives you what you actually care about
>>>> (`KTable` in this case). Not major issues, but worth mentioning while
>>>> doing the comparison.
>>>>
>>>> The builder approach avoids the issues mentioned above, but it doesn't
>>>> read as well.
>>>>
>>>> Ismael
>>>>
>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd like to get a discussion going around some of the API choices we've
>>>>> made in the DSL. In particular those that relate to stateful operations
>>>>> (though this could expand).
>>>>> As it stands we lean heavily on overloaded methods in the API, e.g.,
>>>>> there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
>>>>> and I feel it is only going to get worse as we add more optional params.
>>>>> In particular we've had some requests to be able to turn caching off, or
>>>>> change log configs, on a per operator basis (note this can be done now
>>>>> if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>>>>>
>>>>> So this is a bit of an open question. How can we change the DSL
>>>>> overloads so that it flows, is simple to use and understand, and is
>>>>> easily extended in the future?
>>>>>
>>>>> One option would be to use a fluent API approach for providing the
>>>>> optional params, so something like this:
>>>>>
>>>>> groupedStream.count()
>>>>>    .withStoreName("name")
>>>>>    .withCachingEnabled(false)
>>>>>    .withLoggingEnabled(config)
>>>>>    .table()
>>>>>
>>>>>
>>>>>
>>>>> Another option would be to provide a Builder to the count method, so it
>>>>> would look something like this:
>>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>
>>>>> Another option is to say: Hey we don't need this, what are you on
>>>>> about!
>>>>>
>>>>> The above has focussed on state store related overloads, but the same
>>>>> ideas could be applied to joins etc, where we presently have many join
>>>>> methods and many overloads.
>>>>>
>>>>> Anyway, I look forward to hearing your opinions.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>
>>>
>>
>>
> 
