Hi Ara,

There is a performance issue in the 0.10.2 release of session windows. It
is fixed with this PR: https://github.com/apache/kafka/pull/2645
You can work around this on 0.10.2 by calling the aggregate(..), reduce(..),
etc. methods and supplying a StateStoreSupplier<SessionStore> with caching
disabled, i.e., by doing something like:

// Caching stays disabled here because enableCaching() is not called.
final StateStoreSupplier<SessionStore> sessionStore =
    Stores.create("session-store-name")
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
        .build();


The fix has also been cherry-picked to the 0.10.2 branch, so you could
build from source and not have to create the StateStoreSupplier.

Thanks,
Damian

On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com>
wrote:

Thanks for the response, Matthias!

The reason we want this exact task assignment is that a critical part of
our pipeline involves grouping relevant records together (that’s what the
aggregate function in the topology is for). For hot keys this can
sometimes lead to hundreds of records being grouped together. Even worse,
these records are session-bound: we use session windows. Hence we see lots
of activity around the store backing the aggregate function, and even though
we use SSD drives we’re not seeing the kind of performance we want.
It seems the aggregate function leads to lots of updates to these hot
keys, which in turn lead to lots of RocksDB activity.
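
To illustrate why a hot key produces so many updates, here is a simplified standalone model of session windowing. It is not the Kafka Streams implementation; the class name, the 5-minute gap, and the one-record-per-second rate are assumptions made up for the example. Every record that arrives within the inactivity gap extends the same session, so the same aggregate is rewritten once per record.

```java
import java.util.ArrayList;
import java.util.List;

class SessionWindowSketch {
    /** One session: a time range plus the number of records merged into it. */
    static final class Session {
        long start;
        long end;
        int records;

        Session(long timestamp) {
            this.start = timestamp;
            this.end = timestamp;
            this.records = 1;
        }
    }

    /**
     * Groups ascending timestamps of a single key into sessions: a record joins the
     * current session if it arrives within gapMs of that session's last record.
     */
    static List<Session> sessionize(long[] ascendingTimestamps, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        Session current = null;
        for (long ts : ascendingTimestamps) {
            if (current != null && ts - current.end <= gapMs) {
                current.end = ts;   // extend the session: one more update to the same aggregate
                current.records++;
            } else {
                current = new Session(ts);
                sessions.add(current);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gapMs = 5 * 60 * 1000L;   // hypothetical 5-minute inactivity gap
        long[] hotKey = new long[300];
        for (int i = 0; i < hotKey.length; i++) {
            hotKey[i] = i * 1000L;     // hypothetical rate: one record per second for one key
        }
        for (Session s : sessionize(hotKey, gapMs)) {
            System.out.println("session [" + s.start + ", " + s.end + "] records=" + s.records);
        }
    }
}
```

With these made-up numbers the program prints a single session holding all 300 records, i.e. 300 successive updates to one aggregate.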

Now there are many ways to fix this problem:
- Just don’t aggregate: create an algorithm which does not rely on
grouping/aggregating records. Not something we can do with our tight
schedule right now.
- Do the grouping/aggregating but employ n instances and rely on uniform
distribution of these tasks. This is the easiest solution and what we
expected to work, but it didn’t, as you can tell from this thread. We threw
4 instances at it but only 2 got used.
- Tune RocksDB? I actually tried this, but it didn’t really help us much,
aside from the fact that tuning RocksDB is very tricky.
- Use an in-memory store instead? Unfortunately we have to use session windows
for this aggregate function and apparently there’s no in-memory session
store implementation? I tried to create one but soon realized it’s too much
work :) I looked at the default PartitionAssignor code too, but that ain’t
trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:




________________________________

This message is for the designated recipient only and may contain
privileged, proprietary, or otherwise confidential information. If you have
received it in error, please notify the sender immediately and delete the
original. Any other use of the e-mail by you is prohibited. Thank you in
advance for your cooperation.

________________________________

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 1:35:30 PM PDT
To: users@kafka.apache.org
Reply-To: users@kafka.apache.org


Ara,

thanks for the detailed information.

If I parse this correctly, both instances run the same number of tasks
(12 each). That is all Streams promises.

To come back to your initial question:

Is there a way to tell kafka streams to uniformly assign partitions across
instances? If I have n kafka streams instances running, I want each to
handle EXACTLY 1/nth number of partitions. No dynamic task assignment
logic. Just dumb 1/n assignment.

That is exactly what you get: each of your two instances gets 24/2 = 12
tasks assigned. That is dumb 1/n assignment, isn't it? So my original
response was correct.

However, I now understand better what you actually mean by your
question. Note that Streams does not distinguish "type" of tasks -- it
only sees 24 tasks and assigns those in a balanced way.

Thus, currently there is no easy way to get the assignment you want,
unless you implement your own `PartitionAssignor`.

This is the current implementation for 0.10.2
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

You can, if you wish, write your own assignor and set it via
StreamsConfig. However, be aware that this might be tricky to get right
and might also have runtime implications with regard to rebalancing and
state store recovery. We recently improved the current implementation to
avoid costly task movements:
https://issues.apache.org/jira/browse/KAFKA-4677

Thus, I would not recommend implementing your own `PartitionAssignor`.


However, the root question is: why do you need this exact assignment in
the first place? Why is it "bad" if "types" of tasks are not
distinguished? I would like to understand your requirement
better -- it might be worth improving Streams here.


-Matthias


On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
Hi,

So, I simplified the topology by making sure we have only 1 source topic.
Now I have 1 source topic, 8 partitions, 2 instances. Here’s what the
topology looks like:

instance 1:

KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-1
Active tasks:
StreamsTask taskId: 0_3
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-3]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-2
Active tasks:
StreamsTask taskId: 1_2
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-2]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-3
Active tasks:
StreamsTask taskId: 1_1
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-1]
StreamsTask taskId: 2_7
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-7]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-4
Active tasks:
StreamsTask taskId: 2_0
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-0]
StreamsTask taskId: 2_6
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-6]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-5
Active tasks:
StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-0]
StreamsTask taskId: 1_6
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-6]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-6
Active tasks:
StreamsTask taskId: 1_0
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-0]
StreamsTask taskId: 0_7
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-7]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-7
Active tasks:
StreamsTask taskId: 2_4
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-4]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-8
Active tasks:
StreamsTask taskId: 1_3
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-3]
Standby tasks:


instance 2:

KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44
StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-1
Active tasks:
StreamsTask taskId: 2_1
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-1]
StreamsTask taskId: 2_5
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-5]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-2
Active tasks:
StreamsTask taskId: 0_4
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-4]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-3
Active tasks:
StreamsTask taskId: 2_2
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-2]
StreamsTask taskId: 1_7
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-7]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-4
Active tasks:
StreamsTask taskId: 2_3
ProcessorTopology:
KSTREAM-SOURCE-0000000019:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-3]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-5
Active tasks:
StreamsTask taskId: 0_1
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-1]
StreamsTask taskId: 1_5
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-5]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-6
Active tasks:
StreamsTask taskId: 1_4
ProcessorTopology:
KSTREAM-SOURCE-0000000012:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-0000000009]
KSTREAM-AGGREGATE-0000000009:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-0000000013]
KTABLE-TOSTREAM-0000000013:
children: [KSTREAM-FILTER-0000000014]
KSTREAM-FILTER-0000000014:
children: [KSTREAM-FILTER-0000000015]
KSTREAM-FILTER-0000000015:
children: [KSTREAM-MAP-0000000016]
KSTREAM-MAP-0000000016:
children: [KSTREAM-MAP-0000000017]
KSTREAM-MAP-0000000017:
children: [KSTREAM-SINK-0000000018]
KSTREAM-SINK-0000000018:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-4]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-7
Active tasks:
StreamsTask taskId: 0_2
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-2]
StreamsTask taskId: 0_6
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-6]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-8
Active tasks:
StreamsTask taskId: 0_5
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-0000000001]
KSTREAM-FILTER-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-BRANCH-0000000003]
KSTREAM-BRANCH-0000000003:
children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
KSTREAM-BRANCHCHILD-0000000004:
children: [KSTREAM-MAPVALUES-0000000006]
KSTREAM-MAPVALUES-0000000006:
children: [KSTREAM-FLATMAPVALUES-0000000007]
KSTREAM-FLATMAPVALUES-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-FILTER-0000000011]
KSTREAM-FILTER-0000000011:
children: [KSTREAM-SINK-0000000010]
KSTREAM-SINK-0000000010:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-0000000005:
Partitions [activities-avro-or-5]
Standby tasks:


activities-avro-or is the input topic and ml-features-avro-or is the output
topic. In the middle we have an aggregate
(activities-by-phone-store-or-repartition).

On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see
5. Bad.

On instance 1 I see 4 tasks for ml-features-avro-or, and 4 on instance 2.
Good.

On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition,
and 3 on instance 2. Bad.

As I said, in terms of offsets I see a uniform distribution across all these
partitions, so we’re not dealing with a bad key scenario.
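
The counts above can be double-checked by tallying the task IDs from the two dumps by their prefix (task IDs have the form <sub-topology>_<partition>). A small standalone sketch, with the IDs copied by hand from the listings in this message; the class and method names are made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TaskBalanceSketch {
    // Task IDs copied from the "instance 1" and "instance 2" dumps in this message.
    static final List<String> INSTANCE_1 = Arrays.asList(
        "0_3", "1_2", "1_1", "2_7", "2_0", "2_6", "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
    static final List<String> INSTANCE_2 = Arrays.asList(
        "2_1", "2_5", "0_4", "2_2", "1_7", "2_3", "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

    /** Counts tasks per sub-topology, i.e. per number before the underscore. */
    static Map<Integer, Integer> countBySubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int subTopology = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(subTopology, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("instance 1: " + INSTANCE_1.size() + " tasks "
            + countBySubTopology(INSTANCE_1));
        System.out.println("instance 2: " + INSTANCE_2.size() + " tasks "
            + countBySubTopology(INSTANCE_2));
        // prints:
        // instance 1: 12 tasks {0=3, 1=5, 2=4}
        // instance 2: 12 tasks {0=5, 1=3, 2=4}
    }
}
```

Both instances hold 12 tasks in total, but the split per sub-topology is 3/5, 5/3 and 4/4.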

Ara.

On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io> wrote:





From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 6:43:12 PM PDT
To: users@kafka.apache.org
Reply-To: users@kafka.apache.org


Please share the rest of your topology code (without any UDFs / business
logic). Otherwise, I cannot give further advice.

-Matthias


On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
Via:

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

These are different kinds of topics consuming different avro objects.

Ara.

On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:





From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 6:04:30 PM PDT
To: users@kafka.apache.org
Reply-To: users@kafka.apache.org


Ara,

How do you consume your topics? Via

builder.stream("topic1", "topic2", "topic3");

or via

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

Both are handled differently with regard to creating tasks (partition to
task assignment also depends on your downstream code, though).

If this does not help, can you maybe share the structure of processing?
To dig deeper, we would need to know the topology DAG.


-Matthias


On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
Matthias,

This apparently happens because we have more than 1 source topic. We have 3
source topics in the same application. So it seems like the task assignment
algorithm creates topologies not for one specific topic at a time but for the
total partitions across all source topics consumed in an application
instance. Because we have some code dependencies between these 3 source
topics we can’t separate them into 3 applications at this time. That is why
I want the task assignment algorithm to do a simple, uniform task
assignment PER source topic.
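
A minimal sketch of what such a per-topic assignment could look like. This is not how Kafka Streams assigns tasks; the class name, the modulo rule, and the numbers (3 topic groups, 8 partitions each, 2 instances) are assumptions for illustration, and the sketch ignores state store locality, standby tasks, and rebalancing cost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PerGroupRoundRobinSketch {
    /**
     * Assigns task "<group>_<partition>" to instance (partition % instances), so every
     * instance gets the same share of each group when the partitions divide evenly.
     */
    static Map<Integer, List<String>> assign(int groups, int partitionsPerGroup, int instances) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (int i = 0; i < instances; i++) {
            byInstance.put(i, new ArrayList<>());
        }
        for (int g = 0; g < groups; g++) {
            for (int p = 0; p < partitionsPerGroup; p++) {
                byInstance.get(p % instances).add(g + "_" + p);
            }
        }
        return byInstance;
    }

    public static void main(String[] args) {
        // Hypothetical setup: 3 topic groups, 8 partitions each, 2 instances.
        assign(3, 8, 2).forEach((instance, tasks) ->
            System.out.println("instance " + instance + ": " + tasks));
    }
}
```

With these numbers each instance ends up with 12 tasks, 4 from every group.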

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:





From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 5:21:47 PM PDT
To: users@kafka.apache.org
Reply-To: users@kafka.apache.org


Hi,

I am wondering why this happens in the first place. Streams
load-balances over all running instances, and each instance should have
the same number of tasks (and thus partitions) assigned.

What is the overall assignment? Do you have standby tasks configured?
What version do you use?


-Matthias


On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
Hi,

Is there a way to tell kafka streams to uniformly assign partitions across
instances? If I have n kafka streams instances running, I want each to
handle EXACTLY 1/nth number of partitions. No dynamic task assignment
logic. Just dumb 1/n assignment.

Here’s our scenario. Let’s say we have a "source" topic with 8 partitions.
We also have 2 kafka streams instances. Each instance gets assigned
4 "source" topic partitions. BUT then we do a few maps and an
aggregate, so data gets shuffled around. The map function distributes
records uniformly across all partitions (I can verify that by looking at
the partition offsets). After the map, what I notice by looking at the
topology is that one kafka streams instance gets assigned, say, 2
partitions of the aggregate repartition topic and the other one gets
assigned 6. Even worse, on bigger clusters (say 4 instances) we see, say,
2 nodes get assigned the downstream aggregate repartition partitions and
the 2 other nodes get assigned NOTHING to handle.

Ara.


