Created a JIRA: https://issues.apache.org/jira/browse/KAFKA-4969


-Matthias

On 3/27/17 4:33 PM, Ara Ebrahimi wrote:
> Well, even with 4-5x better performance thanks to the session window fix, I 
> expect to get ~10x better performance if I throw 10x more nodes at the 
> problem. That won’t be the case due to task assignment unfortunately. I may 
> end up with say 5-6 nodes with aggregation assigned to them and 4-5 nodes 
> sitting there doing nothing. So it is a problem.
> 
> Ara.
> 
> On Mar 27, 2017, at 4:15 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> 
> 
> 
> ________________________________
> 
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Thank you in advance for your 
> cooperation.
> 
> ________________________________
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 4:15:07 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Great!
> 
> So overall, the issue is not related to task assignment. Also, the
> description below does not indicate that a different task assignment
> would change anything.
> 
> 
> -Matthias
> 
> On 3/27/17 3:08 PM, Ara Ebrahimi wrote:
> Let me clarify, because I think we’re using different terminology:
> 
> - message key is phone number, reversed
> - all call records for a phone number land on the same partition
> - then we apply a session window on them and aggregate+reduce
> - so we end up with a group of records for a phone number. This group is 
> literally an avro object with an array of records in it for the session.
> - since records arrive chronologically and since we have a session window, 
> then all call records for a phone number end up in the same partition and 
> session (intended behavior). We can easily have many such phone call record 
> groups with 100s of call records in them. The aggregate object (the avro 
> object with array of records in it) can get updated 100s of times for the 
> same phone number in the course of an hour or so.
> - we process billions of such call records a day
> - we can’t expect our clients to install massive clusters of 10s or 100s of 
> nodes. We do need to make sure this processing is as efficient as possible. 
> The session window bug was killing us. Much better with the fix Damian 
> provided!
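
The grouping described in the list above can be sketched in plain Java, without any Kafka dependency. This is only an illustration under stated assumptions: the class and method names are made up, records for one key are assumed to arrive in chronological order (as described above), and a record is assumed to open a new session when it arrives more than the inactivity gap after the previous one. It is not the Kafka Streams session store implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionGroupingSketch {
    // Split one key's chronologically ordered timestamps into sessions:
    // a record starts a new session when it arrives more than gapMs
    // after the previous record for the same key.
    static List<List<Long>> sessionize(List<Long> timestamps, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : timestamps) {
            if (current == null || ts - previous > gapMs) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts); // each add stands for one rewrite of the session aggregate
            previous = ts;
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Hypothetical call times (ms) for one phone number, 5-minute inactivity gap.
        List<Long> calls = Arrays.asList(0L, 60_000L, 120_000L, 900_000L, 930_000L);
        System.out.println(sessionize(calls, 300_000L));
    }
}
```

Every record that joins a session rewrites that session's aggregate, which is why a key with 100s of records in one session causes 100s of store updates.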
> 
> Ara.
> 
> On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> 
> 
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 2:41:06 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Ara,
> 
> I assume your performance issue is most likely related to the fix Damian
> pointed out already.
> 
> A couple of follow-up comments:
> 
> > critical part of our pipeline involves grouping relevant records together
> 
> Can you explain this a little better? The abstraction of a task does
> group data together already. Furthermore, if you get multiple tasks,
> those are independent units with regard to grouping, as consecutive tasks
> are "connected" via repartitioning steps. Thus, even if we applied the
> task assignment you ask for, I am not sure this would change
> anything. Maybe you can give a small data example of what
> behavior/data co-location you need and what Streams provides for you.
> 
> > And for hot keys this can lead to sometimes 100s of records to get grouped 
> > together
> 
> This is independent of task placement -- it is a partitioning issue. If you
> want to work on that, you can provide a custom `Partitioner` for the
> Kafka producers in use (note that your external producer writing to your
> Streams input topic might already generate "hot" keys, so you might need
> to use a custom partitioner there, too).
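
To make the point about partitioning concrete, here is a minimal sketch in plain Java of how a key-based partitioner maps a key to a partition. It is an assumption-laden stand-in, not Kafka's code: the class and method names are invented, `Arrays.hashCode` replaces the murmur2 hash that Kafka's default partitioner applies to the serialized key, and the sample keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitioningSketch {
    // Stand-in for a producer-side partitioner: hash the key bytes and map
    // the result onto the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes);            // assumption: not Kafka's murmur2
        return (hash & 0x7fffffff) % numPartitions;   // clear the sign bit, then take the modulo
    }

    public static void main(String[] args) {
        // Hypothetical reversed phone numbers used as message keys.
        String[] keys = {"0001-555-014", "0002-555-014", "0001-555-014"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 8));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, every record of a hot key lands on the same partition no matter which instance ends up running the task for that partition.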
> 
> Also, "100s of records" does not sound like much to me. Streams can process
> multiple hundreds of thousands of records per thread. That is why I think
> the fix Damian pointed out will most likely solve your problem.
> 
> 
> 
> -Matthias
> 
> 
> On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
> Thanks for the response Matthias!
> 
> The reason we want this exact task assignment to happen is that a critical 
> part of our pipeline involves grouping relevant records together (that’s what 
> the aggregate function in the topology is for). And for hot keys this can 
> lead to sometimes 100s of records to get grouped together. Even worse, these 
> records are session bound, we use session windows. Hence we see lots of 
> activity around the store backing the aggregate function and even though we 
> use SSD drives we’re not seeing the kind of performance we want to see. It 
> seems like the aggregate function leads to lots of updates to these hot keys 
> which lead to lots of rocksdb activity.
> 
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on 
> grouping/aggregating records. Not what we can do with our tight schedule 
> right now.
> - do grouping/aggregating but employ n instances and rely on uniform 
> distribution of these tasks. This is the easiest solution and what we 
> expected to work but didn’t work as you can tell from this thread. We threw 4 
> instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much, 
> aside from the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows 
> for this aggregate function and apparently there’s no in-memory session store 
> impl? I tried to create one but soon realized it’s too much work :) I looked 
> at default PartitionAssigner code too, but that ain’t trivial either.
> 
> So I’m a bit hopeless :(
> 
> Ara.
> 
> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> 
> 
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Ara,
> 
> thanks for the detailed information.
> 
> If I parse this correctly, both instances run the same number of tasks
> (12 each). That is all Streams promises.
> 
> To come back to your initial question:
> 
> > Is there a way to tell kafka streams to uniformly assign partitions across 
> > instances? If I have n kafka streams instances running, I want each to handle 
> > EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just 
> > dumb 1/n assignment.
> 
> That is exactly what you get: each of your two instances gets 24/2 = 12
> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> response was correct.
> 
> However, I now understand better what you actually mean by your
> question. Note that Streams does not distinguish the "type" of tasks -- it
> only sees 24 tasks and assigns those in a balanced way.
> 
> Thus, currently there is no easy way to get the assignment you want
> unless you implement your own `PartitionAssignor`.
> 
> This is the current implementation for 0.10.2
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> 
> You can, if you wish, write your own assignor and set it via
> StreamsConfig. However, be aware that this might be tricky to get right
> and might also have runtime implications with regard to rebalancing and
> state store recovery. We recently improved the current implementation to
> avoid costly task movements:
> https://issues.apache.org/jira/browse/KAFKA-4677
> 
> Thus, I would not recommend implementing your own `PartitionAssignor`.
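
For illustration only, the "1/n per task type" placement being asked for can be sketched in plain Java as below. This is not how `StreamPartitionAssignor` works and it is not a `PartitionAssignor` implementation; the class and method names are made up, and the sketch ignores state store locality and rebalancing cost, which are the reasons given above for not writing a custom assignor.

```java
import java.util.ArrayList;
import java.util.List;

class PerTypeAssignmentSketch {
    // Deal the tasks of each sub-topology round-robin over the instances, so every
    // instance ends up with about 1/n of each task type, not just 1/n of all tasks.
    static List<List<String>> assign(int subTopologies, int partitions, int instances) {
        List<List<String>> perInstance = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int group = 0; group < subTopologies; group++) {
            for (int partition = 0; partition < partitions; partition++) {
                // Task IDs follow the "<subTopology>_<partition>" form seen in this thread.
                perInstance.get(partition % instances).add(group + "_" + partition);
            }
        }
        return perInstance;
    }

    public static void main(String[] args) {
        // The numbers from this thread: 3 sub-topologies x 8 partitions = 24 tasks, 2 instances.
        List<List<String>> result = assign(3, 8, 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("instance " + (i + 1) + ": " + result.get(i));
        }
    }
}
```

With these inputs each instance still gets 12 tasks, but now 4 of each type, instead of an arbitrary mix that only balances the total.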
> 
> 
> However, the root question is: why do you need this exact assignment
> in the first place? Why is it "bad" if "types" of tasks
> are not distinguished? I would like to understand your requirement
> better -- it might be worth improving Streams here.
> 
> 
> -Matthias
> 
> 
> On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
> Hi,
> 
> So, I simplified the topology by making sure we have only 1 source topic. Now 
> I have 1 source topic, 8 partitions, 2 instances. And here’s what the topology 
> looks like:
> 
> instance 1:
> 
> KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-1
> Active tasks:
> StreamsTask taskId: 0_3
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-3]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-2
> Active tasks:
> StreamsTask taskId: 1_2
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-2]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-3
> Active tasks:
> StreamsTask taskId: 1_1
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-1]
> StreamsTask taskId: 2_7
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-7]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-4
> Active tasks:
> StreamsTask taskId: 2_0
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-0]
> StreamsTask taskId: 2_6
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-6]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-5
> Active tasks:
> StreamsTask taskId: 0_0
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-0]
> StreamsTask taskId: 1_6
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-6]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-6
> Active tasks:
> StreamsTask taskId: 1_0
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-0]
> StreamsTask taskId: 0_7
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-7]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-7
> Active tasks:
> StreamsTask taskId: 2_4
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-4]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-8
> Active tasks:
> StreamsTask taskId: 1_3
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-3]
> Standby tasks:
> 
> 
> instance 2:
> 
> KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-1
> Active tasks:
> StreamsTask taskId: 2_1
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-1]
> StreamsTask taskId: 2_5
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-5]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-2
> Active tasks:
> StreamsTask taskId: 0_4
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-4]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-3
> Active tasks:
> StreamsTask taskId: 2_2
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-2]
> StreamsTask taskId: 1_7
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-7]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-4
> Active tasks:
> StreamsTask taskId: 2_3
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-3]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-5
> Active tasks:
> StreamsTask taskId: 0_1
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-1]
> StreamsTask taskId: 1_5
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-5]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-6
> Active tasks:
> StreamsTask taskId: 1_4
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-4]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-7
> Active tasks:
> StreamsTask taskId: 0_2
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-2]
> StreamsTask taskId: 0_6
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-6]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-8
> Active tasks:
> StreamsTask taskId: 0_5
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-5]
> Standby tasks:
> 
> 
> activities-avro-or is the input topic. ml-features-avro-or is the output topic. 
> In the middle we have an aggregate (activities-by-phone-store-or-repartition).
> 
> On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see 5. 
> Bad.
> 
> On instance 1 I see 4 tasks for ml-features-avro-or. And 4 on instance 2. Good.
> 
> On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition. And 3 
> on instance 2. Bad.
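
The per-topic counts above can be reproduced from the task IDs in the two dumps. A minimal sketch in plain Java, with invented class and method names, assuming the `<subTopology>_<partition>` reading of a task ID (prefix 0 reads activities-avro-or, prefix 1 reads activities-by-phone-store-or-repartition, prefix 2 reads ml-features-avro-or):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TaskTally {
    // Count tasks per sub-topology, i.e. per prefix of "<subTopology>_<partition>".
    static Map<Integer, Integer> countBySubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int group = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Task IDs copied from the two instance dumps in this message.
        List<String> instance1 = Arrays.asList("0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
                                               "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
        List<String> instance2 = Arrays.asList("2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
                                               "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");
        System.out.println("instance 1: " + countBySubTopology(instance1));
        System.out.println("instance 2: " + countBySubTopology(instance2));
    }
}
```

Both instances hold 12 tasks in total, so the assignment is balanced by count even though the aggregate tasks (prefix 1) are split 5 to 3.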
> 
> As I said in terms of offsets for all these partitions I see uniform 
> distribution, so we’re not dealing with a bad key scenario.
> 
> Ara.
> 
> On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> 
> 
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 25, 2017 at 6:43:12 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Please share the rest of your topology code (without any UDFs / business
> logic). Otherwise, I cannot give further advice.
> 
> -Matthias
> 
> 
> On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> Via:
> 
> builder.stream("topic1");
> builder.stream("topic2");
> builder.stream("topic3");
> 
> These are different kinds of topics consuming different avro objects.
> 
> Ara.
> 
> On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> 
> 
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 25, 2017 at 6:04:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Ara,
> 
> How do you consume your topics? Via
> 
> builder.stream("topic1", "topic2", "topic3");
> 
> or via
> 
> builder.stream("topic1");
> builder.stream("topic2");
> builder.stream("topic3");
> 
> Both are handled differently with regard to creating tasks (partition to
> task assignment also depends on your downstream code though).
> 
> If this does not help, can you maybe share the structure of processing?
> To dig deeper, we would need to know the topology DAG.
> 
> 
> -Matthias
> 
> 
> On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> Matthias,
> 
> This apparently happens because we have more than 1 source topic. We have 3 
> source topics in the same application. So it seems like the task assignment 
> algorithm creates topologies not for one specific topic at a time but the 
> total partitions across all source topics consumed in an application 
> instance. Because we have some code dependencies between these 3 source 
> topics we can’t separate them into 3 applications at this time. That is why 
> I want the task assignment algorithm to do a uniform and simple task 
> assignment PER source topic.
> 
> Ara.
> 
> On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> 
> 
> 
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 25, 2017 at 5:21:47 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Hi,
> 
> I am wondering why this happens in the first place. Streams
> load-balances over all running instances, and each instance should have
> the same number of tasks (and thus partitions) assigned.
> 
> What is the overall assignment? Do you have standby tasks configured?
> What version do you use?
> 
> 
> -Matthias
> 
> 
> On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> Hi,
> 
> Is there a way to tell kafka streams to uniformly assign partitions across 
> instances? If I have n kafka streams instances running, I want each to handle 
> EXACTLY 1/n of the partitions. No dynamic task assignment logic. Just 
> dumb 1/n assignment.
> 
> Here’s our scenario. Let's say we have a "source" topic with 8 partitions. We 
> also have 2 kafka streams instances. Each instance gets assigned to handle 4 
> "source" topic partitions. BUT then we do a few maps and an aggregate. So 
> data gets shuffled around. The map function uniformly distributes these 
> across all partitions (I can verify that by looking at the partition 
> offsets). After the map, what I notice by looking at the topology is that one 
> kafka streams instance gets assigned to handle say 2 aggregate repartition 
> topics and the other one gets assigned 6. Even worse, on bigger clusters (say 
> 4 instances) we see say 2 nodes get assigned downstream aggregate 
> repartition topics and 2 other nodes assigned NOTHING to handle.
> 
> Ara.

Attachment: signature.asc
Description: OpenPGP digital signature
