Agreed, let's hold off until after 0.8. I will update the JIRA ticket I created with your feedback and options; we can discuss it there and then deal with changes in 0.8.1 or 0.9 or such.
I will update the FAQ (should have time tomorrow unless someone else gets to it first). I think we should have it in there at the least, yes?

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Tue, Oct 1, 2013 at 11:26 AM, Jun Rao <jun...@gmail.com> wrote:

> This proposal still doesn't address the following fundamental issue: the random partitioner cannot select a random and AVAILABLE partition.
>
> So, we have the following two choices.
>
> 1. Stick with the current partitioner api. Then, we have to pick one way to do random partitioning (when the key is null). The current behavior may not be very intuitive, but it is one of the possible behaviors in 0.7.
>
> 2. Change the partitioner api so that we can (1) be aware of available partitions and (2) have pluggable partitioners for doing random distribution.
>
> Option (2) is probably the right approach. However, it's a non-trivial change. So, I am not sure if it should be done in 0.8 or not.
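For illustration, a rough sketch (in Python, with hypothetical names; this is not the actual Kafka 0.8 interface) of what option (2)'s availability-aware partitioner could look like:

```python
import random

# Hypothetical sketch of option (2): a partitioner api that is also handed
# the list of currently available partitions (illustrative names only; this
# is not the actual Kafka 0.8 interface).
class AvailabilityAwareRandomPartitioner:
    def partition(self, key, num_partitions, available_partitions):
        if key is None:
            # No key: pick at random among AVAILABLE partitions only, so
            # partitions whose broker is down (e.g. replication factor 1)
            # are skipped instead of causing lost messages.
            if not available_partitions:
                raise RuntimeError("no leader available for any partition")
            return random.choice(available_partitions)
        # Keyed messages keep a stable hash mapping over all partitions.
        return abs(hash(key)) % num_partitions
```

A pluggable random partitioner of this shape could then implement either per-send or periodic random selection, without DefaultEventHandler special-casing the null key.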
> Thanks,
>
> Jun
>
> On Mon, Sep 30, 2013 at 10:21 PM, Joe Stein <crypt...@gmail.com> wrote:
>
> > How about making UUID.randomUUID.toString() the default in KeyedMessage instead of null if not supplied:
> >
> >     def this(topic: String, message: V) =
> >       this(topic, UUID.randomUUID.toString(), message)
> >
> > and if you want the random refresh behavior then pass in "*" on the KeyedMessage construction, which we can then later check for in DefaultEventHandler:
> >
> >     val partition =
> >       if (key == "*") {
> >
> > We then throw NPE if key == null in KeyedMessage, like we do for topic. I believe any null flow-control logic is something to shy away from.
> >
> > If this is wrong or too much or still not the best solution, we could also hold it over and just put this in the FAQ with the JIRA, and let people know, when they run into this and want to randomize in development / testing (and in many production situations where the producer count is not large enough), that they have to pass in their own continuous random key... If we can get a consensus for what we want to do with minimal changes, then I think it is important for 0.8; otherwise wait.
> >
> > On Sun, Sep 29, 2013 at 12:14 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > The main issue is that if we do that, when the key is null, we can only select a random partition, but not a random and available partition, without changing the partitioner api. Being able to do the latter is important in my opinion. For example, a user may choose the replication factor of a topic to be 1. If a broker is down, it's much better to select partitions on other brokers for producing than losing messages.
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Sat, Sep 28, 2013 at 9:51 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > I think Joe's suggesting that we can remove the checking logic for key == null in DefaultEventHandler, and do that in the partitioner.
> > > >
> > > > One thing about this idea is that any customized partitioner then also has to consider the key == null case.
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > We have the following code in DefaultEventHandler:
> > > > >
> > > > >     val partition =
> > > > >       if (key == null) {
> > > > >         // If the key is null, we don't really need a partitioner.
> > > > >         // So we look up in the send partition cache for the topic
> > > > >         // to decide the target partition
> > > > >         val id = sendPartitionPerTopicCache.get(topic)
> > > > >         id match {
> > > > >           case Some(partitionId) =>
> > > > >             // directly return the partitionId without checking
> > > > >             // availability of the leader, since we want to postpone
> > > > >             // the failure until the send operation anyways
> > > > >             partitionId
> > > > >           case None =>
> > > > >             val availablePartitions =
> > > > >               topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > > > >             if (availablePartitions.isEmpty)
> > > > >               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> > > > >             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
> > > > >             val partitionId = availablePartitions(index).partitionId
> > > > >             sendPartitionPerTopicCache.put(topic, partitionId)
> > > > >             partitionId
> > > > >         }
> > > > >       } else
> > > > >         partitioner.partition(key, numPartitions)
> > > > >
> > > > > So, if the key is null, the partitioner is ignored.
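Restated outside Scala, the null-key branch quoted above amounts to a sticky per-topic choice among available partitions, cached until the next metadata refresh. A simplified Python sketch of that behavior (illustrative only, not the actual implementation):

```python
import itertools

# A Python sketch (illustrative, not the actual implementation) of the
# null-key branch above: pick one available partition per topic, cache it,
# and reuse it until the cache is cleared on metadata refresh.
class StickyRandomSelector:
    def __init__(self):
        self._cache = {}                   # topic -> chosen partition id
        self._counter = itertools.count()  # stands in for partitionCounter

    def select(self, topic, available_partitions):
        if topic in self._cache:
            # like the Some(partitionId) case: no availability re-check here
            return self._cache[topic]
        if not available_partitions:
            raise RuntimeError("No leader for any partition in topic " + topic)
        index = next(self._counter) % len(available_partitions)
        partition = available_partitions[index]
        self._cache[topic] = partition
        return partition

    def on_metadata_refresh(self):
        # sendPartitionPerTopicCache is cleared on metadata refresh, which
        # is what makes the choice sticky only between refreshes.
        self._cache.clear()
```

The stickiness is what reduces the number of producer socket connections, and also what surprises users who expect round-robin distribution across all partitions.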
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > >
> > > > > > Hmmm, yeah, I don't want to do that... if we don't have to.
> > > > > >
> > > > > > What if the DefaultPartitioner code looked like this instead =8^)
> > > > > >
> > > > > >     private class DefaultPartitioner[T](props: VerifiableProperties = null)
> > > > > >         extends Partitioner[T] {
> > > > > >
> > > > > >       def partition(key: T, numPartitions: Int): Int = {
> > > > > >         if (key == null) {
> > > > > >           import java.util.UUID
> > > > > >           Utils.abs(UUID.randomUUID.toString.hashCode) % numPartitions
> > > > > >         }
> > > > > >         else {
> > > > > >           Utils.abs(key.hashCode) % numPartitions
> > > > > >         }
> > > > > >       }
> > > > > >     }
> > > > > >
> > > > > > Again, the goal here is the simple case (often the initial, dev-side, up-and-running-out-of-the-box experience), so folks don't have to randomize the keys themselves to get this effect.
> > > > > >
> > > > > > We would still have to also have a RandomMetaRefreshPartitioner class, right? So null keys there would wait for the time refresh for that use case, right?
> > > > > >
> > > > > >     private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null)
> > > > > >         extends Partitioner[T] {
> > > > > >
> > > > > >       def partition(key: T, numPartitions: Int): Int = {
> > > > > >         Utils.abs(key.hashCode) % numPartitions
> > > > > >       }
> > > > > >     }
> > > > > >
> > > > > > On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > >
> > > > > > > However, currently, if the key is null, the partitioner is not even called. Do you want to change DefaultEventHandler too?
> > > > > > > This also doesn't allow the partitioner to select a random and available partition, which in my opinion is more important than making partitions perfectly evenly balanced.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > > > >
> > > > > > > > What I was proposing was twofold:
> > > > > > > >
> > > > > > > > 1) revert the DefaultPartitioner class, then
> > > > > > > >
> > > > > > > > 2) create a new partitioner that folks could use (like at LinkedIn you would use this partitioner instead) in ProducerConfig:
> > > > > > > >
> > > > > > > >     private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null)
> > > > > > > >         extends Partitioner[T] {
> > > > > > > >       private val random = new java.util.Random
> > > > > > > >
> > > > > > > >       def partition(key: T, numPartitions: Int): Int = {
> > > > > > > >         Utils.abs(key.hashCode) % numPartitions
> > > > > > > >       }
> > > > > > > >     }
> > > > > > > >
> > > > > > > > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Joe,
> > > > > > > > >
> > > > > > > > > Not sure I fully understand your proposal.
> > > > > > > > > Do you want to put the random partitioning selection logic (for messages without a key) in the partitioner without changing the partitioner api? That's difficult. The issue is that in the current partitioner api, we don't know which partitions are available. For example, if we have replication factor 1 on a topic and a broker is down, the best thing to do for the random partitioner is to select an available partition at random (assuming more than 1 partition is created for the topic).
> > > > > > > > >
> > > > > > > > > Another option is to revert the random partitioning selection logic in DefaultEventHandler to select a random partition per batch of events (instead of sticking with a random partition for some configured amount of time). This is doable, but I am not sure if it's that critical. Since this is one of the two possible behaviors in 0.7, it's hard to say whether people will be surprised by it. Preserving both 0.7 behaviors will require changing the partitioner api. This is more work, and I agree it's better to do this post 0.8.0 final.
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Jun, can we hold this extra change over for 0.8.1 and just go with reverting to where we were before for the default, with a new partitioner for meta refresh, and support both?
> > > > > > > > > >
> > > > > > > > > > I am not sure I entirely understand why someone would need the extra functionality you are talking about, which sounds cool though... Adding it to the API (especially now) without people using it may just make folks ask more questions and maybe not use it... IDK... but in any case we can work on buttoning up 0.8 and shipping just the change for two partitioners (https://issues.apache.org/jira/browse/KAFKA-1067), and circle back if we wanted on this extra item (including the discussion) in 0.8.1 or greater? I am always of the mind to reduce complexity unless that complexity is in fact better than not having it.
> > > > > > > > > >
> > > > > > > > > > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > It's reasonable to make the behavior of random producers customizable through a pluggable partitioner.
> > > > > > > > > > > So, if one doesn't care about # of socket connections, one can choose to select a random partition on every send. If one does have many producers, one can choose to periodically select a random partition. To support this, the partitioner api needs to be changed, though.
> > > > > > > > > > >
> > > > > > > > > > > Instead of
> > > > > > > > > > >
> > > > > > > > > > >     def partition(key: T, numPartitions: Int): Int
> > > > > > > > > > >
> > > > > > > > > > > we probably need something like the following:
> > > > > > > > > > >
> > > > > > > > > > >     def partition(key: T, numPartitions: Int,
> > > > > > > > > > >                   availablePartitionList: List[Int],
> > > > > > > > > > >                   isNewBatch: Boolean,
> > > > > > > > > > >                   isRefreshMetadata: Boolean): Int
> > > > > > > > > > >
> > > > > > > > > > > availablePartitionList: allows us to select only partitions that are available.
> > > > > > > > > > > isNewBatch: allows us to select the same partition for all messages in a given batch in the async mode.
> > > > > > > > > > > isRefreshMetadata: allows us to implement the policy of switching to a random partition periodically.
> > > > > > > > > > >
> > > > > > > > > > > This will make the partitioner api a bit more complicated. However, it does provide enough information for customization.
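To make the proposal concrete, here is a Python sketch of one policy such an api would enable: switching to a new random available partition only on metadata refresh. The flags follow Jun's description above, but the implementation itself is hypothetical:

```python
import random

# Hypothetical sketch of a partitioner built on the proposed extended api:
# stick to one random available partition between metadata refreshes, which
# keeps the number of producer socket connections low.
class PeriodicRandomPartitioner:
    def __init__(self):
        self._current = None  # sticky partition for null-key messages

    def partition(self, key, num_partitions, available_partition_list,
                  is_new_batch, is_refresh_metadata):
        # Keyed messages: stable hash mapping over all partitions, as today.
        if key is not None:
            return abs(hash(key)) % num_partitions
        # is_new_batch would let an implementation pin one partition per
        # async batch; this sketch does not use it.
        # Null key: pick a new random AVAILABLE partition only when metadata
        # was just refreshed, or when the sticky one became unavailable.
        if is_refresh_metadata or self._current not in available_partition_list:
            self._current = random.choice(available_partition_list)
        return self._current
```

A per-send random partitioner would be the same class with the refresh check removed, which is the customization point Jun describes.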
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <crypt...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Sounds good, I will create a JIRA and upload a patch.
> > > > > > > > > > > >
> > > > > > > > > > > > On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I agree that minimizing the number of producer connections (while being a good thing) is really required in very large production deployments, and the net effect of the existing change is counter-intuitive to users who expect an immediate even distribution across _all_ partitions of the topic.
> > > > > > > > > > > > >
> > > > > > > > > > > > > However, I don't think it is a hack, because it is almost exactly the same behavior as 0.7 in one of its modes.
> > > > > > > > > > > > > The 0.7 producer (which I think was even more confusing) had three modes:
> > > > > > > > > > > > > i) ZK send
> > > > > > > > > > > > > ii) Config send (a): static list of broker1:port1,broker2:port2,etc.
> > > > > > > > > > > > > iii) Config send (b): static list of a hardwareVIP:VIPport
> > > > > > > > > > > > >
> > > > > > > > > > > > > (i) and (ii) would achieve even distribution. (iii) would effectively select one broker and distribute to partitions on that broker within each reconnect interval. (iii) is very similar to what we now do in 0.8. (Although we stick to one partition during each metadata refresh interval; that could be changed to stick to one broker and distribute across partitions on that broker.)
> > > > > > > > > > > > >
> > > > > > > > > > > > > At the same time, I agree with Joe's suggestion that we should keep the more intuitive pre-KAFKA-1017 behavior as the default and move the change in KAFKA-1017 to a more specific partitioner implementation.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Joel
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > > > > > > Let me ask another question which I think is more objective.
> > > > > > > > > > > > > > Let's say 100 random, smart infrastructure specialists try Kafka; of these 100, how many do you believe will
> > > > > > > > > > > > > > 1. Say that this behavior is what they expected to happen?
> > > > > > > > > > > > > > 2. Be happy with this behavior?
> > > > > > > > > > > > > > I am not being facetious; I am genuinely looking for a numerical estimate. I am trying to figure out if nobody thought about this or if my estimate is just really different. For what it is worth, my estimates are 0 and 5 respectively.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This would be fine except that we changed it from the good behavior to the bad behavior to fix an issue that probably only we have.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Jay
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I just took a look at this change. I agree with Joe; not to put too fine a point on it, but this is a confusing hack.
> > > > > > > > > > > > > > > Jun, I don't think wanting to minimize the number of TCP connections is going to be a very common need for people with less than 10k producers. I also don't think people are going to get very good load balancing out of this, because most people don't have a ton of producers. I think instead we will spend the next year explaining this behavior, which 99% of people will think is a bug (because it is crazy, non-intuitive, and breaks their usage).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Why was this done by adding special default behavior in the null key case instead of as a partitioner? The argument that the partitioner interface doesn't have sufficient information to choose a partition is not a good argument for hacking in changes to the default; it is an argument for *improving* the partitioner interface.
> > > > > > > > > > > > > > > The whole point of a partitioner interface is to make it possible to plug in non-standard behavior like this, right?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Jay
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Joe,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for bringing this up. I want to clarify this a bit.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1. Currently, the producer side logic is that if the partitioning key is not provided (i.e., it is null), the partitioner won't be called. We did that because we want to select a random and "available" partition to send messages to, so that if some partitions are temporarily unavailable (because of broker failures), messages can still be sent to other partitions. Doing this in the partitioner is difficult since the partitioner doesn't know which partitions are currently available (the DefaultEventHandler does).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2.
> > > > > > > > > > > > > > > > As Joel said, the common use case in production is that there are many more producers than #partitions in a topic. In this case, sticking to a partition for a few minutes is not going to cause too much imbalance in the partitions and has the benefit of reducing the # of socket connections. My feeling is that this will benefit most production users. In fact, if one uses a hardware load balancer for producing data in 0.7, it behaves in exactly the same way (a producer will stick to a broker until the reconnect interval is reached).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 3. It is true that if one is testing a topic with more than one partition (which is not the default value), this behavior can be a bit weird. However, I think it can be mitigated by running multiple test producer instances.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 4.
> > > > > > > > > > > > > > > > Someone reported in the mailing list that all data shows up in only one partition after a few weeks. This is clearly not the expected behavior. We can take a closer look to see if this is a real issue.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Do you think these address your concerns?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > How about creating a new class called RandomRefreshPartioner, copying the DefaultPartitioner code to it, and then reverting the DefaultPartitioner code? I appreciate this is a one-time burden for folks using the existing 0.8-beta1 who bumped into KAFKA-1017 in production, having to switch to the RandomRefreshPartioner, and when folks deploy to production they will have to consider this property change.
> > > > > > > > > > > > > > > > > I make this suggestion keeping in mind the new folks that come on board with Kafka: when everyone is in development and testing mode for the first time, their experience would this way be as expected from how it would work in production. In dev/test, when first using Kafka, they won't have so many producers for partitions but would look to parallelize their consumers, IMHO.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The random broker change sounds like maybe a bigger change now, this late in the release cycle, if we can accommodate folks trying Kafka for the first time and through their development and testing along with full-blown production deploys.
> > > > > > > > > > > > > > > > > On Sep 14, 2013, at 8:17 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for bringing this up - it is definitely an important point to discuss. The underlying issue of KAFKA-1017 was uncovered to some degree by the fact that in our deployment we did not significantly increase the total number of partitions over 0.7 - i.e., in 0.7 we had say four partitions per broker; now we are using (say) eight partitions across the cluster.
> > > > > > > > > > > > > > > > > > So with random partitioning every producer would end up connecting to nearly every broker (unlike 0.7, in which we would connect to only one broker within each reconnect interval). In a production-scale deployment that causes the high number of connections that KAFKA-1017 addresses.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > You are right that the fix of sticking to one partition over the metadata refresh interval goes against true consumer parallelism, but this would be the case only if there are few producers. If you have a sizable number of producers, on average all partitions would get uniform volumes of data.
> > > > > > > > > > > > > > > > > > One tweak to KAFKA-1017 that I think is reasonable would be, instead of sticking to a random partition, to stick to a random broker and send to random partitions within that broker. This would make the behavior closer to 0.7 wrt number of connections and random partitioning, provided the number of partitions per broker is high enough, which is why I mentioned the partition count (in our usage) in 0.7 vs 0.8 above. Thoughts?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Joel
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Friday, September 13, 2013, Joe Stein wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > First, let me apologize for not realizing/noticing this until today. One reason I left my last company was not being paid to work on Kafka nor being able to afford any time for a while to work on it.
> > Now in my new gig (just wrapped up my first week, woo hoo), while I am still not "paid to work on Kafka" I can afford some more time for it now, and maybe in 6 months I will be able to hire folks to work on Kafka (with more and more time for myself to work on it too) while we also work on client projects (especially Kafka-based ones).
> >
> > So, I understand about the changes that were made to fix open file handles and make the random pinning be time-based (with a very large default time). Got all that.
> >
> > But doesn't this completely negate what has been communicated to the community for a very long time and the expectation they have? I think it does.
> > The expected functionality for random partitioning is that "This can be done in a round-robin fashion simply to balance load" and that the "producer" does it for you.
> >
> > Isn't a primary use case for partitions to parallelize consumers? If so, then the expectation would be that all consumers would be getting, in parallel and in a "round robin fashion", the data that was produced for the topic... simply to balance load... with the producer handling it and the client application not having to do anything. This randomness occurring every 10 minutes can't balance load.
> > If users are going to work around this anyway (as I honestly would too) by using a pseudo-semantic random key, essentially forcing real randomness simply to balance load to my consumers running in parallel, would we still end up hitting the KAFKA-1017 problem anyway? If not, then why can't we just give users the functionality and put back the 3 lines of code: 1) if(key == null) 2) random.nextInt(numPartitions) 3) else ... If we would bump into KAFKA-1017 by working around it, then we have not really solved the root-cause problem, and we are removing expected functionality for a corner case that might have other workarounds and/or code changes to solve it another way. Or am I still not getting something?
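The three lines Joe refers to amount to behavior like the following. This is a sketch in Java rather than the actual 0.8 Scala source, and the class name is invented for illustration.

```java
import java.util.Random;

// Sketch of the pre-change behavior Joe describes: a null key means "pick a
// fresh random partition for every message", rather than sticking to one
// partition for the whole metadata refresh interval.
public class PerMessageRandomPartitioner {
    private final Random random = new Random();

    public int partition(Object key, int numPartitions) {
        if (key == null)                                    // 1) no key supplied
            return random.nextInt(numPartitions);           // 2) random partition per message
        else
            return (key.hashCode() & 0x7fffffff) % numPartitions; // 3) keyed: hash-based
    }
}
```

With any realistic message volume this spreads load across all partitions immediately, which is the round-robin-like balancing consumers were expecting.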
> > Also, I was looking at testRandomPartitioner in AsyncProducerTest and I don't see how it would ever fail: the assertion is always for partitionId == 0, and it should be checking that data is going to different partitions for a topic, right?
> >
> > Let me know. I think this is an important discussion, and even if it ends up as telling the community to only use one partition (that is all you need, and partitions become our super columns; Apache Cassandra joke, it's funny), then we manage and support it and that is just how it is. But if partitions are a good thing, and having multiple consumers scale in parallel for a single topic is also good, then we have to manage and support that.
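Joe's point about the test could be captured with a check along these lines (hypothetical, not the actual AsyncProducerTest code): instead of asserting that every message lands on partition 0, assert that repeated null-key sends reach more than one partition.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical check in the spirit of Joe's comment: verify randomness by
// observing multiple distinct partitions, not by asserting partitionId == 0.
public class RandomPartitionerCheck {
    // Stand-in for calling partitioner.partition(null, numPartitions) repeatedly.
    public static Set<Integer> observedPartitions(int numPartitions, int sends) {
        Random random = new Random();
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < sends; i++)
            seen.add(random.nextInt(numPartitions));
        return seen;
    }

    public static void main(String[] args) {
        Set<Integer> seen = observedPartitions(8, 1000);
        // A partitioner that always returned 0 would pass the old assertion
        // but fail this one.
        if (seen.size() <= 1)
            throw new AssertionError("expected multiple partitions, saw " + seen);
    }
}
```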
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/