The Processor API does not do any automatic repartitioning. If you need to repartition data, you always need to do it "manually" by writing to, and reading back from, a topic.
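A rough, dependency-free sketch of what "manual" repartitioning amounts to. In an actual Processor API topology this corresponds to a sink node (`Topology#addSink`) writing to an intermediate topic you manage yourself, followed by a source node (`Topology#addSource`) reading it back. The sketch below does not use any Kafka classes. It models a topic as a list of partitions. `String.hashCode()` stands in for Kafka's default partitioner, which hashes the serialized key with murmur2. The names `ManualRepartitionSketch`, `Rec`, `repartition` and `isCoPartitioned` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, dependency-free model: a "topic" is a list of partitions,
// and each partition is a list of records. No Kafka classes are used.
public class ManualRepartitionSketch {

    // Hypothetical stand-in for a Kafka record with a String key and value.
    public static final class Rec {
        public final String key;
        public final String value;

        public Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in partitioner. Kafka's default partitioner uses murmur2 on the
    // serialized key; String.hashCode() is used only to avoid dependencies.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // "Write to a topic and read it back": each re-keyed record is appended to
    // the partition chosen by its NEW key, so records sharing a key end up in
    // the same partition and could be joined or aggregated locally.
    public static List<List<Rec>> repartition(List<List<Rec>> input,
                                              Function<Rec, Rec> rekey,
                                              int numPartitions) {
        List<List<Rec>> topic = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            topic.add(new ArrayList<>());
        }
        for (List<Rec> partition : input) {
            for (Rec r : partition) {
                Rec out = rekey.apply(r);
                topic.get(partitionFor(out.key, numPartitions)).add(out);
            }
        }
        return topic;
    }

    // True if no key appears in more than one partition.
    public static boolean isCoPartitioned(List<List<Rec>> topic) {
        Map<String, Integer> home = new HashMap<>();
        for (int p = 0; p < topic.size(); p++) {
            for (Rec r : topic.get(p)) {
                Integer previous = home.putIfAbsent(r.key, p);
                if (previous != null && previous != p) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Input partitioned by user id; the value holds an ad id.
        List<List<Rec>> byUser = List.of(
                List.of(new Rec("user-1", "ad-A"), new Rec("user-3", "ad-B")),
                List.of(new Rec("user-2", "ad-A")));
        Function<Rec, Rec> byAd = r -> new Rec(r.value, r.key); // new key = ad id

        // Changing the key without moving the record (what map() alone does).
        List<List<Rec>> inPlace = new ArrayList<>();
        for (List<Rec> partition : byUser) {
            List<Rec> rekeyed = new ArrayList<>();
            for (Rec r : partition) {
                rekeyed.add(byAd.apply(r));
            }
            inPlace.add(rekeyed);
        }

        System.out.println("re-keyed in place, co-partitioned: " + isCoPartitioned(inPlace));
        System.out.println("after repartition, co-partitioned: "
                + isCoPartitioned(repartition(byUser, byAd, 2)));
    }
}
```

Re-keying in place leaves `ad-A` in two partitions (the first line prints `false`), so a per-key join or count could not be done locally. The write and read-back through the topic fixes that (the second line prints `true`), at the cost of the network round trip to the brokers discussed in the thread below.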
-Matthias

On 12/19/17 3:49 AM, Sameer Kumar wrote:
> Could you please clarify: if I choose to use only the low-level Processor API,
> what causes it to repartition? I am not using it in conjunction with the DSL;
> I plan to use only the Processor API.
> Apart from sink processors, are there any conditions under which
> repartitioning occurs?
>
> -Sameer.
>
> On Tue, Dec 19, 2017 at 11:06 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
>
>> I understand it now: even if we were able to attach custom partitioning,
>> the data would still travel from the stream nodes to the broker via the
>> join topic, so the network hop would still be there.
>>
>> -Sameer.
>>
>> On Tue, Dec 19, 2017 at 1:17 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>>> I need to map the keys, modify them
>>>> and then do a join.
>>>
>>> This will always trigger a repartitioning. There is no API at the moment
>>> to tell Kafka Streams that partitioning is preserved.
>>>
>>> A custom partitioner won't help in your case, as far as I understand it.
>>>
>>>
>>> -Matthias
>>>
>>> On 12/17/17 9:48 PM, Sameer Kumar wrote:
>>>> Actually, I am doing a join after the map. I need to map the keys,
>>>> modify them, and then do a join.
>>>>
>>>> I was thinking of always passing a partition key on which partitioning
>>>> would be based.
>>>> The step-by-step flow is:
>>>> 1. Data is already partitioned by userid.
>>>> 2. I do a map to join impressions tied to a user with view notifications.
>>>> 3. I count valid impressions across different aggregations (i.e. across
>>>>    different dimension groups).
>>>>
>>>> Thanks,
>>>> -Sameer.
>>>>
>>>> On Mon, Dec 18, 2017 at 1:37 AM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Two comments:
>>>>>
>>>>> 1) As long as you don't do an aggregation/join after a map(), there
>>>>> will be no repartitioning. Streams does repartitioning "lazily", i.e.,
>>>>> only if it's required. As long as you only chain filter/map etc., no
>>>>> repartitioning will be done.
>>>>>
>>>>> 2) Can't you use mapValues() instead of map()? If you use map() only to
>>>>> read the key but modify only the value (-> "data is still local"), a
>>>>> custom partitioner won't help. Also, we are improving this in the
>>>>> upcoming version 1.1, which allows read access to the key in
>>>>> mapValues() (cf. KIP-149 for details).
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 12/17/17 8:20 AM, Sameer Kumar wrote:
>>>>>> I have multiple map and filter phases in my application DAG, and
>>>>>> though I am generating different keys at different points, the data
>>>>>> is still local. Repartitioning here adds unnecessary network
>>>>>> shuffling for me, and I want to minimize it.
>>>>>>
>>>>>> -Sameer.
>>>>>>
>>>>>> On Friday, December 15, 2017, Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> It's not recommended to write a custom partitioner because it's
>>>>>>> pretty difficult to write a correct one. There are many dependencies,
>>>>>>> and you need deep knowledge of Kafka Streams internals to get it
>>>>>>> right. Otherwise, your custom partitioner breaks Kafka Streams.
>>>>>>>
>>>>>>> That is the reason why it's not documented...
>>>>>>>
>>>>>>> I am not sure what you are trying to achieve in the first place.
>>>>>>> What do you mean by
>>>>>>>
>>>>>>>> I want to make sure that during map phase, the keys
>>>>>>>> produced adhere to the customized partitioner.
>>>>>>>
>>>>>>> Maybe you can achieve what you want differently.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 12/15/17 1:19 AM, Sameer Kumar wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I want to use a custom partitioner in Streams, but I couldn't find
>>>>>>>> it in the documentation. I want to make sure that during the map
>>>>>>>> phase, the keys produced adhere to the customized partitioner.
>>>>>>>>
>>>>>>>> -Sameer.
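The "lazy" repartitioning described in the quoted Dec 18 reply (DSL only; the Processor API never does this automatically) can be modeled as a simple decision rule. This is a hypothetical toy, not Kafka Streams API: `LazyRepartitionSketch` and its methods are invented for illustration, and only one side of the join is modeled. A key-changing step such as `map()` merely marks the stream. Key-preserving steps such as `filter()` and `mapValues()` leave it unmarked. A key-based operation such as `join()` inserts a repartition topic only if the stream is marked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (not Kafka Streams API) of the DSL's lazy repartitioning.
public class LazyRepartitionSketch {

    private final boolean keyMayHaveChanged; // set by key-changing steps such as map()
    private final List<String> plan;         // steps in order, including inserted topics

    private LazyRepartitionSketch(boolean keyMayHaveChanged, List<String> plan) {
        this.keyMayHaveChanged = keyMayHaveChanged;
        this.plan = plan;
    }

    public static LazyRepartitionSketch source(String topic) {
        List<String> plan = new ArrayList<>();
        plan.add("source(" + topic + ")");
        return new LazyRepartitionSketch(false, plan);
    }

    // Appends a step; the "key may have changed" mark is sticky until a repartition.
    private LazyRepartitionSketch then(String step, boolean changesKey) {
        List<String> next = new ArrayList<>(plan);
        next.add(step);
        return new LazyRepartitionSketch(keyMayHaveChanged || changesKey, next);
    }

    // Key-preserving steps: existing partitioning stays valid.
    public LazyRepartitionSketch filter() { return then("filter", false); }
    public LazyRepartitionSketch mapValues() { return then("mapValues", false); }

    // map() may emit a new key. Nothing is moved yet; the stream is only marked.
    public LazyRepartitionSketch map() { return then("map", true); }

    // A join needs equal keys co-located, so a marked stream is first routed
    // through a repartition topic, which clears the mark.
    public LazyRepartitionSketch join() {
        List<String> next = new ArrayList<>(plan);
        if (keyMayHaveChanged) {
            next.add("repartition-topic");
        }
        next.add("join");
        return new LazyRepartitionSketch(false, next);
    }

    public List<String> plan() {
        return plan;
    }

    public static void main(String[] args) {
        // prints [source(impressions), map, filter, repartition-topic, join]
        System.out.println(source("impressions").map().filter().join().plan());
        // prints [source(impressions), mapValues, filter, join]
        System.out.println(source("impressions").mapValues().filter().join().plan());
    }
}
```

In real Kafka Streams the corresponding calls are `KStream#map`, `KStream#filter`, `KStream#mapValues` and `KStream#join`, and the inserted topic is an internal topic whose name ends in `-repartition`. The toy only tracks the decision, not the data movement. It also shows why chaining only `filter`/`map` costs nothing until a join or aggregation follows.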