Matthias,

The instances are transient (Mesos), so when we roll them we get a brand-new instance.
-russ

On Wed, Feb 7, 2018 at 4:53 PM, Matthias J. Sax <[email protected]> wrote:

> Russell,
>
> > Yes. We used the reset tool before deploying with 1.1.
>
> Did you also clean up local state? The tool only takes care of "broker-side"
> cleanup. You would need to delete local state by calling
> KafkaStreams#cleanUp() before restarting, or by deleting the corresponding
> local state directory manually.
>
> -Matthias
>
> On 2/7/18 3:38 PM, Bill Bejeck wrote:
> > Russell,
> >
> > INFO level is fine, and it could be just the portion of the logs right
> > after streams has finished rebalancing.
> >
> > You can tar them up and attach them to this mailing list unless you'd
> > prefer not to, in which case I can send you my email address directly.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Feb 7, 2018 at 6:16 PM, Russell Teabeault <[email protected]> wrote:
> >
> >> Bill,
> >>
> >> I may be able to.
> >>
> >> - What logging level?
> >> - Do you need logs from all the instances?
> >> - Where should I send them?
> >>
> >> -russ
> >>
> >> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <[email protected]> wrote:
> >>
> >>> Russell,
> >>>
> >>> Can you share any log files?
> >>>
> >>> Thanks,
> >>> Bill
> >>>
> >>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <[email protected]> wrote:
> >>>
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the prompt reply. We have built the kafka-streams jar from
> >>>> the 1.1 branch and deployed our instances. We are only able to upgrade
> >>>> Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
> >>>> think that should matter, though. Yes?
> >>>>
> >>>> It does not seem to have helped. We currently have 25 instances with 4
> >>>> threads/instance. Our topology has two topics in it, each having 100
> >>>> partitions. The input topic feeds into a filtering step that uses an
> >>>> in-memory store, and that is output via groupBy to an intermediate
> >>>> topic.
> >>>> The intermediate topic then feeds into an aggregation step which uses
> >>>> the RocksDB store. So we can see that we have 200 tasks total. After
> >>>> switching to 1.1, the task assignments are still wildly uneven. Some
> >>>> instances only have tasks from one of the topics. Furthermore, the
> >>>> instances keep dying due to
> >>>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> >>>> server is not the leader for that topic-partition.
> >>>>
> >>>> Is there something else we need to do to make this updated task
> >>>> assignment work?
> >>>>
> >>>> Thanks!
> >>>> -russ
> >>>>
> >>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <[email protected]> wrote:
> >>>>
> >>>>> It's a known issue and we addressed it already via
> >>>>> https://issues.apache.org/jira/browse/KAFKA-4969
> >>>>>
> >>>>> The fix will be part of the upcoming 1.1 release, but you could try it
> >>>>> out immediately by running from trunk or the 1.0 branch. (If you do,
> >>>>> feedback would be very welcome :))
> >>>>>
> >>>>> Your proposed workarounds should work. I cannot come up with anything
> >>>>> else you could do, because the task assignment cannot be influenced.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> >>>>>> We are using Kafka Streams for a project and had some questions about
> >>>>>> how stream tasks are assigned.
> >>>>>>
> >>>>>> streamBuilder
> >>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>>   ... // Do some stuff here
> >>>>>>   .through("intermediate-topic")
> >>>>>>   ... // Do some other stuff here
> >>>>>>
> >>>>>> In this example we are streaming from "inbound-topic" and then doing
> >>>>>> some work before writing the results back out to "intermediate-topic".
> >>>>>> Then we are reading in from "intermediate-topic" and doing some more
> >>>>>> work.
> >>>>>> If both of these topics contain 100 partitions (200 partitions total)
> >>>>>> and I create 10 instances of my application, then what I observe is
> >>>>>> that there are a total of 20 partitions assigned to each instance. But
> >>>>>> the distribution of these partitions across the two topics is not
> >>>>>> even. For example, one instance may have 7 partitions from
> >>>>>> "inbound-topic" and 13 partitions from "intermediate-topic". I would
> >>>>>> have hoped that each instance would have 10 partitions from each
> >>>>>> topic. Because of this uneven distribution, the resource
> >>>>>> characteristics can be very different from instance to instance.
> >>>>>>
> >>>>>> In a more concrete example, we are reading from an input topic, then
> >>>>>> using an in-memory store to do some filtering, followed by a groupBy,
> >>>>>> and finally doing an aggregate. This results in two topics: the input
> >>>>>> topic and the internally created intermediate topic, written to by the
> >>>>>> groupBy and read from by the aggregation. What we see is that some
> >>>>>> instances are assigned far more partitions/tasks that use the
> >>>>>> in-memory store, and some instances have very few and sometimes no
> >>>>>> tasks that use the in-memory store. This leads to wildly different
> >>>>>> memory usage patterns across the instances. In turn this leads us to
> >>>>>> set our memory much higher than would be needed if the partitions from
> >>>>>> each topic were equally distributed across the instances.
> >>>>>>
> >>>>>> The two ways we have figured out to deal with this problem are:
> >>>>>>
> >>>>>> 1. Use a new StreamsBuilder anytime an intermediate topic is being
> >>>>>>    read from in the application.
> >>>>>> 2. Break the topology into separate applications across the boundary
> >>>>>>    of an intermediate topic.
> >>>>>>
> >>>>>> Neither of these seems like a great solution. So I would like to know:
> >>>>>>
> >>>>>> 1. Is this expected behavior?
> >>>>>> 2. Is there some technique to get equal distribution of task/partition
> >>>>>>    assignments across instances?
> >>>>>>
> >>>>>> Thanks for the help.
> >>>>>>
> >>>>>> --
> >>>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
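The imbalance described in the thread reduces to simple counting: two topics of 100 partitions each give 200 tasks across 10 instances. The short standalone Python sketch below is a toy model for illustration only (it is not Kafka's actual `StreamsPartitionAssignor`); it shows why "every instance gets 20 tasks" does not imply "every instance gets 10 tasks per topic", and what balancing each topic (sub-topology) independently would look like:

```python
# Toy model of stream-task distribution: 2 topics x 100 partitions
# = 200 tasks spread over 10 instances. Hypothetical schemes for
# illustration, not Kafka's real assignment algorithm.
import random
from collections import Counter

TOPICS = ["inbound-topic", "intermediate-topic"]
PARTITIONS = 100
INSTANCES = 10

tasks = [(t, p) for t in TOPICS for p in range(PARTITIONS)]

# Scheme 1: round-robin over an arbitrarily ordered task list.
# Balances only the TOTAL count per instance.
random.seed(42)
shuffled = random.sample(tasks, len(tasks))
by_total = [shuffled[i::INSTANCES] for i in range(INSTANCES)]

# Scheme 2: round-robin within each topic separately.
# Balances the per-topic count as well.
by_topic = [[] for _ in range(INSTANCES)]
for t in TOPICS:
    for p in range(PARTITIONS):
        by_topic[p % INSTANCES].append((t, p))

def topic_counts(assignment):
    """Per-instance counts of tasks, broken down by topic."""
    return [Counter(t for t, _ in inst) for inst in assignment]

# Both schemes give every instance exactly 20 tasks...
assert all(len(inst) == 20 for inst in by_total)
assert all(len(inst) == 20 for inst in by_topic)

# ...but only scheme 2 guarantees a 10/10 split per topic.
print("total-only :", [c["inbound-topic"] for c in topic_counts(by_total)])
print("per-topic  :", [c["inbound-topic"] for c in topic_counts(by_topic)])
```

Balancing total counts alone permits exactly the 7/13 skew reported above, which is why instances end up with very different memory profiles when only one topic's tasks carry an in-memory store.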
