That is odd, as the person who originally reported the problem has verified that it is fixed.
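
In the meantime, for anyone who needs to keep caching off, here is a minimal sketch of an application-wide alternative to the per-store workaround quoted below. It is not what the PR does. It only builds the properties you would hand to StreamsConfig, and it assumes that turning the record cache off for every store in the application is acceptable. The class name, the application id and the broker address are placeholders.

```java
import java.util.Properties;

public class DisableStreamsCache {
    // Builds Streams properties with the record cache turned off for the whole app.
    // "my-streams-app" and "localhost:9092" are placeholder values (assumptions).
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // A cache size of 0 bytes disables record caching for every store.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("cache.max.bytes.buffering"));
    }
}
```

Unlike the StateStoreSupplier approach, this affects all stores, so it is a blunter tool and probably only worth using while narrowing down where the slowdown comes from.
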

On Thu, 4 May 2017 at 08:31 Guozhang Wang <wangg...@gmail.com> wrote:

> Ara,
>
> That is a bit weird. I double checked and agree with Eno that this
> commit is in both trunk and 0.10.2, so I suspect the same issue still
> persists in trunk; hence there might be another issue that is not fixed
> in 2645. Could you help verify if that is the case? In that case we can
> re-open https://issues.apache.org/jira/browse/KAFKA-4851 and
> investigate further.
>
>
> Guozhang
>
>
> On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
> > No errors. But if I enable caching I see performance drop considerably.
> > The workaround was to disable caching. The same thing is still true in
> > 0.10.2.1.
> >
> > Ara.
> >
> > > On May 2, 2017, at 12:55 PM, Eno Thereska <eno.there...@gmail.com>
> > > wrote:
> > >
> > > Hi Ara,
> > >
> > > The PR https://github.com/apache/kafka/pull/2645 has gone to both
> > > trunk and 0.10.2.1, I just checked. What error are you seeing? Could
> > > you give us an update?
> > >
> > > Thanks
> > > Eno
> > >
> > > On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi
> > > <ara.ebrah...@argyledata.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> I upgraded to 0.10.2.1 yesterday, enabled caching for session
> > >> windows and tested again. It doesn’t seem to be fixed?
> > >>
> > >> Ara.
> > >>
> > >>> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian....@gmail.com> wrote:
> > >>>
> > >>> Hi Ara,
> > >>>
> > >>> There is a performance issue in the 0.10.2 release of session
> > >>> windows. It is fixed with this PR:
> > >>> https://github.com/apache/kafka/pull/2645
> > >>> You can work around this on 0.10.2 by calling the aggregate(..),
> > >>> reduce(..) etc. methods and supplying a
> > >>> StateStoreSupplier<SessionStore> with caching disabled, i.e., by
> > >>> doing something like:
> > >>>
> > >>> // caching stays off here because enableCaching() is not called
> > >>> final StateStoreSupplier<SessionStore> sessionStore =
> > >>>     Stores.create("session-store-name")
> > >>>         .withKeys(Serdes.String())
> > >>>         .withValues(Serdes.String())
> > >>>         .persistent()
> > >>>         .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> > >>>         .build();
> > >>>
> > >>>
> > >>> The fix has also been cherry-picked to the 0.10.2 branch, so you
> > >>> could build from source and not have to create the
> > >>> StateStoreSupplier.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi
> > >>> <ara.ebrah...@argyledata.com> wrote:
> > >>>
> > >>> Thanks for the response Matthias!
> > >>>
> > >>> The reason we want this exact task assignment to happen is that a
> > >>> critical part of our pipeline involves grouping relevant records
> > >>> together (that’s what the aggregate function in the topology is
> > >>> for). And for hot keys this can sometimes lead to 100s of records
> > >>> getting grouped together. Even worse, these records are session
> > >>> bound; we use session windows. Hence we see lots of activity around
> > >>> the store backing the aggregate function, and even though we use SSD
> > >>> drives we’re not seeing the kind of performance we want to see. It
> > >>> seems like the aggregate function leads to lots of updates to these
> > >>> hot keys, which lead to lots of rocksdb activity.
> > >>>
> > >>> Now there are many ways to fix this problem:
> > >>> - just don’t aggregate, create an algorithm which is not reliant on
> > >>>   grouping/aggregating records. Not what we can do with our tight
> > >>>   schedule right now.
> > >>> - do grouping/aggregating but employ n instances and rely on uniform
> > >>>   distribution of these tasks. This is the easiest solution and what
> > >>>   we expected to work, but it didn’t work, as you can tell from this
> > >>>   thread. We threw 4 instances at it but only 2 got used.
> > >>> - tune rocksdb? I tried this actually but it didn’t really help us
> > >>>   much, aside from the fact that tuning rocksdb is very tricky.
> > >>> - use in-memory store instead? Unfortunately we have to use session
> > >>>   windows for this aggregate function and apparently there’s no
> > >>>   in-memory session store impl? I tried to create one but soon
> > >>>   realized it’s too much work :) I looked at the default
> > >>>   PartitionAssignor code too, but that ain’t trivial either.
> > >>>
> > >>> So I’m a bit hopeless :(
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>> ________________________________
> > >>>
> > >>> This message is for the designated recipient only and may contain
> > >>> privileged, proprietary, or otherwise confidential information. If
> > >>> you have received it in error, please notify the sender immediately
> > >>> and delete the original. Any other use of the e-mail by you is
> > >>> prohibited. Thank you in advance for your cooperation.
> > >>>
> > >>> ________________________________
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 27, 2017 at 1:35:30 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>>
> > >>> Ara,
> > >>>
> > >>> thanks for the detailed information.
> > >>>
> > >>> If I parse this correctly, both instances run the same number of
> > >>> tasks (12 each). That is all Streams promises.
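
To make the "12 each" point concrete, here is a small standalone sketch that tallies the task ids from Ara's dump (quoted further down) by sub-topology. The class and method names are made up for illustration. The task ids are copied from that dump, and the reading of prefixes 0, 1 and 2 as the source, repartition and output topics is taken from the same output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskTally {
    // Task ids as listed per instance in the quoted dump.
    static final List<String> INSTANCE_1 = Arrays.asList(
        "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
        "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
    static final List<String> INSTANCE_2 = Arrays.asList(
        "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
        "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

    // Counts tasks per sub-topology; the id "1_6" means sub-topology 1, partition 6.
    static Map<Integer, Integer> perSubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int group = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("instance 1: " + perSubTopology(INSTANCE_1)); // {0=3, 1=5, 2=4}
        System.out.println("instance 2: " + perSubTopology(INSTANCE_2)); // {0=5, 1=3, 2=4}
    }
}
```

Both instances hold 12 tasks in total, but the aggregate tasks (prefix 1) are split 5 to 3, which is the imbalance being reported.
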
> > >>>
> > >>> To come back to your initial question:
> > >>>
> > >>> Is there a way to tell kafka streams to uniformly assign partitions
> > >>> across instances? If I have n kafka streams instances running, I
> > >>> want each to handle EXACTLY 1/nth number of partitions. No dynamic
> > >>> task assignment logic. Just dumb 1/n assignment.
> > >>>
> > >>> That is exactly what you get: each of your two instances gets
> > >>> 24/2 = 12 tasks assigned. That is dumb 1/n assignment, isn't it? So
> > >>> my original response was correct.
> > >>>
> > >>> However, I now understand better what you actually mean by your
> > >>> question. Note that Streams does not distinguish "type" of tasks --
> > >>> it only sees 24 tasks and assigns those in a balanced way.
> > >>>
> > >>> Thus, currently there is no easy way to get the assignment you want
> > >>> to have, unless you implement your own `PartitionAssignor`.
> > >>>
> > >>> This is the current implementation for 0.10.2:
> > >>> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> > >>>
> > >>> You can, if you wish, write your own assignor and set it via
> > >>> StreamsConfig. However, be aware that this might be tricky to get
> > >>> right and also might have runtime implications with regard to
> > >>> rebalancing and state store recovery. We recently improved the
> > >>> current implementation to avoid costly task movements:
> > >>> https://issues.apache.org/jira/browse/KAFKA-4677
> > >>>
> > >>> Thus, I would not recommend implementing your own
> > >>> `PartitionAssignor`.
> > >>>
> > >>>
> > >>> However, the root question is, why do you need this exact assignment
> > >>> you are looking for in the first place? Why is it "bad" if "types"
> > >>> of tasks are not distinguished? I would like to understand your
> > >>> requirement better -- it might be worth improving Streams here.
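
For illustration only, the "dumb 1/n per type of task" assignment being asked for could be simulated like this. This is a standalone sketch, not a `PartitionAssignor` and not how `StreamPartitionAssignor` works. The class and method names are hypothetical, and it ignores everything a real assignor has to handle (previous ownership, standby tasks, state store recovery).

```java
import java.util.ArrayList;
import java.util.List;

public class PerGroupRoundRobin {
    // Returns, for each instance index, the task ids it would own when every
    // sub-topology's partitions are dealt out round-robin on their own.
    static List<List<String>> assign(int subTopologies, int partitions, int instances) {
        List<List<String>> owned = new ArrayList<List<String>>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<String>());
        }
        for (int group = 0; group < subTopologies; group++) {
            for (int partition = 0; partition < partitions; partition++) {
                // Task id format follows the dump: <sub-topology>_<partition>.
                owned.get(partition % instances).add(group + "_" + partition);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 3 sub-topologies, 8 partitions each, 2 instances, as in this thread.
        List<List<String>> owned = assign(3, 8, 2);
        for (int i = 0; i < owned.size(); i++) {
            System.out.println("instance " + (i + 1) + ": " + owned.get(i));
        }
    }
}
```

With these numbers each instance ends up with 4 tasks of every sub-topology. The price is that the assignment is recomputed from scratch on every rebalance, which is exactly the kind of task movement the KAFKA-4677 change tries to avoid.
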
> > >>> > > >>> > > >>> -Matthias > > >>> > > >>> > > >>> On 3/27/17 12:57 PM, Ara Ebrahimi wrote: > > >>> Hi, > > >>> > > >>> So, I simplified the topology by making sure we have only 1 source > > topic. > > >>> Now I have 1 source topic, 8 partitions, 2 instances. And here’s how > > the > > >>> topology looks like: > > >>> > > >>> instance 1: > > >>> > > >>> KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-1 > > >>> Active tasks: > > >>> StreamsTask taskId: 0_3 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-3] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-2 > > >>> Active tasks: > > >>> StreamsTask taskId: 1_2 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: 
[KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-2] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-3 > > >>> Active tasks: > > >>> StreamsTask taskId: 1_1 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-1] > > >>> StreamsTask taskId: 2_7 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-7] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> 
StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-4 > > >>> Active tasks: > > >>> StreamsTask taskId: 2_0 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-0] > > >>> StreamsTask taskId: 2_6 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-6] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-5 > > >>> Active tasks: > > >>> StreamsTask taskId: 0_0 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-0] > > >>> StreamsTask taskId: 1_6 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> 
children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-6] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-6 > > >>> Active tasks: > > >>> StreamsTask taskId: 1_0 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-0] > > >>> StreamsTask taskId: 0_7 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: 
[KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-7] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-7 > > >>> Active tasks: > > >>> StreamsTask taskId: 2_4 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-4] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-8 > > >>> Active tasks: > > >>> StreamsTask taskId: 1_3 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: 
ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-3] > > >>> Standby tasks: > > >>> > > >>> > > >>> instance 2: > > >>> > > >>> KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44 > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-1 > > >>> Active tasks: > > >>> StreamsTask taskId: 2_1 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-1] > > >>> StreamsTask taskId: 2_5 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-5] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-2 > > >>> Active tasks: > > >>> StreamsTask taskId: 0_4 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-4] > > >>> Standby tasks: > > 
>>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-3 > > >>> Active tasks: > > >>> StreamsTask taskId: 2_2 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-2] > > >>> StreamsTask taskId: 1_7 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-7] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-4 > > >>> Active tasks: > > >>> StreamsTask taskId: 2_3 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000019: > > >>> topics: [ml-features-avro-or] > > >>> Partitions [ml-features-avro-or-3] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-5 > > >>> Active tasks: > > >>> StreamsTask taskId: 0_1 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: 
[KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-1] > > >>> StreamsTask taskId: 1_5 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: [activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-5] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-6 > > >>> Active tasks: > > >>> StreamsTask taskId: 1_4 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000012: > > >>> topics: 
[activities-by-phone-store-or-repartition] > > >>> children: [KSTREAM-AGGREGATE-0000000009] > > >>> KSTREAM-AGGREGATE-0000000009: > > >>> states: [activities-by-phone-store-or] > > >>> children: [KTABLE-TOSTREAM-0000000013] > > >>> KTABLE-TOSTREAM-0000000013: > > >>> children: [KSTREAM-FILTER-0000000014] > > >>> KSTREAM-FILTER-0000000014: > > >>> children: [KSTREAM-FILTER-0000000015] > > >>> KSTREAM-FILTER-0000000015: > > >>> children: [KSTREAM-MAP-0000000016] > > >>> KSTREAM-MAP-0000000016: > > >>> children: [KSTREAM-MAP-0000000017] > > >>> KSTREAM-MAP-0000000017: > > >>> children: [KSTREAM-SINK-0000000018] > > >>> KSTREAM-SINK-0000000018: > > >>> topic: ml-features-avro-or > > >>> Partitions [activities-by-phone-store-or-repartition-4] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-7 > > >>> Active tasks: > > >>> StreamsTask taskId: 0_2 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-2] > > >>> 
StreamsTask taskId: 0_6 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-6] > > >>> Standby tasks: > > >>> > > >>> StreamsThread appId: mar-23-modular > > >>> StreamsThread clientId: mar-23-modular > > >>> StreamsThread threadId: StreamThread-8 > > >>> Active tasks: > > >>> StreamsTask taskId: 0_5 > > >>> ProcessorTopology: > > >>> KSTREAM-SOURCE-0000000000: > > >>> topics: [activities-avro-or] > > >>> children: [KSTREAM-FILTER-0000000001] > > >>> KSTREAM-FILTER-0000000001: > > >>> children: [KSTREAM-MAP-0000000002] > > >>> KSTREAM-MAP-0000000002: > > >>> children: [KSTREAM-BRANCH-0000000003] > > >>> KSTREAM-BRANCH-0000000003: > > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > > >> 0000000005] > > >>> KSTREAM-BRANCHCHILD-0000000004: > > >>> children: [KSTREAM-MAPVALUES-0000000006] > > >>> KSTREAM-MAPVALUES-0000000006: > > >>> children: [KSTREAM-FLATMAPVALUES-0000000007] > > >>> KSTREAM-FLATMAPVALUES-0000000007: > > >>> children: [KSTREAM-MAP-0000000008] > > >>> KSTREAM-MAP-0000000008: > > >>> 
children: [KSTREAM-FILTER-0000000011] > > >>> KSTREAM-FILTER-0000000011: > > >>> children: [KSTREAM-SINK-0000000010] > > >>> KSTREAM-SINK-0000000010: > > >>> topic: activities-by-phone-store-or-repartition > > >>> KSTREAM-BRANCHCHILD-0000000005: > > >>> Partitions [activities-avro-or-5] > > >>> Standby tasks: > > >>> > > >>> > > >>> activities-avro-or is input topic. ml-features-avro-or is output > topic. > > >> In > > >>> the middle we have an aggregate (activities-by-phone-store-or- > > >> repartition). > > >>> > > >>> On instance 1 I see 3 tasks for activities-avro-or and on instance 2 > I > > >> see > > >>> 5. Bad. > > >>> > > >>> On instance 1 see 4 tasks for ml-features-avro-or. And 4 on instance > 2. > > >>> Good. > > >>> > > >>> On instance 1 see 5 tasks for activities-by-phone-store-or- > > repartition. > > >> And > > >>> 3 on instance 2. Bad. > > >>> > > >>> As I said in terms of offsets for all these partitions I see uniform > > >>> distribution, so we’re not dealing with a bad key scenario. > > >>> > > >>> Ara. > > >>> > > >>> On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io > > >> <mailto: > > >>> matth...@confluent.io>> wrote: > > >>> > > >>> > > >>> > > >>> > > >>> ________________________________ > > >>> > > >>> This message is for the designated recipient only and may contain > > >>> privileged, proprietary, or otherwise confidential information. If > you > > >> have > > >>> received it in error, please notify the sender immediately and delete > > the > > >>> original. Any other use of the e-mail by you is prohibited. Thank you > > in > > >>> advance for your cooperation. > > >>> > > >>> ________________________________ > > >>> > > >>> From: "Matthias J. 
Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 25, 2017 at 6:43:12 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>> Please share the rest of your topology code (without any UDFs / business logic). Otherwise, I cannot give further advice.
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> > >>> Via:
> > >>>
> > >>> builder.stream("topic1");
> > >>> builder.stream("topic2");
> > >>> builder.stream("topic3");
> > >>>
> > >>> These are different kinds of topics, each carrying a different kind of Avro object.
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>
> > >>> ________________________________
> > >>>
> > >>> This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Thank you in advance for your cooperation.
> > >>>
> > >>> ________________________________
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 25, 2017 at 6:04:30 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>> Ara,
> > >>>
> > >>> How do you consume your topics? Via
> > >>>
> > >>> builder.stream("topic1", "topic2", "topic3");
> > >>>
> > >>> or via
> > >>>
> > >>> builder.stream("topic1");
> > >>> builder.stream("topic2");
> > >>> builder.stream("topic3");
> > >>>
> > >>> Both are handled differently with regard to creating tasks (partition to task assignment also depends on your downstream code, though).
> > >>>
> > >>> If this does not help, can you maybe share the structure of your processing? To dig deeper, we would need to know the topology DAG.
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> > >>> Matthias,
> > >>>
> > >>> This apparently happens because we have more than 1 source topic. We have 3 source topics in the same application. So it seems like the task assignment algorithm creates topologies not for one specific topic at a time but for the total partitions across all source topics consumed in an application instance. Because we have some code dependencies between these 3 source topics, we can't separate them into 3 applications at this time. That is why I want the task assignment algorithm to do a simple, uniform task assignment PER source topic.
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 25, 2017 at 5:21:47 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>> Hi,
> > >>>
> > >>> I am wondering why this happens in the first place. Streams load-balances over all running instances, and each instance should get the same number of tasks (and thus partitions) assigned.
> > >>>
> > >>> What is the overall assignment? Do you have StandBy tasks configured? What version do you use?
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> > >>> Hi,
> > >>>
> > >>> Is there a way to tell kafka streams to uniformly assign partitions across instances? If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth of the partitions. No dynamic task assignment logic. Just dumb 1/n assignment.
> > >>>
> > >>> Here's our scenario. Let's say we have a "source" topic with 8 partitions. We also have 2 kafka streams instances. Each instance gets assigned 4 "source" topic partitions to handle. BUT then we do a few maps and an aggregate. So data gets shuffled around. The map function uniformly distributes these across all partitions (I can verify that by looking at the partition offsets). After the map, what I notice by looking at the topology is that one kafka streams instance gets assigned to handle, say, 2 aggregate repartition topics and the other one gets assigned 6. Even worse, on bigger clusters (say 4 instances) we see, say, 2 nodes get assigned the downstream aggregate repartition topics and the 2 other nodes get assigned NOTHING to handle.
> > >>>
> > >>> Ara.
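The "dumb 1/n assignment" asked for in the original message can be sketched in plain Java as below. This only illustrates the idea and is not how Kafka Streams assigns tasks: the class name `UniformAssignment` and the method `assign` are hypothetical, and the sketch ignores rebalancing, standby tasks and state locality.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Kafka Streams API.
final class UniformAssignment {

    // Assigns partitions 0..numPartitions-1 round-robin:
    // partition p goes to instance p % numInstances.
    static List<List<Integer>> assign(int numPartitions, int numInstances) {
        if (numInstances <= 0) {
            throw new IllegalArgumentException("numInstances must be positive");
        }
        List<List<Integer>> perInstance = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            perInstance.get(p % numInstances).add(p);
        }
        return perInstance;
    }

    public static void main(String[] args) {
        // The scenario from the message: 8 partitions, 2 instances.
        List<List<Integer>> assignment = assign(8, 2);
        for (int i = 0; i < assignment.size(); i++) {
            System.out.println("instance " + i + " -> " + assignment.get(i));
        }
        // prints:
        // instance 0 -> [0, 2, 4, 6]
        // instance 1 -> [1, 3, 5, 7]
    }
}
```

With this scheme the instance sizes differ by at most one partition, which is the property the message asks for. When the partition count is not a multiple of n, an exact 1/n split is not possible.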
> --
> -- Guozhang
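On the question raised earlier in the thread of builder.stream("topic1", "topic2", "topic3") versus three separate builder.stream(...) calls, the following plain-Java sketch shows why the two forms can yield different task counts. It is a simplified model and rests on assumptions: that tasks are created per group of connected source topics, with one task per partition index up to the largest partition count in the group, and that each topic has 8 partitions (a made-up figure). The class `TaskSketch` and the method `tasks` are hypothetical names, not Kafka Streams API, and as noted in the thread the real grouping also depends on the downstream code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of task creation; not the Kafka Streams API.
final class TaskSketch {

    // Each list element is one topic group: topic name -> partition count.
    // A group gets one task per partition index, up to its largest partition count.
    // Task ids are written as "<group>_<partition>".
    static List<String> tasks(List<Map<String, Integer>> topicGroups) {
        List<String> ids = new ArrayList<>();
        for (int g = 0; g < topicGroups.size(); g++) {
            int maxPartitions = 0;
            for (int n : topicGroups.get(g).values()) {
                maxPartitions = Math.max(maxPartitions, n);
            }
            for (int p = 0; p < maxPartitions; p++) {
                ids.add(g + "_" + p);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Assumed: every topic has 8 partitions.
        Map<String, Integer> merged = new LinkedHashMap<>();
        merged.put("topic1", 8);
        merged.put("topic2", 8);
        merged.put("topic3", 8);

        // One stream over all three topics: modeled as a single topic group.
        System.out.println(tasks(Collections.singletonList(merged)).size()); // prints 8

        // Three independent streams: modeled as three topic groups.
        List<Map<String, Integer>> separate = Arrays.asList(
                Collections.singletonMap("topic1", 8),
                Collections.singletonMap("topic2", 8),
                Collections.singletonMap("topic3", 8));
        System.out.println(tasks(separate).size()); // prints 24
    }
}
```

Under this model the assignor balances tasks, not partitions of any single topic, so an even task count per instance does not guarantee an even share of each source topic.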