Hi Ara,

The PR https://github.com/apache/kafka/pull/2645 has been merged into both trunk and 0.10.2.1; I just checked. What error are you seeing? Could you give us an update?

Thanks,
Eno

On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> Hi,
>
> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
> tested again. It doesn’t seem to be fixed?
>
> Ara.
>
> > On Mar 27, 2017, at 2:10 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> > Hi Ara,
> >
> > There is a performance issue in the 0.10.2 release of session windows. It
> > is fixed with this PR: https://github.com/apache/kafka/pull/2645
> > You can work around this on 0.10.2 by calling the aggregate(..), reduce(..)
> > etc. methods and supplying a StateStoreSupplier<SessionStore> with caching
> > disabled, i.e., by doing something like:
> >
> > final StateStoreSupplier<SessionStore> sessionStore =
> >     Stores.create("session-store-name")
> >         .withKeys(Serdes.String())
> >         .withValues(Serdes.String())
> >         .persistent()
> >         .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> >         .build();
> >
> > The fix has also been cherry-picked to the 0.10.2 branch, so you could
> > build from source and not have to create the StateStoreSupplier.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com>
> > wrote:
> >
> > Thanks for the response Matthias!
> >
> > The reason we want this exact task assignment to happen is that a critical
> > part of our pipeline involves grouping relevant records together (that’s
> > what the aggregate function in the topology is for). And for hot keys this
> > can sometimes lead to 100s of records getting grouped together. Even worse,
> > these records are session bound; we use session windows. Hence we see lots
> > of activity around the store backing the aggregate function, and even though
> > we use SSD drives we’re not seeing the kind of performance we want to see.
> > It seems like the aggregate function leads to lots of updates to these hot
> > keys, which lead to lots of rocksdb activity.
> >
> > Now there are many ways to fix this problem:
> > - just don’t aggregate; create an algorithm which is not reliant on
> >   grouping/aggregating records. Not what we can do with our tight schedule
> >   right now.
> > - do grouping/aggregating but employ n instances and rely on uniform
> >   distribution of these tasks. This is the easiest solution and what we
> >   expected to work, but it didn’t, as you can tell from this thread. We
> >   threw 4 instances at it but only 2 got used.
> > - tune rocksdb? I tried this actually but it didn’t really help us much,
> >   aside from the fact that tuning rocksdb is very tricky.
> > - use an in-memory store instead? Unfortunately we have to use session
> >   windows for this aggregate function and apparently there’s no in-memory
> >   session store impl? I tried to create one but soon realized it’s too much
> >   work :) I looked at the default PartitionAssignor code too, but that ain’t
> >   trivial either.
> >
> > So I’m a bit hopeless :(
> >
> > Ara.
> >
> > On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > ________________________________
> >
> > This message is for the designated recipient only and may contain
> > privileged, proprietary, or otherwise confidential information. If you have
> > received it in error, please notify the sender immediately and delete the
> > original. Any other use of the e-mail by you is prohibited. Thank you in
> > advance for your cooperation.
> >
> > ________________________________
> >
> > From: "Matthias J. Sax" <matth...@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 27, 2017 at 1:35:30 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Ara,
> >
> > thanks for the detailed information.
> >
> > If I parse this correctly, both instances run the same number of tasks
> > (12 each). That is all Streams promises.
> >
> > To come back to your initial question:
> >
> > > Is there a way to tell kafka streams to uniformly assign partitions across
> > > instances? If I have n kafka streams instances running, I want each to
> > > handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> > > logic. Just dumb 1/n assignment.
> >
> > That is exactly what you get: each of your two instances gets 24/2 = 12
> > tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> > response was correct.
> >
> > However, I now understand better what you actually mean by your
> > question. Note that Streams does not distinguish "types" of tasks -- it
> > only sees 24 tasks and assigns those in a balanced way.
> >
> > Thus, currently there is no easy way to get the assignment you want to
> > have, unless you implement your own `PartitionAssignor`.
> >
> > This is the current implementation for 0.10.2:
> > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> >
> > You can, if you wish, write your own assignor and set it via
> > StreamsConfig. However, be aware that this might be tricky to get right
> > and also might have runtime implications with regard to rebalancing and
> > state store recovery. We recently improved the current implementation to
> > avoid costly task movements:
> > https://issues.apache.org/jira/browse/KAFKA-4677
> >
> > Thus, I would not recommend implementing your own `PartitionAssignor`.
> >
> >
> > However, the root question is, why do you need this exact assignment you
> > are looking for in the first place? Why is it "bad" if "types" of tasks
> > are not distinguished? I would like to understand your requirement
> > better -- it might be worth improving Streams here.
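
To make the "12 each, but not balanced per type" point concrete, here is a minimal sketch in plain Java (no Kafka dependency) that tallies the active task IDs listed in the two instance dumps quoted further down. The class name TaskTally and the helper countBySubTopology are made up for illustration and are not part of Kafka Streams. The task IDs are copied from the dump; the number before the underscore is the sub-topology (topic group) and the number after it is the partition. Going by the dump, group 0 reads activities-avro-or, group 1 reads the repartition topic and runs the aggregate, and group 2 reads ml-features-avro-or.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskTally {

    // Active task IDs per instance, copied from the dumps in this thread.
    static final List<String> INSTANCE_1 = Arrays.asList(
            "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
            "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
    static final List<String> INSTANCE_2 = Arrays.asList(
            "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
            "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

    // Counts task IDs of the form "<group>_<partition>" by their group prefix.
    static Map<Integer, Integer> countBySubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int group = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("instance 1: " + INSTANCE_1.size()
                + " tasks " + countBySubTopology(INSTANCE_1));
        System.out.println("instance 2: " + INSTANCE_2.size()
                + " tasks " + countBySubTopology(INSTANCE_2));
    }
}
```

Both instances come out at 12 tasks in total, split 3/5/4 across groups 0/1/2 on instance 1 and 5/3/4 on instance 2. That matches the counts Ara reports: the total is balanced, the per-sub-topology split is not, so the stateful aggregate tasks (group 1) land 5 to 3.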
> > > > > > -Matthias > > > > > > On 3/27/17 12:57 PM, Ara Ebrahimi wrote: > > Hi, > > > > So, I simplified the topology by making sure we have only 1 source topic. > > Now I have 1 source topic, 8 partitions, 2 instances. And here’s how the > > topology looks like: > > > > instance 1: > > > > KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-1 > > Active tasks: > > StreamsTask taskId: 0_3 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-3] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-2 > > Active tasks: > > StreamsTask taskId: 1_2 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > 
KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-2] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-3 > > Active tasks: > > StreamsTask taskId: 1_1 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-1] > > StreamsTask taskId: 2_7 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-7] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-4 > > Active tasks: > > StreamsTask taskId: 2_0 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-0] > > StreamsTask taskId: 2_6 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-6] > > Standby tasks: > > > > 
StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-5 > > Active tasks: > > StreamsTask taskId: 0_0 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-0] > > StreamsTask taskId: 1_6 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-6] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: 
StreamThread-6 > > Active tasks: > > StreamsTask taskId: 1_0 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-0] > > StreamsTask taskId: 0_7 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-7] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-7 > > Active tasks: > > StreamsTask taskId: 2_4 > > ProcessorTopology: > > 
KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-4] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-8 > > Active tasks: > > StreamsTask taskId: 1_3 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-3] > > Standby tasks: > > > > > > instance 2: > > > > KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44 > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-1 > > Active tasks: > > StreamsTask taskId: 2_1 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-1] > > StreamsTask taskId: 2_5 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-5] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-2 > > Active tasks: > > StreamsTask taskId: 0_4 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: 
[KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-4] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-3 > > Active tasks: > > StreamsTask taskId: 2_2 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-2] > > StreamsTask taskId: 1_7 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-7] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-4 > > 
Active tasks: > > StreamsTask taskId: 2_3 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000019: > > topics: [ml-features-avro-or] > > Partitions [ml-features-avro-or-3] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-5 > > Active tasks: > > StreamsTask taskId: 0_1 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-1] > > StreamsTask taskId: 1_5 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > 
> Partitions [activities-by-phone-store-or-repartition-5] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-6 > > Active tasks: > > StreamsTask taskId: 1_4 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000012: > > topics: [activities-by-phone-store-or-repartition] > > children: [KSTREAM-AGGREGATE-0000000009] > > KSTREAM-AGGREGATE-0000000009: > > states: [activities-by-phone-store-or] > > children: [KTABLE-TOSTREAM-0000000013] > > KTABLE-TOSTREAM-0000000013: > > children: [KSTREAM-FILTER-0000000014] > > KSTREAM-FILTER-0000000014: > > children: [KSTREAM-FILTER-0000000015] > > KSTREAM-FILTER-0000000015: > > children: [KSTREAM-MAP-0000000016] > > KSTREAM-MAP-0000000016: > > children: [KSTREAM-MAP-0000000017] > > KSTREAM-MAP-0000000017: > > children: [KSTREAM-SINK-0000000018] > > KSTREAM-SINK-0000000018: > > topic: ml-features-avro-or > > Partitions [activities-by-phone-store-or-repartition-4] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-7 > > Active tasks: > > StreamsTask taskId: 0_2 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: 
activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-2] > > StreamsTask taskId: 0_6 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: [KSTREAM-SINK-0000000010] > > KSTREAM-SINK-0000000010: > > topic: activities-by-phone-store-or-repartition > > KSTREAM-BRANCHCHILD-0000000005: > > Partitions [activities-avro-or-6] > > Standby tasks: > > > > StreamsThread appId: mar-23-modular > > StreamsThread clientId: mar-23-modular > > StreamsThread threadId: StreamThread-8 > > Active tasks: > > StreamsTask taskId: 0_5 > > ProcessorTopology: > > KSTREAM-SOURCE-0000000000: > > topics: [activities-avro-or] > > children: [KSTREAM-FILTER-0000000001] > > KSTREAM-FILTER-0000000001: > > children: [KSTREAM-MAP-0000000002] > > KSTREAM-MAP-0000000002: > > children: [KSTREAM-BRANCH-0000000003] > > KSTREAM-BRANCH-0000000003: > > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD- > 0000000005] > > KSTREAM-BRANCHCHILD-0000000004: > > children: [KSTREAM-MAPVALUES-0000000006] > > KSTREAM-MAPVALUES-0000000006: > > children: [KSTREAM-FLATMAPVALUES-0000000007] > > KSTREAM-FLATMAPVALUES-0000000007: > > children: [KSTREAM-MAP-0000000008] > > KSTREAM-MAP-0000000008: > > children: [KSTREAM-FILTER-0000000011] > > KSTREAM-FILTER-0000000011: > > children: 
[KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-5]
> > Standby tasks:
> >
> >
> > activities-avro-or is the input topic. ml-features-avro-or is the output
> > topic. In the middle we have an aggregate
> > (activities-by-phone-store-or-repartition).
> >
> > On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see
> > 5. Bad.
> >
> > On instance 1 I see 4 tasks for ml-features-avro-or. And 4 on instance 2.
> > Good.
> >
> > On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition.
> > And 3 on instance 2. Bad.
> >
> > As I said, in terms of offsets for all these partitions I see uniform
> > distribution, so we’re not dealing with a bad key scenario.
> >
> > Ara.
> >
> > On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > From: "Matthias J. Sax" <matth...@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 25, 2017 at 6:43:12 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Please share the rest of your topology code (without any UDFs / business
> > logic). Otherwise, I cannot give further advice.
> >
> > -Matthias
> >
> >
> > On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> > Via:
> >
> > builder.stream("topic1");
> > builder.stream("topic2");
> > builder.stream("topic3");
> >
> > These are different kinds of topics consuming different avro objects.
> >
> > Ara.
> >
> > On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > From: "Matthias J. Sax" <matth...@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 25, 2017 at 6:04:30 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Ara,
> >
> > How do you consume your topics? Via
> >
> > builder.stream("topic1", "topic2", "topic3");
> >
> > or via
> >
> > builder.stream("topic1");
> > builder.stream("topic2");
> > builder.stream("topic3");
> >
> > The two are handled differently with regard to creating tasks (partition to
> > task assignment also depends on your downstream code though).
> >
> > If this does not help, can you maybe share the structure of processing?
> > To dig deeper, we would need to know the topology DAG.
> >
> >
> > -Matthias
> >
> >
> > On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> > Matthias,
> >
> > This apparently happens because we have more than 1 source topic. We have 3
> > source topics in the same application. So it seems like the task assignment
> > algorithm creates topologies not for one specific topic at a time but for the
> > total partitions across all source topics consumed in an application
> > instance. Because we have some code dependencies between these 3 source
> > topics we can’t separate them into 3 applications at this time. Hence the
> > reason I want to get the task assignment algorithm to basically do a uniform
> > and simple task assignment PER source topic.
> >
> > Ara.
> >
> > On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > From: "Matthias J. Sax" <matth...@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 25, 2017 at 5:21:47 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Hi,
> >
> > I am wondering why this happens in the first place. Streams
> > load-balances over all running instances, and each instance should be
> > assigned the same number of tasks (and thus partitions).
> >
> > What is the overall assignment? Do you have standby tasks configured?
> > What version do you use?
> >
> >
> > -Matthias
> >
> >
> > On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> > Hi,
> >
> > Is there a way to tell kafka streams to uniformly assign partitions across
> > instances? If I have n kafka streams instances running, I want each to
> > handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> > logic. Just dumb 1/n assignment.
> >
> > Here’s our scenario. Let's say we have a "source" topic with 8 partitions.
> > We also have 2 kafka streams instances. Each instance gets assigned to
> > handle 4 "source" topic partitions. BUT then we do a few maps and an
> > aggregate. So data gets shuffled around. The map function uniformly
> > distributes these across all partitions (I can verify that by looking at
> > the partition offsets). After the map, what I notice by looking at the
> > topology is that one kafka streams instance gets assigned to handle say 2
> > aggregate repartition topics and the other one gets assigned 6. Even worse,
> > on bigger clusters (say 4 instances) we see say 2 nodes get assigned
> > downstream aggregate repartition topics and 2 other nodes assigned NOTHING
> > to handle.
> >
> > Ara.