Great! So overall, the issue is not related to task assignment. Also, the description below does not indicate that a different task assignment would change anything.
-Matthias

On 3/27/17 3:08 PM, Ara Ebrahimi wrote: > Let me clarify, because I think we’re using different terminology: > > - the message key is the phone number, reversed > - all call records for a phone number land on the same partition > - then we apply a session window on them and aggregate+reduce > - so we end up with a group of records for a phone number. This group is literally an Avro object with an array of records in it for the session. > - since records arrive chronologically and since we have a session window, all call records for a phone number end up in the same partition and session (intended behavior). We can easily have many such phone call record groups with 100s of call records in them. The aggregate object (the Avro object with the array of records in it) can get updated 100s of times for the same phone number in the course of an hour or so. > - we process billions of such call records a day > - we can’t expect our clients to install massive clusters of 10s or 100s of nodes. We do need to make sure this processing is as efficient as possible. > The session window bug was killing us. Much better with the fix Damian provided! > > Ara. > > On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matth...@confluent.io> wrote: > > ________________________________ > > This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Thank you in advance for your cooperation. > > ________________________________ > > From: "Matthias J. Sax" <matth...@confluent.io> > Subject: Re: more uniform task assignment across kafka stream nodes > Date: March 27, 2017 at 2:41:06 PM PDT > To: users@kafka.apache.org > Reply-To: <users@kafka.apache.org> > > Ara, > > I assume your performance issue is most likely related to the fix Damian pointed out already. > > A couple of follow-up comments: > > critical part of our pipeline involves grouping relevant records together > > Can you explain this a little better? The abstraction of a task does group data together already. Furthermore, if you get multiple tasks, those are independent units with regard to grouping, as consecutive tasks are "connected" via repartitioning steps. Thus, even if we apply the task assignment you ask for, I am not sure this would change anything. Maybe you can give a small data example of the behavior/data co-location you need and what Streams provides for you. > > And for hot keys this can lead to sometimes 100s of records to get grouped together > > This is independent of task placement -- it is a partitioning issue. If you want to work on that, you can provide a custom `Partitioner` for the Kafka producers used (note: your external Producer writing to your Streams input topic might already generate "hot" keys, so you might need to use a custom partitioner there, too). > > Also, "100s of records" does not sound like much to me. Streams can process hundreds of thousands of records per thread. That is why I think the fix Damian pointed out will most likely fix your problem. > > -Matthias > > On 3/27/17 1:56 PM, Ara Ebrahimi wrote: > Thanks for the response Matthias! > > The reason we want this exact task assignment to happen is that a critical part of our pipeline involves grouping relevant records together (that’s what the aggregate function in the topology is for).
And for hot keys this can sometimes lead to 100s of records getting grouped together. Even worse, these records are session-bound; we use session windows. Hence we see lots of activity around the store backing the aggregate function, and even though we use SSDs we’re not seeing the kind of performance we want to see. It seems like the aggregate function leads to lots of updates to these hot keys, which leads to lots of RocksDB activity. > > Now there are many ways to fix this problem: > - just don’t aggregate: create an algorithm that does not rely on grouping/aggregating records. Not something we can do on our tight schedule right now. > - do grouping/aggregating but employ n instances and rely on uniform distribution of these tasks. This is the easiest solution and what we expected to work, but it didn’t, as you can tell from this thread. We threw 4 instances at it but only 2 got used. > - tune RocksDB? I actually tried this, but it didn’t really help us much, aside from the fact that tuning RocksDB is very tricky. > - use an in-memory store instead? Unfortunately we have to use session windows for this aggregate function, and apparently there’s no in-memory session store impl? I tried to create one but soon realized it’s too much work :) I looked at the default PartitionAssignor code too, but that ain’t trivial either. > > So I’m a bit hopeless :( > > Ara. > > On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote: > > From: "Matthias J. Sax" <matth...@confluent.io> > Subject: Re: more uniform task assignment across kafka stream nodes > Date: March 27, 2017 at 1:35:30 PM PDT > To: users@kafka.apache.org > Reply-To: <users@kafka.apache.org> > > Ara, > > thanks for the detailed information. > > If I parse this correctly, both instances run the same number of tasks (12 each). That is all Streams promises. > > To come back to your initial question: > > Is there a way to tell Kafka Streams to uniformly assign partitions across instances? If I have n Kafka Streams instances running, I want each to handle EXACTLY 1/nth of the partitions. No dynamic task assignment logic. Just dumb 1/n assignment. > > That is exactly what you get: each of your two instances gets 24/2 = 12 tasks assigned. That is dumb 1/n assignment, isn't it? So my original response was correct. > > However, I now understand better what you actually mean by your question. Note that Streams does not distinguish between "types" of tasks -- it only sees 24 tasks and assigns those in a balanced way. > > Thus, currently there is no easy way to get the assignment you want, except by implementing your own `PartitionAssignor`. > > This is the current implementation for 0.10.2: > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java > > You can, if you wish, write your own assignor and set it via StreamsConfig. However, be aware that this might be tricky to get right and might also have runtime implications with regard to rebalancing and state store recovery.
We recently improved the current implementation to avoid costly task movements: > https://issues.apache.org/jira/browse/KAFKA-4677 > > Thus, I would not recommend implementing your own `PartitionAssignor`. > > However, the root question is: why do you need this exact assignment in the first place? Why is it "bad" if "types" of tasks are not distinguished? I would like to understand your requirement better -- it might be worth improving Streams here. > > -Matthias > > On 3/27/17 12:57 PM, Ara Ebrahimi wrote: > Hi, > > So, I simplified the topology by making sure we have only 1 source topic. Now I have 1 source topic, 8 partitions, 2 instances. And here’s what the topology looks like: > > instance 1: > > KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-1 > Active tasks: > StreamsTask taskId: 0_3 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-3] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-2 > Active tasks: >
StreamsTask taskId: 1_2 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-2] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-3 > Active tasks: > StreamsTask taskId: 1_1 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-1] > StreamsTask taskId: 2_7 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-7] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-4 > Active tasks: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > 
topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-0] > StreamsTask taskId: 2_6 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-6] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-5 > Active tasks: > StreamsTask taskId: 0_0 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-0] > StreamsTask taskId: 1_6 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-6] > 
Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-6 > Active tasks: > StreamsTask taskId: 1_0 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-0] > StreamsTask taskId: 0_7 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-7] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-7 > Active tasks: > StreamsTask taskId: 2_4 > ProcessorTopology: > 
KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-4] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-8 > Active tasks: > StreamsTask taskId: 1_3 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-3] > Standby tasks: > > > instance 2: > > KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44 > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-1 > Active tasks: > StreamsTask taskId: 2_1 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-1] > StreamsTask taskId: 2_5 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-5] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-2 > Active tasks: > StreamsTask taskId: 0_4 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: 
[KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-4] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-3 > Active tasks: > StreamsTask taskId: 2_2 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-2] > StreamsTask taskId: 1_7 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-7] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-4 > Active tasks: > StreamsTask taskId: 2_3 > ProcessorTopology: > KSTREAM-SOURCE-0000000019: > topics: [ml-features-avro-or] > Partitions [ml-features-avro-or-3] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: 
mar-23-modular > StreamsThread threadId: StreamThread-5 > Active tasks: > StreamsTask taskId: 0_1 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-1] > StreamsTask taskId: 1_5 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: [KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-5] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-6 > Active tasks: > StreamsTask taskId: 1_4 > ProcessorTopology: > KSTREAM-SOURCE-0000000012: > topics: [activities-by-phone-store-or-repartition] > children: 
[KSTREAM-AGGREGATE-0000000009] > KSTREAM-AGGREGATE-0000000009: > states: [activities-by-phone-store-or] > children: [KTABLE-TOSTREAM-0000000013] > KTABLE-TOSTREAM-0000000013: > children: [KSTREAM-FILTER-0000000014] > KSTREAM-FILTER-0000000014: > children: [KSTREAM-FILTER-0000000015] > KSTREAM-FILTER-0000000015: > children: [KSTREAM-MAP-0000000016] > KSTREAM-MAP-0000000016: > children: [KSTREAM-MAP-0000000017] > KSTREAM-MAP-0000000017: > children: [KSTREAM-SINK-0000000018] > KSTREAM-SINK-0000000018: > topic: ml-features-avro-or > Partitions [activities-by-phone-store-or-repartition-4] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-7 > Active tasks: > StreamsTask taskId: 0_2 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-2] > StreamsTask taskId: 0_6 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, 
KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-6] > Standby tasks: > > StreamsThread appId: mar-23-modular > StreamsThread clientId: mar-23-modular > StreamsThread threadId: StreamThread-8 > Active tasks: > StreamsTask taskId: 0_5 > ProcessorTopology: > KSTREAM-SOURCE-0000000000: > topics: [activities-avro-or] > children: [KSTREAM-FILTER-0000000001] > KSTREAM-FILTER-0000000001: > children: [KSTREAM-MAP-0000000002] > KSTREAM-MAP-0000000002: > children: [KSTREAM-BRANCH-0000000003] > KSTREAM-BRANCH-0000000003: > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005] > KSTREAM-BRANCHCHILD-0000000004: > children: [KSTREAM-MAPVALUES-0000000006] > KSTREAM-MAPVALUES-0000000006: > children: [KSTREAM-FLATMAPVALUES-0000000007] > KSTREAM-FLATMAPVALUES-0000000007: > children: [KSTREAM-MAP-0000000008] > KSTREAM-MAP-0000000008: > children: [KSTREAM-FILTER-0000000011] > KSTREAM-FILTER-0000000011: > children: [KSTREAM-SINK-0000000010] > KSTREAM-SINK-0000000010: > topic: activities-by-phone-store-or-repartition > KSTREAM-BRANCHCHILD-0000000005: > Partitions [activities-avro-or-5] > Standby tasks: > > > activities-avro-or is the input topic. ml-features-avro-or is the output topic. In the middle we have an aggregate (activities-by-phone-store-or-repartition). > > On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see 5. Bad. > > On instance 1 I see 4 tasks for ml-features-avro-or, and 4 on instance 2. Good.
> > On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition, and 3 on instance 2. Bad. > > As I said, in terms of offsets I see a uniform distribution across all these partitions, so we’re not dealing with a bad key scenario. > > Ara. > > On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io> wrote: > > From: "Matthias J. Sax" <matth...@confluent.io> > Subject: Re: more uniform task assignment across kafka stream nodes > Date: March 25, 2017 at 6:43:12 PM PDT > To: users@kafka.apache.org > Reply-To: <users@kafka.apache.org> > > Please share the rest of your topology code (without any UDFs / business logic). Otherwise, I cannot give further advice. > > -Matthias > > On 3/25/17 6:08 PM, Ara Ebrahimi wrote: > Via: > > builder.stream("topic1"); > builder.stream("topic2"); > builder.stream("topic3"); > > These are different kinds of topics carrying different Avro objects. > > Ara. > > On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote: > > From: "Matthias J. Sax" <matth...@confluent.io> > Subject: Re: more uniform task assignment across kafka stream nodes > Date: March 25, 2017 at 6:04:30 PM PDT > To: users@kafka.apache.org > Reply-To: <users@kafka.apache.org> > > Ara, > > How do you consume your topics? Via > > builder.stream("topic1", "topic2", "topic3"); > > or via > > builder.stream("topic1"); > builder.stream("topic2"); > builder.stream("topic3"); > > Both are handled differently with regard to creating tasks (partition-to-task assignment also depends on your downstream code, though). > > If this does not help, can you maybe share the structure of your processing? To dig deeper, we would need to know the topology DAG. > > -Matthias > > On 3/25/17 5:56 PM, Ara Ebrahimi wrote: > Matthias, > > This apparently happens because we have more than 1 source topic. We have 3 source topics in the same application. So it seems like the task assignment algorithm creates topologies not for one specific topic at a time but for the total partitions across all source topics consumed in an application instance. Because we have some code dependencies between these 3 source topics, we can’t separate them into 3 applications at this time. Hence I want the task assignment algorithm to basically do a uniform and simple task assignment PER source topic. > > Ara. > > On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote: > > From: "Matthias J. Sax" <matth...@confluent.io> > Subject: Re: more uniform task assignment across kafka stream nodes > Date: March 25, 2017 at 5:21:47 PM PDT > To: users@kafka.apache.org > Reply-To: <users@kafka.apache.org> > > Hi, > > I am wondering why this happens in the first place. Streams load-balances over all running instances, and each instance should get the same number of tasks (and thus partitions) assigned. > > What is the overall assignment? Do you have standby tasks configured? What version are you using? > > -Matthias > > On 3/24/17 8:09 PM, Ara Ebrahimi wrote: > Hi, > > Is there a way to tell Kafka Streams to uniformly assign partitions across instances? If I have n Kafka Streams instances running, I want each to handle EXACTLY 1/nth of the partitions. No dynamic task assignment logic. Just dumb 1/n assignment. > > Here’s our scenario. Let's say we have a "source" topic with 8 partitions. We also have 2 Kafka Streams instances. Each instance gets assigned to handle 4 "source" topic partitions. BUT then we do a few maps and an aggregate, so data gets shuffled around. The map function uniformly distributes the records across all partitions (I can verify that by looking at the partition offsets). After the map, what I notice by looking at the topology is that one Kafka Streams instance gets assigned to handle, say, 2 aggregate repartition topic partitions and the other one gets assigned 6.
Even worse, on bigger clusters (say, 4 instances) we see, say, 2 nodes get assigned downstream aggregate repartition topics and 2 other nodes get assigned NOTHING to handle. > > Ara.
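The distinction Matthias draws between balancing the total number of tasks and balancing tasks per source topic can be checked directly against the two topology dumps in this thread. The standalone Java sketch below is not part of Kafka Streams (the class name `TaskSpread` is made up for illustration); it tallies each instance's active task ids by sub-topology, assuming an id such as `1_6` reads as `<sub-topology>_<partition>`. In the dumps, sub-topology 0 reads activities-avro-or, 1 reads activities-by-phone-store-or-repartition, and 2 reads ml-features-avro-or.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskSpread {

    // Counts active tasks per sub-topology for one instance. A task id such as
    // "1_6" is read as <sub-topology id>_<partition>, the format in the dumps.
    public static Map<Integer, Integer> perSubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int subTopology = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(subTopology, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Active task ids copied from the instance 1 and instance 2 dumps.
        List<String> instance1 = Arrays.asList(
                "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
                "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
        List<String> instance2 = Arrays.asList(
                "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
                "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

        // Total per instance is balanced; the per-sub-topology split is not.
        // prints: instance 1: 12 tasks {0=3, 1=5, 2=4}
        System.out.println("instance 1: " + instance1.size() + " tasks " + perSubTopology(instance1));
        // prints: instance 2: 12 tasks {0=5, 1=3, 2=4}
        System.out.println("instance 2: " + instance2.size() + " tasks " + perSubTopology(instance2));
    }
}
```

Each instance holds 12 of the 24 tasks, which is the balance Streams promises, while the per-sub-topology split is 3/5, 5/3 and 4/4, matching the counts Ara reported.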
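For comparison, the "dumb 1/n" assignment Ara asked for (uniform assignment per source topic rather than over all tasks) could look roughly like the per-sub-topology round-robin below. This is a hypothetical sketch: `RoundRobinSketch` and `assign` are invented names, it does not implement Kafka Streams' `PartitionAssignor`, and it deliberately ignores stickiness, standby tasks and state store recovery, which are the runtime concerns Matthias raised about writing a custom assignor.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {

    // Hypothetical per-sub-topology "dumb 1/n" assignment, NOT the Kafka Streams
    // assignor: task <group>_<partition> goes to instance (group + partition) % n.
    // The group offset spreads leftover tasks when partitions % n != 0.
    // Stickiness, standby tasks and state store location are ignored.
    public static List<List<String>> assign(int subTopologies, int partitions, int numInstances) {
        List<List<String>> byInstance = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            byInstance.add(new ArrayList<>());
        }
        for (int group = 0; group < subTopologies; group++) {
            for (int p = 0; p < partitions; p++) {
                byInstance.get((group + p) % numInstances).add(group + "_" + p);
            }
        }
        return byInstance;
    }

    public static void main(String[] args) {
        // 3 sub-topologies x 8 partitions = the 24 tasks discussed in the thread.
        List<List<String>> result = assign(3, 8, 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("instance " + (i + 1) + ": " + result.get(i));
        }
    }
}
```

With 3 sub-topologies of 8 partitions, `assign(3, 8, 2)` gives each instance 4 tasks of every sub-topology, and `assign(3, 8, 4)` gives each of four instances 2. As the thread concluded, though, the performance problem came from the session window issue addressed by Damian's fix rather than from task placement.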