It is odd as the person that originally reported the problem has verified
that it is fixed.

On Thu, 4 May 2017 at 08:31 Guozhang Wang <wangg...@gmail.com> wrote:

> Ara,
>
> That is a bit weird. I double-checked and agree with Eno that this commit
> is in both trunk and 0.10.2, so I suspect the same issue still persists in
> trunk, hence there might be another issue that is not fixed in 2645. Could
> you help verify if that is the case? In that case we can re-open
> https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.
>
>
> Guozhang
>
>
> On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
> > No errors. But if I enable caching I see performance drop considerably.
> > The workaround was to disable caching. The same thing is still true in
> > 0.10.2.1.
> >
> > Ara.
> >
> > > On May 2, 2017, at 12:55 PM, Eno Thereska <eno.there...@gmail.com>
> > > wrote:
> > >
> > > Hi Ara,
> > >
> > > The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk
> > > and 0.10.2.1, I just checked. What error are you seeing? Could you give
> > > us an update?
> > >
> > > Thanks
> > > Eno
> > >
> > > On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi
> > > <ara.ebrah...@argyledata.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows
> > >> and tested again. It doesn’t seem to be fixed?
> > >>
> > >> Ara.
> > >>
> > >>> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian....@gmail.com>
> > >>> wrote:
> > >>>
> > >>> Hi Ara,
> > >>>
> > >>> There is a performance issue in the 0.10.2 release of session windows.
> > >>> It is fixed with this PR: https://github.com/apache/kafka/pull/2645
> > >>> You can work around this on 0.10.2 by calling the aggregate(..),
> > >>> reduce(..) etc. methods and supplying a StateStoreSupplier<SessionStore>
> > >>> with caching disabled, i.e., by doing something like:
> > >>>
> > >>> // Caching stays disabled because enableCaching() is not called.
> > >>> final StateStoreSupplier<SessionStore> sessionStore =
> > >>>   Stores.create("session-store-name")
> > >>>   .withKeys(Serdes.String())
> > >>>   .withValues(Serdes.String())
> > >>>   .persistent()
> > >>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> > >>>   .build();
> > >>>
> > >>>
> > >>> The fix has also been cherry-picked to the 0.10.2 branch, so you could
> > >>> build from source and not have to create the StateStoreSupplier.
> > >>>
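Editor's note: besides the per-store workaround above, caching can also be switched off for a whole application. Kafka Streams sizes its record cache from the `cache.max.bytes.buffering` setting, and a value of 0 disables caching for every store. The sketch below only builds a `Properties` object with plain string keys, so it has no Kafka dependency; the class name, application id and broker address are placeholders rather than values from this thread, and whether a global switch is acceptable depends on the other stores in the topology.

```java
import java.util.Properties;

final class NoCacheConfig {
    // Builds Streams settings with the record cache disabled application-wide.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A cache size of 0 bytes turns record caching off for all stores.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, for illustration only.
        Properties props = build("example-app", "localhost:9092");
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

The resulting `Properties` would then be handed to the Streams application in the usual way.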
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi
> > >>> <ara.ebrah...@argyledata.com> wrote:
> > >>>
> > >>> Thanks for the response, Matthias!
> > >>>
> > >>> The reason we want this exact task assignment to happen is that a
> > >>> critical part of our pipeline involves grouping relevant records
> > >>> together (that’s what the aggregate function in the topology is for).
> > >>> And for hot keys this can sometimes lead to 100s of records getting
> > >>> grouped together. Even worse, these records are session bound; we use
> > >>> session windows. Hence we see lots of activity around the store backing
> > >>> the aggregate function and even though we use SSD drives we’re not
> > >>> seeing the kind of performance we want to see. It seems like the
> > >>> aggregate function leads to lots of updates to these hot keys, which
> > >>> lead to lots of rocksdb activity.
> > >>>
> > >>> Now there are many ways to fix this problem:
> > >>> - just don’t aggregate, create an algorithm which is not reliant on
> > >>> grouping/aggregating records. Not what we can do with our tight
> > >>> schedule right now.
> > >>> - do grouping/aggregating but employ n instances and rely on uniform
> > >>> distribution of these tasks. This is the easiest solution and what we
> > >>> expected to work, but it didn’t work, as you can tell from this thread.
> > >>> We threw 4 instances at it but only 2 got used.
> > >>> - tune rocksdb? I tried this actually but it didn’t really help us
> > >>> much, aside from the fact that tuning rocksdb is very tricky.
> > >>> - use in-memory store instead? Unfortunately we have to use session
> > >>> windows for this aggregate function and apparently there’s no in-memory
> > >>> session store impl? I tried to create one but soon realized it’s too
> > >>> much work :) I looked at the default PartitionAssignor code too, but
> > >>> that ain’t trivial either.
> > >>>
> > >>> So I’m a bit hopeless :(
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> ________________________________
> > >>>
> > >>> This message is for the designated recipient only and may contain
> > >>> privileged, proprietary, or otherwise confidential information. If you
> > >>> have received it in error, please notify the sender immediately and
> > >>> delete the original. Any other use of the e-mail by you is prohibited.
> > >>> Thank you in advance for your cooperation.
> > >>>
> > >>> ________________________________
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 27, 2017 at 1:35:30 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>>
> > >>> Ara,
> > >>>
> > >>> thanks for the detailed information.
> > >>>
> > >>> If I parse this correctly, both instances run the same number of tasks
> > >>> (12 each). That is all Streams promises.
> > >>>
> > >>> To come back to your initial question:
> > >>>
> > >>> Is there a way to tell kafka streams to uniformly assign partitions
> > >>> across instances? If I have n kafka streams instances running, I want
> > >>> each to handle EXACTLY 1/nth number of partitions. No dynamic task
> > >>> assignment logic. Just dumb 1/n assignment.
> > >>>
> > >>> That is exactly what you get: each of your two instances gets 24/2 = 12
> > >>> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> > >>> response was correct.
> > >>>
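Editor's note: the two readings of "1/n" differ as follows. Streams balances the total task count (24 / 2 = 12 per instance), whereas the question asks for 1/n within each kind of task. The sketch below is plain arithmetic under stated assumptions; it is not a `PartitionAssignor` and not what Streams does, and the class and method names are hypothetical. It deals the 8 partitions of each of the 3 sub-topologies in this thread round-robin over the instances, so with 2 instances each one ends up with 4 tasks of every kind.

```java
import java.util.ArrayList;
import java.util.List;

final class PerGroupRoundRobin {
    // Returns, for each instance, the task ids ("group_partition") it would own
    // if every sub-topology's partitions were dealt out round-robin.
    static List<List<String>> assign(int groups, int partitionsPerGroup, int instances) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            result.add(new ArrayList<>());
        }
        for (int g = 0; g < groups; g++) {
            for (int p = 0; p < partitionsPerGroup; p++) {
                // Partition p of every group goes to instance p mod n.
                result.get(p % instances).add(g + "_" + p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 sub-topologies x 8 partitions over 2 instances, as in this thread.
        List<List<String>> assignment = assign(3, 8, 2);
        for (int i = 0; i < assignment.size(); i++) {
            System.out.println("instance " + (i + 1) + ": " + assignment.get(i));
        }
    }
}
```

A real assignor would additionally have to deal with rebalancing, standby tasks and state store locality, which this sketch ignores.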
> > >>> However, I now understand better what you actually mean by your
> > >>> question. Note that Streams does not distinguish "type" of tasks -- it
> > >>> only sees 24 tasks and assigns those in a balanced way.
> > >>>
> > >>> Thus, currently there is no easy way to get the assignment you want to
> > >>> have, except to implement your own `PartitionAssignor`.
> > >>>
> > >>> This is the current implementation for 0.10.2:
> > >>> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> > >>>
> > >>> You can, if you wish, write your own assignor and set it via
> > >>> StreamsConfig. However, be aware that this might be tricky to get right
> > >>> and also might have runtime implications with regard to rebalancing and
> > >>> state store recovery. We recently improved the current implementation
> > >>> to avoid costly task movements:
> > >>> https://issues.apache.org/jira/browse/KAFKA-4677
> > >>>
> > >>> Thus, I would not recommend implementing your own `PartitionAssignor`.
> > >>>
> > >>>
> > >>> However, the root question is, why do you need this exact assignment
> > >>> you are looking for in the first place? Why is it "bad" if "types" of
> > >>> tasks are not distinguished? I would like to understand your
> > >>> requirement better -- it might be worth improving Streams here.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
> > >>> Hi,
> > >>>
> > >>> So, I simplified the topology by making sure we have only 1 source
> > >>> topic. Now I have 1 source topic, 8 partitions, 2 instances. And here’s
> > >>> what the topology looks like:
> > >>>
> > >>> instance 1:
> > >>>
> > >>> KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-1
> > >>> Active tasks:
> > >>> StreamsTask taskId: 0_3
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-3]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-2
> > >>> Active tasks:
> > >>> StreamsTask taskId: 1_2
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-2]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-3
> > >>> Active tasks:
> > >>> StreamsTask taskId: 1_1
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-1]
> > >>> StreamsTask taskId: 2_7
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-7]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-4
> > >>> Active tasks:
> > >>> StreamsTask taskId: 2_0
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-0]
> > >>> StreamsTask taskId: 2_6
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-6]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-5
> > >>> Active tasks:
> > >>> StreamsTask taskId: 0_0
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-0]
> > >>> StreamsTask taskId: 1_6
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-6]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-6
> > >>> Active tasks:
> > >>> StreamsTask taskId: 1_0
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-0]
> > >>> StreamsTask taskId: 0_7
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-7]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-7
> > >>> Active tasks:
> > >>> StreamsTask taskId: 2_4
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-4]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-8
> > >>> Active tasks:
> > >>> StreamsTask taskId: 1_3
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-3]
> > >>> Standby tasks:
> > >>>
> > >>>
> > >>> instance 2:
> > >>>
> > >>> KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-1
> > >>> Active tasks:
> > >>> StreamsTask taskId: 2_1
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-1]
> > >>> StreamsTask taskId: 2_5
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-5]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-2
> > >>> Active tasks:
> > >>> StreamsTask taskId: 0_4
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-4]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-3
> > >>> Active tasks:
> > >>> StreamsTask taskId: 2_2
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-2]
> > >>> StreamsTask taskId: 1_7
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-7]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-4
> > >>> Active tasks:
> > >>> StreamsTask taskId: 2_3
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000019:
> > >>> topics: [ml-features-avro-or]
> > >>> Partitions [ml-features-avro-or-3]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-5
> > >>> Active tasks:
> > >>> StreamsTask taskId: 0_1
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-1]
> > >>> StreamsTask taskId: 1_5
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-5]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-6
> > >>> Active tasks:
> > >>> StreamsTask taskId: 1_4
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000012:
> > >>> topics: [activities-by-phone-store-or-repartition]
> > >>> children: [KSTREAM-AGGREGATE-0000000009]
> > >>> KSTREAM-AGGREGATE-0000000009:
> > >>> states: [activities-by-phone-store-or]
> > >>> children: [KTABLE-TOSTREAM-0000000013]
> > >>> KTABLE-TOSTREAM-0000000013:
> > >>> children: [KSTREAM-FILTER-0000000014]
> > >>> KSTREAM-FILTER-0000000014:
> > >>> children: [KSTREAM-FILTER-0000000015]
> > >>> KSTREAM-FILTER-0000000015:
> > >>> children: [KSTREAM-MAP-0000000016]
> > >>> KSTREAM-MAP-0000000016:
> > >>> children: [KSTREAM-MAP-0000000017]
> > >>> KSTREAM-MAP-0000000017:
> > >>> children: [KSTREAM-SINK-0000000018]
> > >>> KSTREAM-SINK-0000000018:
> > >>> topic: ml-features-avro-or
> > >>> Partitions [activities-by-phone-store-or-repartition-4]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-7
> > >>> Active tasks:
> > >>> StreamsTask taskId: 0_2
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-2]
> > >>> StreamsTask taskId: 0_6
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-6]
> > >>> Standby tasks:
> > >>>
> > >>> StreamsThread appId: mar-23-modular
> > >>> StreamsThread clientId: mar-23-modular
> > >>> StreamsThread threadId: StreamThread-8
> > >>> Active tasks:
> > >>> StreamsTask taskId: 0_5
> > >>> ProcessorTopology:
> > >>> KSTREAM-SOURCE-0000000000:
> > >>> topics: [activities-avro-or]
> > >>> children: [KSTREAM-FILTER-0000000001]
> > >>> KSTREAM-FILTER-0000000001:
> > >>> children: [KSTREAM-MAP-0000000002]
> > >>> KSTREAM-MAP-0000000002:
> > >>> children: [KSTREAM-BRANCH-0000000003]
> > >>> KSTREAM-BRANCH-0000000003:
> > >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > >>> KSTREAM-BRANCHCHILD-0000000004:
> > >>> children: [KSTREAM-MAPVALUES-0000000006]
> > >>> KSTREAM-MAPVALUES-0000000006:
> > >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> > >>> KSTREAM-FLATMAPVALUES-0000000007:
> > >>> children: [KSTREAM-MAP-0000000008]
> > >>> KSTREAM-MAP-0000000008:
> > >>> children: [KSTREAM-FILTER-0000000011]
> > >>> KSTREAM-FILTER-0000000011:
> > >>> children: [KSTREAM-SINK-0000000010]
> > >>> KSTREAM-SINK-0000000010:
> > >>> topic: activities-by-phone-store-or-repartition
> > >>> KSTREAM-BRANCHCHILD-0000000005:
> > >>> Partitions [activities-avro-or-5]
> > >>> Standby tasks:
> > >>>
> > >>>
> > >>> activities-avro-or is the input topic. ml-features-avro-or is the output
> > >>> topic. In the middle we have an aggregate
> > >>> (activities-by-phone-store-or-repartition).
> > >>>
> > >>> On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I
> > >>> see 5. Bad.
> > >>>
> > >>> On instance 1 I see 4 tasks for ml-features-avro-or. And 4 on instance 2.
> > >>> Good.
> > >>>
> > >>> On instance 1 I see 5 tasks for
> > >>> activities-by-phone-store-or-repartition. And 3 on instance 2. Bad.
> > >>>
> > >>> As I said in terms of offsets for all these partitions I see uniform
> > >>> distribution, so we’re not dealing with a bad key scenario.
> > >>>
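Editor's note: the per-topic counts above can be re-derived mechanically from the task ids in the two dumps. A Streams task id has the form `<subTopology>_<partition>`; in this thread sub-topology 0 reads activities-avro-or, 1 reads the repartition topic feeding the aggregate, and 2 reads ml-features-avro-or. The sketch below is a small helper with a hypothetical class name that tallies tasks per sub-topology for each instance; the id lists are copied by hand from the dumps.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TaskTally {
    // Counts tasks per sub-topology; a task id has the form "<subTopology>_<partition>".
    static Map<Integer, Integer> countBySubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int group = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Task ids copied from the two instance dumps in this message.
        List<String> instance1 = Arrays.asList(
            "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
            "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
        List<String> instance2 = Arrays.asList(
            "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
            "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");
        System.out.println("instance 1: " + countBySubTopology(instance1)); // {0=3, 1=5, 2=4}
        System.out.println("instance 2: " + countBySubTopology(instance2)); // {0=5, 1=3, 2=4}
    }
}
```

Both instances hold 12 tasks in total, which is the balance Streams aims for; the skew only shows up once the tasks are split by sub-topology.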
> > >>> Ara.
> > >>>
> > >>> On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 25, 2017 at 6:43:12 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>>
> > >>> Please share the rest of your topology code (without any UDFs /
> > >>> business logic). Otherwise, I cannot give further advice.
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> > >>> Via:
> > >>>
> > >>> builder.stream("topic1");
> > >>> builder.stream("topic2");
> > >>> builder.stream("topic3");
> > >>>
> > >>> These are different kinds of topics consuming different avro objects.
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 25, 2017 at 6:04:30 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: <users@kafka.apache.org>
> > >>>
> > >>>
> > >>> Ara,
> > >>>
> > >>> How do you consume your topics? Via
> > >>>
> > >>> builder.stream("topic1", "topic2", "topic3");
> > >>>
> > >>> or via
> > >>>
> > >>> builder.stream("topic1");
> > >>> builder.stream("topic2");
> > >>> builder.stream("topic3");
> > >>>
> > >>> Both are handled differently with regard to creating tasks (partition
> > >>> to task assignment also depends on your downstream code though).
> > >>>
> > >>> If this does not help, can you maybe share the structure of your
> > >>> processing? To dig deeper, we would need to know the topology DAG.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> > >>> Matthias,
> > >>>
> > >>> This apparently happens because we have more than 1 source topic. We
> > >>> have 3 source topics in the same application. So it seems like the task
> > >>> assignment algorithm creates topologies not for one specific topic at a
> > >>> time but for the total partitions across all source topics consumed in
> > >>> an application instance. Because we have some code dependencies between
> > >>> these 3 source topics, we can’t separate them into 3 applications at
> > >>> this time. That is why I want the task assignment algorithm to do a
> > >>> simple, uniform task assignment PER source topic.
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> From: "Matthias J. Sax" <matth...@confluent.io>
> > >>> Subject: Re: more uniform task assignment across kafka stream nodes
> > >>> Date: March 25, 2017 at 5:21:47 PM PDT
> > >>> To: users@kafka.apache.org
> > >>> Reply-To: users@kafka.apache.org
> > >>>
> > >>>
> > >>> Hi,
> > >>>
> > >>> I am wondering why this happens in the first place. Streams
> > >>> load-balances over all running instances, and each instance should have
> > >>> the same number of tasks (and thus partitions) assigned.
> > >>>
> > >>> What is the overall assignment? Do you have standby tasks configured?
> > >>> What version do you use?
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> > >>> Hi,
> > >>>
> > >>> Is there a way to tell kafka streams to uniformly assign partitions
> > >>> across instances? If I have n kafka streams instances running, I want
> > >>> each to handle EXACTLY 1/n of the partitions. No dynamic task assignment
> > >>> logic. Just dumb 1/n assignment.
> > >>>
> > >>> Here’s our scenario. Let’s say we have a “source” topic with 8
> > >>> partitions. We also have 2 kafka streams instances. Each instance gets
> > >>> assigned 4 “source” topic partitions. BUT then we do a few maps and an
> > >>> aggregate, so data gets shuffled around. The map function uniformly
> > >>> distributes the records across all partitions (I can verify that by
> > >>> looking at the partition offsets). After the map, what I notice by
> > >>> looking at the topology is that one kafka streams instance gets assigned
> > >>> say 2 aggregate repartition topics and the other one gets assigned 6.
> > >>> Even worse, on bigger clusters (say 4 instances) we see say 2 nodes get
> > >>> assigned downstream aggregate repartition topics and 2 other nodes get
> > >>> assigned NOTHING to handle.
> > >>>
> > >>> Ara.
> > >>>
> > >>>
> > >
> >
>
>
>
> --
> -- Guozhang
>
