I agree that minimizing the number of producer connections (while
being a good thing) is really required only in very large production
deployments, and that the net effect of the existing change is
counter-intuitive to users who expect an immediate even distribution
across _all_ partitions of the topic.

However, I don't think it is a hack because it is almost exactly the
same behavior as 0.7 in one of its modes. The 0.7 producer (which I
think was even more confusing) had three modes:
i) ZK send
ii) Config send(a): static list of broker1:port1,broker2:port2,etc.
iii) Config send(b): static list of a hardwareVIP:VIPport

(i) and (ii) would achieve even distribution. (iii) would effectively
select one broker and distribute to partitions on that broker within
each reconnect interval. (iii) is very similar to what we now do in
0.8. (Although we currently stick to one partition during each metadata
refresh interval, that can be changed to stick to one broker and
distribute across partitions on that broker.)
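
To make the difference concrete, here is a minimal Java sketch of the two
selection strategies in the parenthetical above: sticking to one partition
per refresh interval versus sticking to one broker and spreading over its
partitions. It is not the producer's actual code; `StickySelector`, its
method names, the broker-to-partition map, and passing the clock in as
`nowMs` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Illustrative only: not the 0.8 producer code; every name here is hypothetical.
final class StickySelector {
    private final Map<Integer, List<Integer>> partitionsByBroker; // broker id -> partition ids
    private final List<Integer> brokerIds;
    private final List<Integer> allPartitions = new ArrayList<>();
    private final long refreshIntervalMs;
    private final Random random;

    private long nextRefreshMs = Long.MIN_VALUE; // forces a pick on first use
    private int stickyPartition;
    private int stickyBroker;

    StickySelector(Map<Integer, List<Integer>> layout, long refreshIntervalMs, Random random) {
        if (layout.isEmpty()) throw new IllegalArgumentException("no brokers");
        this.partitionsByBroker = new TreeMap<>(layout);
        this.brokerIds = new ArrayList<>(this.partitionsByBroker.keySet());
        for (List<Integer> ps : this.partitionsByBroker.values()) {
            if (ps.isEmpty()) throw new IllegalArgumentException("broker without partitions");
            allPartitions.addAll(ps);
        }
        this.refreshIntervalMs = refreshIntervalMs;
        this.random = random;
    }

    // Re-pick the sticky partition and the sticky broker once per refresh interval.
    private void maybeRefresh(long nowMs) {
        if (nowMs < nextRefreshMs) return;
        stickyPartition = allPartitions.get(random.nextInt(allPartitions.size()));
        stickyBroker = brokerIds.get(random.nextInt(brokerIds.size()));
        nextRefreshMs = nowMs + refreshIntervalMs;
    }

    // One partition for the whole interval (the behavior described for 0.8 above).
    int stickToPartition(long nowMs) {
        maybeRefresh(nowMs);
        return stickyPartition;
    }

    // One broker for the whole interval, a random partition on it per message.
    int stickToBroker(long nowMs) {
        maybeRefresh(nowMs);
        List<Integer> local = partitionsByBroker.get(stickyBroker);
        return local.get(random.nextInt(local.size()));
    }

    int currentBroker() {
        return stickyBroker;
    }
}
```

Either way a producer talks to a single broker within an interval; the
second method only changes how evenly data lands on that broker's
partitions.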

At the same time, I agree with Joe's suggestion that we should keep
the more intuitive pre-KAFKA-1017 behavior as the default and move the
change in KAFKA-1017 to a more specific partitioner implementation.
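
As a rough sketch of what that split could look like: a default that picks
a fresh random partition for every null-key message, and a separate opt-in
partitioner that carries the KAFKA-1017 stickiness. The interface below is
hypothetical (it is not the Kafka partitioner API, and the `nowMs` argument
and the `Math.floorMod` hash for keyed messages are assumptions), so treat
it as an illustration rather than a patch.

```java
import java.util.Random;

// Hypothetical interface for illustration; not the actual Kafka partitioner API.
interface SketchPartitioner {
    int partition(Object key, int numPartitions, long nowMs);
}

// Pre-KAFKA-1017 style default: a new random partition for every null-key message.
final class RandomPerMessagePartitioner implements SketchPartitioner {
    private final Random random;

    RandomPerMessagePartitioner(Random random) {
        this.random = random;
    }

    @Override
    public int partition(Object key, int numPartitions, long nowMs) {
        if (key == null) return random.nextInt(numPartitions);
        return Math.floorMod(key.hashCode(), numPartitions); // assumed hash scheme
    }
}

// Opt-in KAFKA-1017 style: keep one random partition until the interval elapses.
final class RandomRefreshSketchPartitioner implements SketchPartitioner {
    private final Random random;
    private final long refreshIntervalMs;
    private long nextRefreshMs = Long.MIN_VALUE; // forces a pick on first use
    private int sticky;

    RandomRefreshSketchPartitioner(Random random, long refreshIntervalMs) {
        this.random = random;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    @Override
    public int partition(Object key, int numPartitions, long nowMs) {
        if (key != null) return Math.floorMod(key.hashCode(), numPartitions);
        // Re-pick when the interval is over or the partition count shrank.
        if (nowMs >= nextRefreshMs || sticky >= numPartitions) {
            sticky = random.nextInt(numPartitions);
            nextRefreshMs = nowMs + refreshIntervalMs;
        }
        return sticky;
    }
}
```

Keyed messages behave identically in both classes; only the null-key path
differs, which is the part under discussion here.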

Joel


On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Let me ask another question which I think is more objective. Let's say 100
> random, smart infrastructure specialists try Kafka, of these 100 how many
> do you believe will
> 1. Say that this behavior is what they expected to happen?
> 2. Be happy with this behavior?
> I am not being facetious I am genuinely looking for a numerical estimate. I
> am trying to figure out if nobody thought about this or if my estimate is
> just really different. For what it is worth my estimate is 0 and 5
> respectively.
>
> This would be fine except that we changed it from the good behavior to the
> bad behavior to fix an issue that probably only we have.
>
> -Jay
>
>
> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> I just took a look at this change. I agree with Joe, not to put too fine a
>> point on it, but this is a confusing hack.
>>
>> Jun, I don't think wanting to minimize the number of TCP connections is
>> going to be a very common need for people with less than 10k producers. I
>> also don't think people are going to get very good load balancing out of
>> this because most people don't have a ton of producers. I think instead we
>> will spend the next year explaining this behavior which 99% of people will
>> think is a bug (because it is crazy, non-intuitive, and breaks their usage).
>>
>> Why was this done by adding special default behavior in the null key case
>> instead of as a partitioner? The argument that the partitioner interface
>> doesn't have sufficient information to choose a partition is not a good
>> argument for hacking in changes to the default, it is an argument for
>> *improving* the partitioner interface.
>>
>> The whole point of a partitioner interface is to make it possible to plug
>> in non-standard behavior like this, right?
>>
>> -Jay
>>
>>
>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> Joe,
>>>
>>> Thanks for bringing this up. I want to clarify this a bit.
>>>
>>> 1. Currently, the producer side logic is that if the partitioning key is
>>> not provided (i.e., it is null), the partitioner won't be called. We did
>>> that because we want to select a random and "available" partition to send
>>> messages so that if some partitions are temporarily unavailable (because
>>> of broker failures), messages can still be sent to other partitions. Doing
>>> this in the partitioner is difficult since the partitioner doesn't know
>>> which partitions are currently available (the DefaultEventHandler does).
>>>
>>> 2. As Joel said, the common use case in production is that there are many
>>> more producers than #partitions in a topic. In this case, sticking to a
>>> partition for a few minutes is not going to cause too much imbalance in
>>> the partitions and has the benefit of reducing the # of socket connections.
>>> My feeling is that this will benefit most production users. In fact, if one
>>> uses a hardware load balancer for producing data in 0.7, it behaves in
>>> exactly the same way (a producer will stick to a broker until the
>>> reconnect interval is reached).
>>>
>>> 3. It is true that if one is testing a topic with more than one partition
>>> (which is not the default value), this behavior can be a bit weird.
>>> However, I think it can be mitigated by running multiple test producer
>>> instances.
>>>
>>> 4. Someone reported on the mailing list that all data shows up in only one
>>> partition after a few weeks. This is clearly not the expected behavior. We
>>> can take a closer look to see if this is a real issue.
>>>
>>> Do you think these address your concerns?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>>
>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <crypt...@gmail.com> wrote:
>>>
>>> > How about creating a new class called RandomRefreshPartioner and copy the
>>> > DefaultPartitioner code to it and then revert the DefaultPartitioner code.
>>> > I appreciate this is a one-time burden for folks using the existing
>>> > 0.8-beta1 who bump into KAFKA-1017 in production and have to switch to the
>>> > RandomRefreshPartioner, and folks deploying to production will have to
>>> > consider this property change.
>>> >
>>> > I make this suggestion keeping in mind the new folks that come on board
>>> > with Kafka: when everyone is in development and testing mode for the first
>>> > time, their experience would be as expected from how it would work in
>>> > production this way.  In dev/test, when first using Kafka, they won't have
>>> > so many producers for partitions but would look to parallelize their
>>> > consumers IMHO.
>>> >
>>> > The random broker change sounds like maybe a bigger change now this late
>>> > in the release cycle if we can accommodate folks trying Kafka for the
>>> > first time and through their development and testing along with full blown
>>> > production deploys.
>>> >
>>> > /*******************************************
>>> >  Joe Stein
>>> >  Founder, Principal Consultant
>>> >  Big Data Open Source Security LLC
>>> >  http://www.stealth.ly
>>> >  Twitter: @allthingshadoop
>>> > ********************************************/
>>> >
>>> >
>>> > On Sep 14, 2013, at 8:17 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>> >
>>> > >>
>>> > >>
>>> > >> Thanks for bringing this up - it is definitely an important point to
>>> > >> discuss. The underlying issue of KAFKA-1017 was uncovered to some
>>> > >> degree by the fact that in our deployment we did not significantly
>>> > >> increase the total number of partitions over 0.7 - i.e., in 0.7 we had
>>> > >> say four partitions per broker, now we are using (say) eight partitions
>>> > >> across the cluster. So with random partitioning every producer would
>>> > >> end up connecting to nearly every broker (unlike 0.7 in which we would
>>> > >> connect to only one broker within each reconnect interval). In a
>>> > >> production-scale deployment that causes the high number of connections
>>> > >> that KAFKA-1017 addresses.
>>> > >>
>>> > >> You are right that the fix of sticking to one partition over the
>>> > >> metadata refresh interval goes against true consumer parallelism, but
>>> > >> this would be the case only if there are few producers. If you have a
>>> > >> sizable number of producers on average all partitions would get uniform
>>> > >> volumes of data.
>>> > >> One tweak to KAFKA-1017 that I think is reasonable would be instead of
>>> > >> sticking to a random partition, stick to a random broker and send to
>>> > >> random partitions within that broker. This would make the behavior
>>> > >> closer to 0.7 wrt number of connections and random partitioning
>>> > >> provided the number of partitions per broker is high enough, which is
>>> > >> why I mentioned the partition count (in our usage) in 0.7 vs 0.8 above.
>>> > >> Thoughts?
>>> > >>
>>> > >> Joel
>>> > >>
>>> > >>
>>> > >> On Friday, September 13, 2013, Joe Stein wrote:
>>> > >>>
>>> > >>> First, let me apologize for not realizing/noticing this until today.
>>> >  One
>>> > >>> reason I left my last company was not being paid to work on Kafka
>>> nor
>>> > >> being
>>> > >> able to afford any time for a while to work on it. Now in my new gig
>>> > (just
>>> > >> wrapped up my first week, woo hoo) while I am still not "paid to
>>> work on
>>> > >> Kafka" I can afford some more time for it now and maybe in 6 months I
>>> > will
>>> > >> be able to hire folks to work on Kafka (with more and more time for
>>> > myself
>>> > >> to work on it too) while we also work on client projects (especially
>>> > Kafka
>>> > >> based ones).
>>> > >>
>>> > >> So, I understand about the changes that were made to fix open file
>>> > >> handles and make the random pinning be time based (with a very large
>>> > >> default time).  Got all that.
>>> > >>
>>> > >> But, doesn't this completely negate what has been communicated to the
>>> > >> community for a very long time and the expectation they have? I think
>>> > >> it does.
>>> > >>
>>> > >> The expected functionality for random partitioning is that "This can be
>>> > >> done in a round-robin fashion simply to balance load" and that the
>>> > >> "producer" does it for you.
>>> > >>
>>> > >> Isn't a primary use case for partitions to parallelize consumers? If so
>>> > >> then the expectation would be that all consumers would be getting in
>>> > >> parallel equally in a "round robin fashion" the data that was produced
>>> > >> for the topic... simply to balance load...with the producer handling it
>>> > >> and with the client application not having to do anything. This
>>> > >> randomness occurring every 10 minutes can't balance load.
>>> > >>
>>> > >> If users are going to work around this anyways (as I would honestly do
>>> > >> too) doing a pseudo semantic random key and essentially forcing real
>>> > >> randomness to simply balance load to my consumers running in parallel
>>> > >> would we still end up hitting the KAFKA-1017 problem anyways? If not
>>> > >> then why can't we just give users the functionality and put back the 3
>>> > >> lines of code 1) if(key == null) 2)  random.nextInt(numPartitions) 3)
>>> > >> else ... If we would bump into KAFKA-1017 by working around it then we
>>> > >> have not really solved the root cause problem and are removing expected
>>> > >> functionality for a corner case that might have other workarounds
>>> > >> and/or code changes to solve it another way, or am I still not getting
>>> > >> something?
>>> > >>
>>> > >> Also, I was looking at testRandomPartitioner in AsyncProducerTest and I
>>> > >> don't see how this would ever fail, the assertion is always for
>>> > >> partitionId == 0 and it should be checking that data is going to
>>> > >> different partitions for a topic, right?
>>> > >>
>>> > >> Let me know, I think this is an important discussion and even if it
>>> > >> ends up as telling the community to only use one partition that is all
>>> > >> you need and partitions become our super columns (Apache Cassandra
>>> > >> joke, it's funny) then we manage and support it and that is just how
>>> > >> it is, but if partitions are a good thing and having multiple consumers
>>> > >> scale in parallel for a single topic is also good then we have to
>>> > >> manage and support that.
>>> > >>
>>> > >> /*******************************************
>>> > >> Joe Stein
>>> > >> Founder, Principal Consultant
>>> > >> Big Data Open Source Security LLC
>>> > >> http://www.stealth.ly
>>> > >> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> > >> ********************************************/
>>> > >>
>>> >
>>>
>>
>>
