Matthias,

The instances are transient (Mesos) so when we roll them we get a brand new
instance.

-russ

On Wed, Feb 7, 2018 at 4:53 PM, Matthias J. Sax <[email protected]>
wrote:

> Russell,
>
> > Yes. We used the reset tool before deploying with 1.1.
>
> Did you also clean up local state? The tool only takes care of "broker
> side" cleanup. You would need to delete local state by calling
> KafkaStreams#cleanUp() before restart or by deleting the corresponding
> local state directory manually.
>
>
> -Matthias
>
> On 2/7/18 3:38 PM, Bill Bejeck wrote:
> > Russell,
> >
> > INFO level is fine and it could be just the portion of the logs right
> > after streams has finished rebalancing.
> >
> > You can tar them up and attach them to this mailing list unless you'd
> > prefer not to do so, in which case I can send you my email address
> > directly.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Feb 7, 2018 at 6:16 PM, Russell Teabeault <
> > [email protected]> wrote:
> >
> >> Bill,
> >>
> >> I may be able to.
> >>
> >> - What logging level?
> >> - Do you need logs from all the instances?
> >> - Where should I send them?
> >>
> >> -russ
> >>
> >> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <[email protected]> wrote:
> >>
> >>> Russell,
> >>>
> >>> Can you share any log files?
> >>>
> >>> Thanks,
> >>> Bill
> >>>
> >>>
> >>>
> >>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> >>> [email protected]> wrote:
> >>>
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the prompt reply. We have built the kafka-streams jar from
> >>>> the 1.1 branch and deployed our instances. We are only able to upgrade
> >>>> Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
> >>>> think that should matter though. Yes?
> >>>>
> >>>> It does not seem to have helped. We currently have 25 instances with 4
> >>>> threads/instance. Our topology has two topics in it, each having 100
> >>>> partitions. The input topic feeds into a filtering step that uses an
> >>>> in-memory store, and its output is written via groupBy to an
> >>>> intermediate topic. The intermediate topic then feeds into an
> >>>> aggregation step which uses the RocksDB store. So we can see that we
> >>>> have 200 tasks total. After switching to 1.1 the task assignments are
> >>>> still wildly uneven. Some instances only have tasks from one of the
> >>>> topics. Furthermore, the instances keep dying due to
> >>>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> >>>> server is not the leader for that topic-partition.
> >>>>
> >>>> Is there something else we need to do to make this updated task
> >>>> assignment work?
> >>>>
> >>>> Thanks!
> >>>> -russ
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <[email protected]>
> >>>> wrote:
> >>>>
> >>>>> It's a known issue and we addressed it already via
> >>>>> https://issues.apache.org/jira/browse/KAFKA-4969
> >>>>>
> >>>>> The fix will be part of the upcoming 1.1 release, but you could try it
> >>>>> out immediately by running from trunk or the 1.0 branch. (If you do,
> >>>>> feedback would be very welcome :))
> >>>>>
> >>>>> Your proposed workarounds should work. I cannot come up with anything
> >>>>> else you could do, because the task assignment cannot be influenced.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> >>>>>> We are using Kafka Streams for a project and had some questions about
> >>>>>> how stream tasks are assigned.
> >>>>>>
> >>>>>> streamBuilder
> >>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>>   ... // Do some stuff here
> >>>>>>
> >>>>>>   .through("intermediate-topic")
> >>>>>>   ... // Do some other stuff here
> >>>>>>
> >>>>>> In this example we are streaming from "inbound-topic" and then doing
> >>>>>> some work before writing the results back out to "intermediate-topic".
> >>>>>> Then we are reading in from "intermediate-topic" and doing some more
> >>>>>> work. If both of these topics contain 100 partitions (200 partitions
> >>>>>> total) and I create 10 instances of my application, then what I
> >>>>>> observe is that a total of 20 partitions is assigned to each
> >>>>>> instance. But the distribution of these partitions across the two
> >>>>>> topics is not even. For example, one instance may have 7 partitions
> >>>>>> from "inbound-topic" and 13 partitions from "intermediate-topic". I
> >>>>>> would have hoped that each instance would have 10 partitions from
> >>>>>> each topic. This uneven distribution can make the resource
> >>>>>> characteristics very different from instance to instance.
> >>>>>>
> >>>>>> In a more concrete example we are reading from an input topic, then
> >>>>>> using an in-memory store to do some filtering, followed by a groupBy,
> >>>>>> and finally doing an aggregate. This results in two topics: the input
> >>>>>> topic and the internally created intermediate topic written to by the
> >>>>>> groupBy and read from by the aggregation. What we see is that some
> >>>>>> instances are assigned far more partitions/tasks that use the
> >>>>>> in-memory store, while other instances have very few and sometimes no
> >>>>>> tasks that use the in-memory store. This leads to wildly different
> >>>>>> memory usage patterns across the instances. In turn this leads us to
> >>>>>> set our memory much higher than would be needed if the partitions
> >>>>>> from each topic were equally distributed across the instances.
> >>>>>>
> >>>>>> The two ways we have figured out to deal with this problem are:
> >>>>>> 1. Use a new StreamsBuilder anytime an intermediate topic is being
> >>>>>> read from in the application.
> >>>>>> 2. Break the topology into separate applications across the boundary
> >>>>>> of an intermediate topic.
> >>>>>>
> >>>>>> Neither of these seems like a great solution. So I would like to know:
> >>>>>>
> >>>>>> 1. Is this expected behavior?
> >>>>>> 2. Is there some technique to get equal distribution of task/partition
> >>>>>> assignments across instances?
> >>>>>>
> >>>>>> Thanks for the help.
> >>>>>>
> >>>>>> --
> >>>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> --
> >>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> --
> >> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>
> >
>
>


-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
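
Matthias's suggestion above to delete the local state directory manually
could look roughly like the sketch below. It assumes the state lives under
<state.dir>/<application.id> (state.dir defaults to /tmp/kafka-streams). The
class LocalStateCleaner and its method are hypothetical helpers, not part of
Kafka Streams. Where the application can call into Kafka Streams itself,
KafkaStreams#cleanUp() is the supported route, and it has to be invoked
while the instance is not running.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not a Kafka Streams class): removes an application's
// local state directory, assumed to be <stateDir>/<applicationId>.
final class LocalStateCleaner {

    // Returns true if the directory existed and was deleted, false if there
    // was nothing to delete.
    static boolean deleteLocalState(Path stateDir, String applicationId) throws IOException {
        Path appDir = stateDir.resolve(applicationId);
        if (!Files.exists(appDir)) {
            return false;
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(appDir)) {
            // Reverse order puts children before their parent directories,
            // so every directory is empty by the time it is deleted.
            deepestFirst = paths.sorted(Comparator.reverseOrder())
                                .collect(Collectors.toList());
        }
        for (Path p : deepestFirst) {
            Files.delete(p);
        }
        return true;
    }
}
```

For example, LocalStateCleaner.deleteLocalState(Paths.get("/tmp/kafka-streams"),
"my-app") would remove the state of an application whose application.id is
"my-app" (a made-up id). It should only be run while that application is
stopped.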

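
The even split hoped for in the original question is plain arithmetic: 100
partitions per topic over 10 instances is 10 per topic per instance, and in
the later 25-instance deployment it would be 4 per topic, so 8 tasks per
instance, or 2 per thread with 4 threads each. The sketch below only
computes that ideal spread for one topic. It is not how the Kafka Streams
assignor works, and the class and method names are made up for illustration.

```java
// Hypothetical illustration: the ideal per-instance task count for one topic.
final class EvenTaskSpread {

    // Splits `partitions` tasks across `instances` as evenly as possible.
    // When the division is not exact, the first (partitions % instances)
    // instances receive one extra task.
    static int[] spread(int partitions, int instances) {
        if (partitions < 0 || instances <= 0) {
            throw new IllegalArgumentException("need partitions >= 0 and instances > 0");
        }
        int base = partitions / instances;
        int remainder = partitions % instances;
        int[] counts = new int[instances];
        for (int i = 0; i < instances; i++) {
            counts[i] = base + (i < remainder ? 1 : 0);
        }
        return counts;
    }
}
```

Comparing the per-topic counts actually observed on each instance (for
example 7 and 13) against spread(100, 10) gives a quick measure of how far
an assignment is from balanced.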