And the test plan has also been updated to simulate disk failure by
changing log directory permission to 000.

On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for the reply. These comments are very helpful. Let me answer them
> inline.
>
>
> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply. A few more replies and new comments below.
>>
>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Thanks for the detailed comments. Please see answers inline:
>> >
>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Thanks for the updated wiki. A few comments below.
>> > >
>> > > 1. Topics get created
>> > > 1.1 Instead of storing successfully created replicas in ZK, could we
>> > store
>> > > unsuccessfully created replicas in ZK? Since the latter is less
>> common,
>> > it
>> > > probably reduces the load on ZK.
>> > >
>> >
>> > We can store unsuccessfully created replicas in ZK. But I am not sure if
>> > that can reduce write load on ZK.
>> >
>> > If we want to reduce write load on ZK using by store unsuccessfully
>> created
>> > replicas in ZK, then broker should not write to ZK if all replicas are
>> > successfully created. It means that if /broker/topics/[topic]/partiti
>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a
>> given
>> > partition, we have to assume all replicas of this partition have been
>> > successfully created and send LeaderAndIsrRequest with create = false.
>> This
>> > becomes a problem if controller crashes before receiving
>> > LeaderAndIsrResponse to validate whether a replica has been created.
>> >
>> > I think this approach and reduce the number of bytes stored in ZK. But
>> I am
>> > not sure if this is a concern.
>> >
>> >
>> >
>> I was mostly concerned about the controller failover time. Currently, the
>> controller failover is likely dominated by the cost of reading
>> topic/partition level information from ZK. If we add another partition
>> level path in ZK, it probably will double the controller failover time. If
>> the approach of representing the non-created replicas doesn't work, have
>> you considered just adding the created flag in the leaderAndIsr path in
>> ZK?
>>
>>
> Yes, I have considered adding the created flag in the leaderAndIsr path in
> ZK. If we were to add created flag per replica in the LeaderAndIsrRequest,
> then it requires a lot of change in the code base.
>
> If we don't add created flag per replica in the LeaderAndIsrRequest, then
> the information in leaderAndIsr path in ZK and LeaderAndIsrRequest would be
> different. Further, the procedure for broker to update ISR in ZK will be a
> bit complicated. When leader updates leaderAndIsr path in ZK, it will have
> to first read created flags from ZK, change isr, and write leaderAndIsr
> back to ZK. And it needs to check znode version and re-try write operation
> in ZK if controller has updated ZK during this period. This is in contrast
> to the current implementation where the leader either gets all the
> information from LeaderAndIsrRequest sent by controller, or determine the
> infromation by itself (e.g. ISR), before writing to leaderAndIsr path in ZK.
>
> It seems to me that the above solution is a bit complicated and not clean.
> Thus I come up with the design in this KIP to store this created flag in a
> separate zk path. The path is named controller_managed_state to indicate
> that we can store in this znode all information that is managed by
> controller only, as opposed to ISR.
>
> I agree with your concern of increased ZK read time during controller
> failover. How about we store the "created" information in the
> znode /brokers/topics/[topic]? We can change that znode to have the
> following data format:
>
> {
>   "version" : 2,
>   "created" : {
>     "1" : [1, 2, 3],
>     ...
>   }
>   "partition" : {
>     "1" : [1, 2, 3],
>     ...
>   }
> }
>
> We won't have extra zk read using this solution. It also seems reasonable
> to put the partition assignment information together with replica creation
> information. The latter is only changed once after the partition is created
> or re-assigned.
>
>
>>
>>
>> >
>> > > 1.2 If an error is received for a follower, does the controller
>> eagerly
>> > > remove it from ISR or do we just let the leader removes it after
>> timeout?
>> > >
>> >
>> > No, Controller will not actively remove it from ISR. But controller will
>> > recognize it as offline replica and propagate this information to all
>> > brokers via UpdateMetadataRequest. Each leader can use this information
>> to
>> > actively remove offline replica from ISR set. I have updated to wiki to
>> > clarify it.
>> >
>> >
>>
>> That seems inconsistent with how the controller deals with offline
>> replicas
>> due to broker failures. When that happens, the broker will (1) select a
>> new
>> leader if the offline replica is the leader; (2) remove the replica from
>> ISR if the offline replica is the follower. So, intuitively, it seems that
>> we should be doing the same thing when dealing with offline replicas due
>> to
>> disk failure.
>>
>
> My bad. I misunderstand how the controller currently handles broker
> failure and ISR change. Yes we should do the same thing when dealing with
> offline replicas here. I have updated the KIP to specify that, when an
> offline replica is discovered by controller, the controller removes offline
> replicas from ISR in the ZK and sends LeaderAndIsrRequest with updated ISR
> to be used by partition leaders.
>
>
>>
>>
>>
>> >
>> > > 1.3 Similar, if an error is received for a leader, should the
>> controller
>> > > trigger leader election again?
>> > >
>> >
>> > Yes, controller will trigger leader election if leader replica is
>> offline.
>> > I have updated the wiki to clarify it.
>> >
>> >
>> > >
>> > > 2. A log directory stops working on a broker during runtime:
>> > > 2.1 It seems the broker remembers the failed directory after hitting
>> an
>> > > IOException and the failed directory won't be used for creating new
>> > > partitions until the broker is restarted? If so, could you add that to
>> > the
>> > > wiki.
>> > >
>> >
>> > Right, broker assumes a log directory to be good after it starts, and
>> mark
>> > log directory as bad once there is IOException when broker attempts to
>> > access the log directory. New replicas will only be created on good log
>> > directory. I just added this to the KIP.
>> >
>> >
>> > > 2.2 Could you be a bit more specific on how and during which operation
>> > the
>> > > broker detects directory failure? Is it when the broker hits an
>> > IOException
>> > > during writes, or both reads and writes?  For example, during broker
>> > > startup, it only reads from each of the log directories, if it hits an
>> > > IOException there, does the broker immediately mark the directory as
>> > > offline?
>> > >
>> >
>> > Broker marks log directory as bad once there is IOException when broker
>> > attempts to access the log directory. This includes read and write.
>> These
>> > operations include log append, log read, log cleaning, watermark
>> checkpoint
>> > etc. If broker hits IOException when it reads from each of the log
>> > directory during startup, it immediately mark the directory as offline.
>> >
>> > I just updated the KIP to clarify it.
>> >
>> >
>> > > 3. Partition reassignment: If we know a replica is offline, do we
>> still
>> > > want to send StopReplicaRequest to it?
>> > >
>> >
>> > No, controller doesn't send StopReplicaRequest for an offline replica.
>> > Controller treats this scenario in the same way that exiting Kafka
>> > implementation does when the broker of this replica is offline.
>> >
>> >
>> > >
>> > > 4. UpdateMetadataRequestPartitionState: For offline_replicas, do they
>> > only
>> > > include offline replicas due to log directory failures or do they also
>> > > include offline replicas due to broker failure?
>> > >
>> >
>> > UpdateMetadataRequestPartitionState's offline_replicas include offline
>> > replicas due to both log directory failure and broker failure. This is
>> to
>> > make the semantics of this field easier to understand. Broker can
>> > distinguish whether a replica is offline due to broker failure or disk
>> > failure by checking whether a broker is live in the
>> UpdateMetadataRequest.
>> >
>> >
>> > >
>> > > 5. Tools: Could we add some kind of support in the tool to list
>> offline
>> > > directories?
>> > >
>> >
>> > In KIP-112 we don't have tools to list offline directories because we
>> have
>> > intentionally avoided exposing log directory information (e.g. log
>> > directory path) to user or other brokers. I think we can add this
>> feature
>> > in KIP-113, in which we will have DescribeDirsRequest to list log
>> directory
>> > information (e.g. partition assignment, path, size) needed for
>> rebalance.
>> >
>> >
>> Since we are introducing a new failure mode, if a replica becomes offline
>> due to failure in log directories, the first thing an admin wants to know
>> is which log directories are offline from the broker's perspective.  So,
>> including such a tool will be useful. Do you plan to do KIP-112 and
>> KIP-113
>>  in the same release?
>>
>>
> Yes, I agree that including such a tool is using. This is probably better
> to be added in KIP-113 because we need DescribeDirsRequest to get this
> information. I will update KIP-113 to include this tool.
>
> I plan to do KIP-112 and KIP-113 separately to make each KIP and their
> patch easier to review. I don't have any plan about which release to have
> these KIPs. My plan is to both of them ASAP. Is there particular timeline
> you prefer for code of these two KIPs to checked-in?
>
>
>> >
>> > >
>> > > 6. Metrics: Could we add some metrics to show offline directories?
>> > >
>> >
>> > Sure. I think it makes sense to have each broker report its number of
>> > offline replicas and offline log directories. The previous metric was
>> put
>> > in KIP-113. I just added both metrics in KIP-112.
>> >
>> >
>> > >
>> > > 7. There are still references to kafka-log-dirs.sh. Are they valid?
>> > >
>> >
>> > My bad. I just removed this from "Changes in Operational Procedures" and
>> > "Test Plan" in the KIP.
>> >
>> >
>> > >
>> > > 8. Do you think KIP-113 is ready for review? One thing that KIP-113
>> > > mentions during partition reassignment is to first send
>> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It seems
>> it's
>> > > better if the replicas are created in the right log directory in the
>> > first
>> > > place? The reason that I brought it up here is because it may affect
>> the
>> > > protocol of LeaderAndIsrRequest.
>> > >
>> >
>> > Yes, KIP-113 is ready for review. The advantage of the current design is
>> > that we can keep LeaderAndIsrRequest log-direcotry-agnostic. The
>> > implementation would be much easier to read if all log related logic
>> (e.g.
>> > various errors) are put in ChangeReplicadIRrequest and the code path of
>> > handling replica movement is separated from leadership handling.
>> >
>> > In other words, I think Kafka may be easier to develop in the long term
>> if
>> > we separate these two requests.
>> >
>> > I agree that ideally we want to create replicas in the right log
>> directory
>> > in the first place. But I am not sure if there is any performance or
>> > correctness concern with the existing way of moving it after it is
>> created.
>> > Besides, does this decision affect the change proposed in KIP-112?
>> >
>> >
>> I am just wondering if you have considered including the log directory for
>> the replicas in the LeaderAndIsrRequest.
>>
>>
> Yeah I have thought about this idea, but only briefly. I rejected this
> idea because log directory is broker's local information and I prefer not
> to expose local config information to the cluster through
> LeaderAndIsrRequest.
>
>
>> 9. Could you describe when the offline replicas due to log directory
>> failure are removed from the replica fetch threads?
>>
>
> Yes. If the offline replica was a leader, either a new leader is elected
> or all follower brokers will stop fetching for this partition. If the
> offline replica is a follower, the broker will stop fetching for this
> replica immediately. A broker stops fetching data for a replica by removing
> the replica from the replica fetch threads. I have updated the KIP to
> clarify it.
>
>
>>
>> 10. The wiki mentioned changing the log directory to a file for simulating
>> disk failure in system tests. Could we just change the permission of the
>> log directory to 000 to simulate that?
>>
>
>
> Sure,
>
>
>>
>> Thanks,
>>
>> Jun
>>
>>
>> > > Jun
>> > >
>> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > Can I replace zookeeper access with direct RPC for both ISR
>> > notification
>> > > > and disk failure notification in a future KIP, or do you feel we
>> should
>> > > do
>> > > > it in this KIP?
>> > > >
>> > > > Hi Eno, Grant and everyone,
>> > > >
>> > > > Is there further improvement you would like to see with this KIP?
>> > > >
>> > > > Thanks you all for the comments,
>> > > >
>> > > > Dong
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > > >
>> > > > >
>> > > > >
>> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe <cmcc...@apache.org>
>> > > wrote:
>> > > > >
>> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote:
>> > > > >> > Thanks for all the comments Colin!
>> > > > >> >
>> > > > >> > To answer your questions:
>> > > > >> > - Yes, a broker will shutdown if all its log directories are
>> bad.
>> > > > >>
>> > > > >> That makes sense.  Can you add this to the writeup?
>> > > > >>
>> > > > >
>> > > > > Sure. This has already been added. You can find it here
>> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio
>> n.action
>> > ?
>> > > > pageId=67638402&selectedPageVersions=9&selectedPageVersions=10>
>> > > > > .
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> > - I updated the KIP to explicitly state that a log directory
>> will
>> > be
>> > > > >> > assumed to be good until broker sees IOException when it tries
>> to
>> > > > access
>> > > > >> > the log directory.
>> > > > >>
>> > > > >> Thanks.
>> > > > >>
>> > > > >> > - Controller doesn't explicitly know whether there is new log
>> > > > directory
>> > > > >> > or
>> > > > >> > not. All controller knows is whether replicas are online or
>> > offline
>> > > > >> based
>> > > > >> > on LeaderAndIsrResponse. According to the existing Kafka
>> > > > implementation,
>> > > > >> > controller will always send LeaderAndIsrRequest to a broker
>> after
>> > it
>> > > > >> > bounces.
>> > > > >>
>> > > > >> I thought so.  It's good to clarify, though.  Do you think it's
>> > worth
>> > > > >> adding a quick discussion of this on the wiki?
>> > > > >>
>> > > > >
>> > > > > Personally I don't think it is needed. If broker starts with no
>> bad
>> > log
>> > > > > directory, everything should work it is and we should not need to
>> > > clarify
>> > > > > it. The KIP has already covered the scenario when a broker starts
>> > with
>> > > > bad
>> > > > > log directory. Also, the KIP doesn't claim or hint that we support
>> > > > dynamic
>> > > > > addition of new log directories. I think we are good.
>> > > > >
>> > > > >
>> > > > >> best,
>> > > > >> Colin
>> > > > >>
>> > > > >> >
>> > > > >> > Please see this
>> > > > >> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio
>> > > > >> n.action?pageId=67638402&selectedPageVersions=9&
>> > > > selectedPageVersions=10>
>> > > > >> > for the change of the KIP.
>> > > > >> >
>> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe <
>> cmcc...@apache.org
>> > >
>> > > > >> wrote:
>> > > > >> >
>> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote:
>> > > > >> > > > Thanks, Dong L.
>> > > > >> > > >
>> > > > >> > > > Do we plan on bringing down the broker process when all log
>> > > > >> directories
>> > > > >> > > > are offline?
>> > > > >> > > >
>> > > > >> > > > Can you explicitly state on the KIP that the log dirs are
>> all
>> > > > >> considered
>> > > > >> > > > good after the broker process is bounced?  It seems like an
>> > > > >> important
>> > > > >> > > > thing to be clear about.  Also, perhaps discuss how the
>> > > controller
>> > > > >> > > > becomes aware of the newly good log directories after a
>> broker
>> > > > >> bounce
>> > > > >> > > > (and whether this triggers re-election).
>> > > > >> > >
>> > > > >> > > I meant to write, all the log dirs where the broker can still
>> > read
>> > > > the
>> > > > >> > > index and some other files.  Clearly, log dirs that are
>> > completely
>> > > > >> > > inaccessible will still be considered bad after a broker
>> process
>> > > > >> bounce.
>> > > > >> > >
>> > > > >> > > best,
>> > > > >> > > Colin
>> > > > >> > >
>> > > > >> > > >
>> > > > >> > > > +1 (non-binding) aside from that
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote:
>> > > > >> > > > > Hi all,
>> > > > >> > > > >
>> > > > >> > > > > Thank you all for the helpful suggestion. I have updated
>> the
>> > > KIP
>> > > > >> to
>> > > > >> > > > > address
>> > > > >> > > > > the comments received so far. See here
>> > > > >> > > > > <https://cwiki.apache.org/conf
>> > luence/pages/diffpagesbyversio
>> > > > >> n.action?
>> > > > >> > > pageId=67638402&selectedPageVersions=8&selectedPageVersions=
>> > 9>to
>> > > > >> > > > > read the changes of the KIP. Here is a summary of change:
>> > > > >> > > > >
>> > > > >> > > > > - Updated the Proposed Change section to change the
>> recovery
>> > > > >> steps.
>> > > > >> > > After
>> > > > >> > > > > this change, broker will also create replica as long as
>> all
>> > > log
>> > > > >> > > > > directories
>> > > > >> > > > > are working.
>> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since user no
>> > longer
>> > > > >> needs to
>> > > > >> > > > > use
>> > > > >> > > > > it for recovery from bad disks.
>> > > > >> > > > > - Explained how the znode controller_managed_state is
>> > managed
>> > > in
>> > > > >> the
>> > > > >> > > > > Public
>> > > > >> > > > > interface section.
>> > > > >> > > > > - Explained what happens during controller failover,
>> > partition
>> > > > >> > > > > reassignment
>> > > > >> > > > > and topic deletion in the Proposed Change section.
>> > > > >> > > > > - Updated Future Work section to include the following
>> > > potential
>> > > > >> > > > > improvements
>> > > > >> > > > >   - Let broker notify controller of ISR change and disk
>> > state
>> > > > >> change
>> > > > >> > > via
>> > > > >> > > > > RPC instead of using zookeeper
>> > > > >> > > > >   - Handle various failure scenarios (e.g. slow disk) on
>> a
>> > > > >> case-by-case
>> > > > >> > > > > basis. For example, we may want to detect slow disk and
>> > > consider
>> > > > >> it as
>> > > > >> > > > > offline.
>> > > > >> > > > >   - Allow admin to mark a directory as bad so that it
>> will
>> > not
>> > > > be
>> > > > >> used.
>> > > > >> > > > >
>> > > > >> > > > > Thanks,
>> > > > >> > > > > Dong
>> > > > >> > > > >
>> > > > >> > > > >
>> > > > >> > > > >
>> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin <
>> > lindon...@gmail.com
>> > > >
>> > > > >> wrote:
>> > > > >> > > > >
>> > > > >> > > > > > Hey Eno,
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks much for the comment!
>> > > > >> > > > > >
>> > > > >> > > > > > I still think the complexity added to Kafka is
>> justified
>> > by
>> > > > its
>> > > > >> > > benefit.
>> > > > >> > > > > > Let me provide my reasons below.
>> > > > >> > > > > >
>> > > > >> > > > > > 1) The additional logic is easy to understand and thus
>> its
>> > > > >> complexity
>> > > > >> > > > > > should be reasonable.
>> > > > >> > > > > >
>> > > > >> > > > > > On the broker side, it needs to catch exception when
>> > access
>> > > > log
>> > > > >> > > directory,
>> > > > >> > > > > > mark log directory and all its replicas as offline,
>> notify
>> > > > >> > > controller by
>> > > > >> > > > > > writing the zookeeper notification path, and specify
>> error
>> > > in
>> > > > >> > > > > > LeaderAndIsrResponse. On the controller side, it will
>> > > listener
>> > > > >> to
>> > > > >> > > > > > zookeeper for disk failure notification, learn about
>> > offline
>> > > > >> > > replicas in
>> > > > >> > > > > > the LeaderAndIsrResponse, and take offline replicas
>> into
>> > > > >> > > consideration when
>> > > > >> > > > > > electing leaders. It also mark replica as created in
>> > > zookeeper
>> > > > >> and
>> > > > >> > > use it
>> > > > >> > > > > > to determine whether a replica is created.
>> > > > >> > > > > >
>> > > > >> > > > > > That is all the logic we need to add in Kafka. I
>> > personally
>> > > > feel
>> > > > >> > > this is
>> > > > >> > > > > > easy to reason about.
>> > > > >> > > > > >
>> > > > >> > > > > > 2) The additional code is not much.
>> > > > >> > > > > >
>> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 lines
>> new
>> > > > code.
>> > > > >> > > Previously
>> > > > >> > > > > > I have implemented a prototype of a slightly different
>> > > design
>> > > > >> (see
>> > > > >> > > here
>> > > > >> > > > > > <https://docs.google.com/docum
>> ent/d/1Izza0SBmZMVUBUt9s_
>> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>)
>> > > > >> > > > > > and uploaded it to github (see here
>> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD>). The
>> > patch
>> > > > >> changed
>> > > > >> > > 33
>> > > > >> > > > > > files, added 1185 lines and deleted 183 lines. The
>> size of
>> > > > >> prototype
>> > > > >> > > patch
>> > > > >> > > > > > is actually smaller than patch of KIP-107 (see here
>> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>) which is
>> > > already
>> > > > >> > > accepted.
>> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349 lines
>> and
>> > > > >> deleted 141
>> > > > >> > > lines.
>> > > > >> > > > > >
>> > > > >> > > > > > 3) Comparison with one-broker-per-multiple-volumes
>> > > > >> > > > > >
>> > > > >> > > > > > This KIP can improve the availability of Kafka in this
>> > case
>> > > > such
>> > > > >> > > that one
>> > > > >> > > > > > failed volume doesn't bring down the entire broker.
>> > > > >> > > > > >
>> > > > >> > > > > > 4) Comparison with one-broker-per-volume
>> > > > >> > > > > >
>> > > > >> > > > > > If each volume maps to multiple disks, then we still
>> have
>> > > > >> similar
>> > > > >> > > problem
>> > > > >> > > > > > such that the broker will fail if any disk of the
>> volume
>> > > > failed.
>> > > > >> > > > > >
>> > > > >> > > > > > If each volume maps to one disk, it means that we need
>> to
>> > > > >> deploy 10
>> > > > >> > > > > > brokers on a machine if the machine has 10 disks. I
>> will
>> > > > >> explain the
>> > > > >> > > > > > concern with this approach in order of their
>> importance.
>> > > > >> > > > > >
>> > > > >> > > > > > - It is weird if we were to tell kafka user to deploy
>> 50
>> > > > >> brokers on a
>> > > > >> > > > > > machine of 50 disks.
>> > > > >> > > > > >
>> > > > >> > > > > > - Either when user deploys Kafka on a commercial cloud
>> > > > platform
>> > > > >> or
>> > > > >> > > when
>> > > > >> > > > > > user deploys their own cluster, the size or largest
>> disk
>> > is
>> > > > >> usually
>> > > > >> > > > > > limited. There will be scenarios where user want to
>> > increase
>> > > > >> broker
>> > > > >> > > > > > capacity by having multiple disks per broker. This JBOD
>> > KIP
>> > > > >> makes it
>> > > > >> > > > > > feasible without hurting availability due to single
>> disk
>> > > > >> failure.
>> > > > >> > > > > >
>> > > > >> > > > > > - Automatic load rebalance across disks will be easier
>> and
>> > > > more
>> > > > >> > > flexible
>> > > > >> > > > > > if one broker has multiple disks. This can be future
>> work.
>> > > > >> > > > > >
>> > > > >> > > > > > - There is performance concern when you deploy 10
>> broker
>> > > vs. 1
>> > > > >> > > broker on
>> > > > >> > > > > > one machine. The metadata the cluster, including
>> > > FetchRequest,
>> > > > >> > > > > > ProduceResponse, MetadataRequest and so on will all be
>> 10X
>> > > > >> more. The
>> > > > >> > > > > > packet-per-second will be 10X higher which may limit
>> > > > >> performance if
>> > > > >> > > pps is
>> > > > >> > > > > > the performance bottleneck. The number of socket on the
>> > > > machine
>> > > > >> is
>> > > > >> > > 10X
>> > > > >> > > > > > higher. And the number of replication thread will be
>> 100X
>> > > > more.
>> > > > >> The
>> > > > >> > > impact
>> > > > >> > > > > > will be more significant with increasing number of
>> disks
>> > per
>> > > > >> > > machine. Thus
>> > > > >> > > > > > it will limit Kakfa's scalability in the long term.
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks,
>> > > > >> > > > > > Dong
>> > > > >> > > > > >
>> > > > >> > > > > >
>> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska <
>> > > > >> eno.there...@gmail.com
>> > > > >> > > >
>> > > > >> > > > > > wrote:
>> > > > >> > > > > >
>> > > > >> > > > > >> Hi Dong,
>> > > > >> > > > > >>
>> > > > >> > > > > >> To simplify the discussion today, on my part I'll zoom
>> > into
>> > > > one
>> > > > >> > > thing
>> > > > >> > > > > >> only:
>> > > > >> > > > > >>
>> > > > >> > > > > >> - I'll discuss the options called below :
>> > > > >> "one-broker-per-disk" or
>> > > > >> > > > > >> "one-broker-per-few-disks".
>> > > > >> > > > > >>
>> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments so
>> there is
>> > > no
>> > > > >> need to
>> > > > >> > > > > >> discuss that part for me. I buy it that JBODs are
>> good.
>> > > > >> > > > > >>
>> > > > >> > > > > >> I find the terminology can be improved a bit. Ideally
>> > we'd
>> > > be
>> > > > >> > > talking
>> > > > >> > > > > >> about volumes, not disks. Just to make it clear that
>> > Kafka
>> > > > >> > > understand
>> > > > >> > > > > >> volumes/directories, not individual raw disks. So by
>> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is that the
>> admin
>> > > can
>> > > > >> pool a
>> > > > >> > > few
>> > > > >> > > > > >> disks together to create a volume/directory and give
>> that
>> > > to
>> > > > >> Kafka.
>> > > > >> > > > > >>
>> > > > >> > > > > >>
>> > > > >> > > > > >> The kernel of my question will be that the admin
>> already
>> > > has
>> > > > >> tools
>> > > > >> > > to 1)
>> > > > >> > > > > >> create volumes/directories from a JBOD and 2) start a
>> > > broker
>> > > > >> on a
>> > > > >> > > desired
>> > > > >> > > > > >> machine and 3) assign a broker resources like a
>> > directory.
>> > > I
>> > > > >> claim
>> > > > >> > > that
>> > > > >> > > > > >> those tools are sufficient to optimise resource
>> > allocation.
>> > > > I
>> > > > >> > > understand
>> > > > >> > > > > >> that a broker could manage point 3) itself, ie juggle
>> the
>> > > > >> > > directories. My
>> > > > >> > > > > >> question is whether the complexity added to Kafka is
>> > > > justified.
>> > > > >> > > > > >> Operationally it seems to me an admin will still have
>> to
>> > do
>> > > > >> all the
>> > > > >> > > three
>> > > > >> > > > > >> items above.
>> > > > >> > > > > >>
>> > > > >> > > > > >> Looking forward to the discussion
>> > > > >> > > > > >> Thanks
>> > > > >> > > > > >> Eno
>> > > > >> > > > > >>
>> > > > >> > > > > >>
>> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin <
>> lindon...@gmail.com
>> > >
>> > > > >> wrote:
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > Hey Eno,
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > Thanks much for the review.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > I think your suggestion is to split disks of a
>> machine
>> > > into
>> > > > >> > > multiple
>> > > > >> > > > > >> disk
>> > > > >> > > > > >> > sets and run one broker per disk set. Yeah this is
>> > > similar
>> > > > to
>> > > > >> > > Colin's
>> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we have
>> > > evaluated
>> > > > at
>> > > > >> > > LinkedIn
>> > > > >> > > > > >> and
>> > > > >> > > > > >> > considered it to be a good short term approach.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > As of now I don't think any of these approach is a
>> > better
>> > > > >> > > alternative in
>> > > > >> > > > > >> > the long term. I will summarize these here. I have
>> put
>> > > > these
>> > > > >> > > reasons in
>> > > > >> > > > > >> the
>> > > > >> > > > > >> > KIP's motivation section and rejected alternative
>> > > section.
>> > > > I
>> > > > >> am
>> > > > >> > > happy to
>> > > > >> > > > > >> > discuss more and I would certainly like to use an
>> > > > alternative
>> > > > >> > > solution
>> > > > >> > > > > >> that
>> > > > >> > > > > >> > is easier to do with better performance.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from RAID-10 with
>> > > > >> > > > > >> replication-factoer=2 to
>> > > > >> > > > > >> > JBOD with replicatio-factor=3, we get 25% reduction
>> in
>> > > disk
>> > > > >> usage
>> > > > >> > > and
>> > > > >> > > > > >> > doubles the tolerance of broker failure before data
>> > > > >> > > unavailability from
>> > > > >> > > > > >> 1
>> > > > >> > > > > >> > to 2. This is pretty huge gain for any company that
>> > uses
>> > > > >> Kafka at
>> > > > >> > > large
>> > > > >> > > > > >> > scale.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit of
>> > > > >> > > one-broker-per-disk is
>> > > > >> > > > > >> that
>> > > > >> > > > > >> > no major code change is needed in Kafka. Among the
>> > > > >> disadvantage of
>> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP and
>> previous
>> > > > email
>> > > > >> with
>> > > > >> > > Colin,
>> > > > >> > > > > >> > the biggest one is the 15% throughput loss compared
>> to
>> > > JBOD
>> > > > >> and
>> > > > >> > > less
>> > > > >> > > > > >> > flexibility to balance across disks. Further, it
>> > probably
>> > > > >> requires
>> > > > >> > > > > >> change
>> > > > >> > > > > >> > to internal deployment tools at various companies to
>> > deal
>> > > > >> with
>> > > > >> > > > > >> > one-broker-per-disk setup.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that used at
>> > > > Microsoft.
>> > > > >> The
>> > > > >> > > > > >> problem is
>> > > > >> > > > > >> > that a broker becomes unavailable if any disk fail.
>> > > Suppose
>> > > > >> > > > > >> > replication-factor=2 and there are 10 disks per
>> > machine.
>> > > > >> Then the
>> > > > >> > > > > >> > probability of of any message becomes unavailable
>> due
>> > to
>> > > > disk
>> > > > >> > > failure
>> > > > >> > > > > >> with
>> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks:
>> > > > one-broker-per-few-disk
>> > > > >> is
>> > > > >> > > > > >> somewhere
>> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So it
>> carries
>> > an
>> > > > >> averaged
>> > > > >> > > > > >> > disadvantages of these two approaches.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > To answer your question regarding, I think it is
>> > > reasonable
>> > > > >> to
>> > > > >> > > mange
>> > > > >> > > > > >> disk
>> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the
>> management of
>> > > > >> > > assignment of
>> > > > >> > > > > >> > replicas across disks. Here are my reasons in more
>> > > detail:
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - I don't think this KIP is a big step change. By
>> > > allowing
>> > > > >> user to
>> > > > >> > > > > >> > configure Kafka to run multiple log directories or
>> > disks
>> > > as
>> > > > >> of
>> > > > >> > > now, it
>> > > > >> > > > > >> is
>> > > > >> > > > > >> > implicit that Kafka manages disks. It is just not a
>> > > > complete
>> > > > >> > > feature.
>> > > > >> > > > > >> > Microsoft and probably other companies are using
>> this
>> > > > feature
>> > > > >> > > under the
>> > > > >> > > > > >> > undesirable effect that a broker will fail any if
>> any
>> > > disk
>> > > > >> fail.
>> > > > >> > > It is
>> > > > >> > > > > >> good
>> > > > >> > > > > >> > to complete this feature.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - I think it is reasonable to manage disk in Kafka.
>> One
>> > > of
>> > > > >> the
>> > > > >> > > most
>> > > > >> > > > > >> > important work that Kafka is doing is to determine
>> the
>> > > > >> replica
>> > > > >> > > > > >> assignment
>> > > > >> > > > > >> > across brokers and make sure enough copies of a
>> given
>> > > > >> replica is
>> > > > >> > > > > >> available.
>> > > > >> > > > > >> > I would argue that it is not much different than
>> > > > determining
>> > > > >> the
>> > > > >> > > replica
>> > > > >> > > > > >> > assignment across disk conceptually.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > - I would agree that this KIP is improve
>> performance of
>> > > > >> Kafka at
>> > > > >> > > the
>> > > > >> > > > > >> cost
>> > > > >> > > > > >> > of more complexity inside Kafka, by switching from
>> > > RAID-10
>> > > > to
>> > > > >> > > JBOD. I
>> > > > >> > > > > >> would
>> > > > >> > > > > >> > argue that this is a right direction. If we can gain
>> > 20%+
>> > > > >> > > performance by
>> > > > >> > > > > >> > managing NIC in Kafka as compared to existing
>> approach
>> > > and
>> > > > >> other
>> > > > >> > > > > >> > alternatives, I would say we should just do it.
>> Such a
>> > > gain
>> > > > >> in
>> > > > >> > > > > >> performance,
>> > > > >> > > > > >> > or equivalently reduction in cost, can save
>> millions of
>> > > > >> dollars
>> > > > >> > > per year
>> > > > >> > > > > >> > for any company running Kafka at large scale.
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > Thanks,
>> > > > >> > > > > >> > Dong
>> > > > >> > > > > >> >
>> > > > >> > > > > >> >
>> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <
>> > > > >> > > eno.there...@gmail.com>
>> > > > >> > > > > >> wrote:
>> > > > >> > > > > >> >
>> > > > >> > > > > >> >> I'm coming somewhat late to the discussion,
>> apologies
>> > > for
>> > > > >> that.
>> > > > >> > > > > >> >>
>> > > > >> > > > > >> >> I'm worried about this proposal. It's moving Kafka
>> to
>> > a
>> > > > >> world
>> > > > >> > > where it
>> > > > >> > > > > >> >> manages disks. So in a sense, the scope of the KIP
>> is
>> > > > >> limited,
>> > > > >> > > but the
>> > > > >> > > > > >> >> direction it sets for Kafka is quite a big step
>> > change.
>> > > > >> > > Fundamentally
>> > > > >> > > > > >> this
>> > > > >> > > > > >> >> is about balancing resources for a Kafka broker.
>> This
>> > > can
>> > > > be
>> > > > >> > > done by a
>> > > > >> > > > > >> >> tool, rather than by changing Kafka. E.g., the tool
>> > > would
>> > > > >> take a
>> > > > >> > > bunch
>> > > > >> > > > > >> of
>> > > > >> > > > > >> >> disks together, create a volume over them and
>> export
>> > > that
>> > > > >> to a
>> > > > >> > > Kafka
>> > > > >> > > > > >> broker
>> > > > >> > > > > >> >> (in addition to setting the memory limits for that
>> > > broker
>> > > > or
>> > > > >> > > limiting
>> > > > >> > > > > >> other
>> > > > >> > > > > >> >> resources). A different bunch of disks can then
>> make
>> > up
>> > > a
>> > > > >> second
>> > > > >> > > > > >> volume,
>> > > > >> > > > > >> >> and be used by another Kafka broker. This is
>> aligned
>> > > with
>> > > > >> what
>> > > > >> > > Colin is
>> > > > >> > > > > >> >> saying (as I understand it).
>> > > > >> > > > > >> >>
>> > > > >> > > > > >> >> Disks are not the only resource on a machine, there
>> > are
>> > > > >> several
>> > > > >> > > > > >> instances
>> > > > >> > > > > >> >> where multiple NICs are used for example. Do we
>> want
>> > > fine
>> > > > >> grained
>> > > > >> > > > > >> >> management of all these resources? I'd argue that
>> > opens
>> > > us
>> > > > >> the
>> > > > >> > > system
>> > > > >> > > > > >> to a
>> > > > >> > > > > >> >> lot of complexity.
>> > > > >> > > > > >> >>
>> > > > >> > > > > >> >> Thanks
>> > > > >> > > > > >> >> Eno
>> > > > >> > > > > >> >>
>> > > > >> > > > > >> >>
>> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin <
>> > lindon...@gmail.com
>> > > >
>> > > > >> wrote:
>> > > > >> > > > > >> >>>
>> > > > >> > > > > >> >>> Hi all,
>> > > > >> > > > > >> >>>
>> > > > >> > > > > >> >>> I am going to initiate the vote If there is no
>> > further
>> > > > >> concern
>> > > > >> > > with
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >> KIP.
>> > > > >> > > > > >> >>>
>> > > > >> > > > > >> >>> Thanks,
>> > > > >> > > > > >> >>> Dong
>> > > > >> > > > > >> >>>
>> > > > >> > > > > >> >>>
>> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai <
>> > > > >> > > radai.rosenbl...@gmail.com>
>> > > > >> > > > > >> >> wrote:
>> > > > >> > > > > >> >>>
>> > > > >> > > > > >> >>>> a few extra points:
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>> 1. broker per disk might also incur more client
>> <-->
>> > > > >> broker
>> > > > >> > > sockets:
>> > > > >> > > > > >> >>>> suppose every producer / consumer "talks" to >1
>> > > > partition,
>> > > > >> > > there's a
>> > > > >> > > > > >> >> very
>> > > > >> > > > > >> >>>> good chance that partitions that were co-located
>> on
>> > a
>> > > > >> single
>> > > > >> > > 10-disk
>> > > > >> > > > > >> >> broker
>> > > > >> > > > > >> >>>> would now be split between several single-disk
>> > broker
>> > > > >> > > processes on
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >> same
>> > > > >> > > > > >> >>>> machine. hard to put a multiplier on this, but
>> > likely
>> > > > >x1.
>> > > > >> > > sockets
>> > > > >> > > > > >> are a
>> > > > >> > > > > >> >>>> limited resource at the OS level and incur some
>> > memory
>> > > > >> cost
>> > > > >> > > (kernel
>> > > > >> > > > > >> >>>> buffers)
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>> 2. there's a memory overhead to spinning up a JVM
>> > > > >> (compiled
>> > > > >> > > code and
>> > > > >> > > > > >> >> byte
>> > > > >> > > > > >> >>>> code objects etc). if we assume this overhead is
>> > ~300
>> > > MB
>> > > > >> > > (order of
>> > > > >> > > > > >> >>>> magnitude, specifics vary) than spinning up 10
>> JVMs
>> > > > would
>> > > > >> lose
>> > > > >> > > you 3
>> > > > >> > > > > >> GB
>> > > > >> > > > > >> >> of
>> > > > >> > > > > >> >>>> RAM. not a ton, but non negligible.
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>> 3. there would also be some overhead downstream
>> of
>> > > kafka
>> > > > >> in any
>> > > > >> > > > > >> >> management
>> > > > >> > > > > >> >>>> / monitoring / log aggregation system. likely
>> less
>> > > than
>> > > > >> x10
>> > > > >> > > though.
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>> 4. (related to above) - added complexity of
>> > > > administration
>> > > > >> > > with more
>> > > > >> > > > > >> >>>> running instances.
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>> is anyone running kafka with anywhere near 100GB
>> > > heaps?
>> > > > i
>> > > > >> > > thought the
>> > > > >> > > > > >> >> point
>> > > > >> > > > > >> >>>> was to rely on kernel page cache to do the disk
>> > > > buffering
>> > > > >> ....
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <
>> > > > >> > > lindon...@gmail.com>
>> > > > >> > > > > >> wrote:
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>>>> Hey Colin,
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> Thanks much for the comment. Please see me
>> comment
>> > > > >> inline.
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <
>> > > > >> > > cmcc...@apache.org>
>> > > > >> > > > > >> >>>> wrote:
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>> > > > >> > > > > >> >>>>>>> Hey Colin,
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> Good point! Yeah we have actually considered
>> and
>> > > > >> tested this
>> > > > >> > > > > >> >>>> solution,
>> > > > >> > > > > >> >>>>>>> which we call one-broker-per-disk. It would
>> work
>> > > and
>> > > > >> should
>> > > > >> > > > > >> require
>> > > > >> > > > > >> >>>> no
>> > > > >> > > > > >> >>>>>>> major change in Kafka as compared to this JBOD
>> > KIP.
>> > > > So
>> > > > >> it
>> > > > >> > > would
>> > > > >> > > > > >> be a
>> > > > >> > > > > >> >>>>> good
>> > > > >> > > > > >> >>>>>>> short term solution.
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> But it has a few drawbacks which makes it less
>> > > > >> desirable in
>> > > > >> > > the
>> > > > >> > > > > >> long
>> > > > >> > > > > >> >>>>>>> term.
>> > > > >> > > > > >> >>>>>>> Assume we have 10 disks on a machine. Here are
>> > the
>> > > > >> problems:
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> Hi Dong,
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply.
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> 1) Our stress test result shows that
>> > > > >> one-broker-per-disk
>> > > > >> > > has 15%
>> > > > >> > > > > >> >>>> lower
>> > > > >> > > > > >> >>>>>>> throughput
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> 2) Controller would need to send 10X as many
>> > > > >> > > LeaderAndIsrRequest,
>> > > > >> > > > > >> >>>>>>> MetadataUpdateRequest and StopReplicaRequest.
>> > This
>> > > > >> > > increases the
>> > > > >> > > > > >> >>>> burden
>> > > > >> > > > > >> >>>>>>> on
>> > > > >> > > > > >> >>>>>>> controller which can be the performance
>> > bottleneck.
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something, but there
>> > > would
>> > > > >> not be
>> > > > >> > > 10x as
>> > > > >> > > > > >> >>>> many
>> > > > >> > > > > >> >>>>>> StopReplicaRequest RPCs, would there?  The
>> other
>> > > > >> requests
>> > > > >> > > would
>> > > > >> > > > > >> >>>> increase
>> > > > >> > > > > >> >>>>>> 10x, but from a pretty low base, right?  We are
>> > not
>> > > > >> > > reassigning
>> > > > >> > > > > >> >>>>>> partitions all the time, I hope (or else we
>> have
>> > > > bigger
>> > > > >> > > > > >> problems...)
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> I think the controller will group
>> > StopReplicaRequest
>> > > > per
>> > > > >> > > broker and
>> > > > >> > > > > >> >> send
>> > > > >> > > > > >> >>>>> only one StopReplicaRequest to a broker during
>> > > > controlled
>> > > > >> > > shutdown.
>> > > > >> > > > > >> >>>> Anyway,
>> > > > >> > > > > >> >>>>> we don't have to worry about this if we agree
>> that
>> > > > other
>> > > > >> > > requests
>> > > > >> > > > > >> will
>> > > > >> > > > > >> >>>>> increase by 10X. One MetadataRequest to send to
>> > each
>> > > > >> broker
>> > > > >> > > in the
>> > > > >> > > > > >> >>>> cluster
>> > > > >> > > > > >> >>>>> every time there is leadership change. I am not
>> > sure
>> > > > >> this is
>> > > > >> > > a real
>> > > > >> > > > > >> >>>>> problem. But in theory this makes the overhead
>> > > > complexity
>> > > > >> > > O(number
>> > > > >> > > > > >> of
>> > > > >> > > > > >> >>>>> broker) and may be a concern in the future.
>> Ideally
>> > > we
>> > > > >> should
>> > > > >> > > avoid
>> > > > >> > > > > >> it.
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical resource on
>> the
>> > > > >> machine.
>> > > > >> > > The
>> > > > >> > > > > >> number
>> > > > >> > > > > >> >>>>> of
>> > > > >> > > > > >> >>>>>>> socket on each machine will increase by 10X.
>> The
>> > > > >> number of
>> > > > >> > > > > >> connection
>> > > > >> > > > > >> >>>>>>> between any two machine will increase by 100X.
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> 4) Less efficient way to management memory and
>> > > quota.
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> 5) Rebalance between disks/brokers on the same
>> > > > machine
>> > > > >> will
>> > > > >> > > less
>> > > > >> > > > > >> >>>>>>> efficient
>> > > > >> > > > > >> >>>>>>> and less flexible. Broker has to read data
>> from
>> > > > another
>> > > > >> > > broker on
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >>>>>>> same
>> > > > >> > > > > >> >>>>>>> machine via socket. It is also harder to do
>> > > automatic
>> > > > >> load
>> > > > >> > > balance
>> > > > >> > > > > >> >>>>>>> between
>> > > > >> > > > > >> >>>>>>> disks on the same machine in the future.
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> I will put this and the explanation in the
>> > rejected
>> > > > >> > > alternative
>> > > > >> > > > > >> >>>>> section.
>> > > > >> > > > > >> >>>>>>> I
>> > > > >> > > > > >> >>>>>>> have a few questions:
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> - Can you explain why this solution can help
>> > avoid
>> > > > >> > > scalability
>> > > > >> > > > > >> >>>>>>> bottleneck?
>> > > > >> > > > > >> >>>>>>> I actually think it will exacerbate the
>> > scalability
>> > > > >> problem
>> > > > >> > > due
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >>>> 2)
>> > > > >> > > > > >> >>>>>>> above.
>> > > > >> > > > > >> >>>>>>> - Why can we push more RPC with this solution?
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> To really answer this question we'd have to
>> take a
>> > > > deep
>> > > > >> dive
>> > > > >> > > into
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >>>>>> locking of the broker and figure out how
>> > effectively
>> > > > it
>> > > > >> can
>> > > > >> > > > > >> >> parallelize
>> > > > >> > > > > >> >>>>>> truly independent requests.  Almost every
>> > > > multithreaded
>> > > > >> > > process is
>> > > > >> > > > > >> >>>> going
>> > > > >> > > > > >> >>>>>> to have shared state, like shared queues or
>> shared
>> > > > >> sockets,
>> > > > >> > > that is
>> > > > >> > > > > >> >>>>>> going to make scaling less than linear when you
>> > add
>> > > > >> disks or
>> > > > >> > > > > >> >>>> processors.
>> > > > >> > > > > >> >>>>>> (And clearly, another option is to improve that
>> > > > >> scalability,
>> > > > >> > > rather
>> > > > >> > > > > >> >>>>>> than going multi-process!)
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> Yeah I also think it is better to improve
>> > scalability
>> > > > >> inside
>> > > > >> > > kafka
>> > > > >> > > > > >> code
>> > > > >> > > > > >> >>>> if
>> > > > >> > > > > >> >>>>> possible. I am not sure we currently have any
>> > > > scalability
>> > > > >> > > issue
>> > > > >> > > > > >> inside
>> > > > >> > > > > >> >>>>> Kafka that can not be removed without using
>> > > > >> multi-process.
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>>> - It is true that a garbage collection in one
>> > > broker
>> > > > >> would
>> > > > >> > > not
>> > > > >> > > > > >> affect
>> > > > >> > > > > >> >>>>>>> others. But that is after every broker only
>> uses
>> > > 1/10
>> > > > >> of the
>> > > > >> > > > > >> memory.
>> > > > >> > > > > >> >>>>> Can
>> > > > >> > > > > >> >>>>>>> we be sure that this will actually help
>> > > performance?
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> The big question is, how much memory do Kafka
>> > > brokers
>> > > > >> use
>> > > > >> > > now, and
>> > > > >> > > > > >> how
>> > > > >> > > > > >> >>>>>> much will they use in the future?  Our
>> experience
>> > in
>> > > > >> HDFS
>> > > > >> > > was that
>> > > > >> > > > > >> >> once
>> > > > >> > > > > >> >>>>>> you start getting more than 100-200GB Java heap
>> > > sizes,
>> > > > >> full
>> > > > >> > > GCs
>> > > > >> > > > > >> start
>> > > > >> > > > > >> >>>>>> taking minutes to finish when using the
>> standard
>> > > JVMs.
>> > > > >> That
>> > > > >> > > alone
>> > > > >> > > > > >> is
>> > > > >> > > > > >> >> a
>> > > > >> > > > > >> >>>>>> good reason to go multi-process or consider
>> > storing
>> > > > more
>> > > > >> > > things off
>> > > > >> > > > > >> >> the
>> > > > >> > > > > >> >>>>>> Java heap.
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> I see. Now I agree one-broker-per-disk should be
>> > more
>> > > > >> > > efficient in
>> > > > >> > > > > >> >> terms
>> > > > >> > > > > >> >>>> of
>> > > > >> > > > > >> >>>>> GC since each broker probably needs less than
>> 1/10
>> > of
>> > > > the
>> > > > >> > > memory
>> > > > >> > > > > >> >>>> available
>> > > > >> > > > > >> >>>>> on a typical machine nowadays. I will remove
>> this
>> > > from
>> > > > >> the
>> > > > >> > > reason of
>> > > > >> > > > > >> >>>>> rejection.
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case.  The "hard"
>> case,
>> > > > >> which is
>> > > > >> > > > > >> >>>>>> unfortunately also the much more common case,
>> is
>> > > disk
>> > > > >> > > misbehavior.
>> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks tend to
>> > start
>> > > > >> slowing
>> > > > >> > > down
>> > > > >> > > > > >> >>>>>> unpredictably.  Requests that would have
>> completed
>> > > > >> > > immediately
>> > > > >> > > > > >> before
>> > > > >> > > > > >> >>>>>> start taking 20, 100 500 milliseconds.  Some
>> files
>> > > may
>> > > > >> be
>> > > > >> > > readable
>> > > > >> > > > > >> and
>> > > > >> > > > > >> >>>>>> other files may not be.  System calls hang,
>> > > sometimes
>> > > > >> > > forever, and
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >>>>>> Java process can't abort them, because the
>> hang is
>> > > in
>> > > > >> the
>> > > > >> > > kernel.
>> > > > >> > > > > >> It
>> > > > >> > > > > >> >>>> is
>> > > > >> > > > > >> >>>>>> not fun when threads are stuck in "D state"
>> > > > >> > > > > >> >>>>>> http://stackoverflow.com/quest
>> > > > >> ions/20423521/process-perminan
>> > > > >> > > > > >> >>>>>> tly-stuck-on-d-state
>> > > > >> > > > > >> >>>>>> .  Even kill -9 cannot abort the thread then.
>> > > > >> Fortunately,
>> > > > >> > > this is
>> > > > >> > > > > >> >>>>>> rare.
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> I agree it is a harder problem and it is rare.
>> We
>> > > > >> probably
>> > > > >> > > don't
>> > > > >> > > > > >> have
>> > > > >> > > > > >> >> to
>> > > > >> > > > > >> >>>>> worry about it in this KIP since this issue is
>> > > > >> orthogonal to
>> > > > >> > > > > >> whether or
>> > > > >> > > > > >> >>>> not
>> > > > >> > > > > >> >>>>> we use JBOD.
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>> Another approach we should consider is for
>> Kafka
>> > to
>> > > > >> > > implement its
>> > > > >> > > > > >> own
>> > > > >> > > > > >> >>>>>> storage layer that would stripe across multiple
>> > > disks.
>> > > > >> This
>> > > > >> > > > > >> wouldn't
>> > > > >> > > > > >> >>>>>> have to be done at the block level, but could
>> be
>> > > done
>> > > > >> at the
>> > > > >> > > file
>> > > > >> > > > > >> >>>> level.
>> > > > >> > > > > >> >>>>>> We could use consistent hashing to determine
>> which
>> > > > >> disks a
>> > > > >> > > file
>> > > > >> > > > > >> should
>> > > > >> > > > > >> >>>>>> end up on, for example.
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>> Are you suggesting that we should distribute
>> log,
>> > or
>> > > > log
>> > > > >> > > segment,
>> > > > >> > > > > >> >> across
>> > > > >> > > > > >> >>>>> disks of brokers? I am not sure if I fully
>> > understand
>> > > > >> this
>> > > > >> > > > > >> approach. My
>> > > > >> > > > > >> >>>> gut
>> > > > >> > > > > >> >>>>> feel is that this would be a drastic solution
>> that
>> > > > would
>> > > > >> > > require
>> > > > >> > > > > >> >>>>> non-trivial design. While this may be useful to
>> > > Kafka,
>> > > > I
>> > > > >> would
>> > > > >> > > > > >> prefer
>> > > > >> > > > > >> >> not
>> > > > >> > > > > >> >>>>> to discuss this in detail in this thread unless
>> you
>> > > > >> believe
>> > > > >> > > it is
>> > > > >> > > > > >> >>>> strictly
>> > > > >> > > > > >> >>>>> superior to the design in this KIP in terms of
>> > > solving
>> > > > >> our
>> > > > >> > > use-case.
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>>> best,
>> > > > >> > > > > >> >>>>>> Colin
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> Thanks,
>> > > > >> > > > > >> >>>>>>> Dong
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin
>> McCabe <
>> > > > >> > > > > >> cmcc...@apache.org>
>> > > > >> > > > > >> >>>>>>> wrote:
>> > > > >> > > > > >> >>>>>>>
>> > > > >> > > > > >> >>>>>>>> Hi Dong,
>> > > > >> > > > > >> >>>>>>>>
>> > > > >> > > > > >> >>>>>>>> Thanks for the writeup!  It's very
>> interesting.
>> > > > >> > > > > >> >>>>>>>>
>> > > > >> > > > > >> >>>>>>>> I apologize in advance if this has been
>> > discussed
>> > > > >> > > somewhere else.
>> > > > >> > > > > >> >>>>> But
>> > > > >> > > > > >> >>>>>> I
>> > > > >> > > > > >> >>>>>>>> am curious if you have considered the
>> solution
>> > of
>> > > > >> running
>> > > > >> > > > > >> multiple
>> > > > >> > > > > >> >>>>>>>> brokers per node.  Clearly there is a memory
>> > > > overhead
>> > > > >> with
>> > > > >> > > this
>> > > > >> > > > > >> >>>>>> solution
>> > > > >> > > > > >> >>>>>>>> because of the fixed cost of starting
>> multiple
>> > > JVMs.
>> > > > >> > > However,
>> > > > >> > > > > >> >>>>> running
>> > > > >> > > > > >> >>>>>>>> multiple JVMs would help avoid scalability
>> > > > >> bottlenecks.
>> > > > >> > > You
>> > > > >> > > > > >> could
>> > > > >> > > > > >> >>>>>>>> probably push more RPCs per second, for
>> example.
>> > > A
>> > > > >> garbage
>> > > > >> > > > > >> >>>>> collection
>> > > > >> > > > > >> >>>>>>>> in one broker would not affect the others.
>> It
>> > > would
>> > > > >> be
>> > > > >> > > > > >> interesting
>> > > > >> > > > > >> >>>>> to
>> > > > >> > > > > >> >>>>>>>> see this considered in the "alternate
>> designs"
>> > > > design,
>> > > > >> > > even if
>> > > > >> > > > > >> you
>> > > > >> > > > > >> >>>>> end
>> > > > >> > > > > >> >>>>>>>> up deciding it's not the way to go.
>> > > > >> > > > > >> >>>>>>>>
>> > > > >> > > > > >> >>>>>>>> best,
>> > > > >> > > > > >> >>>>>>>> Colin
>> > > > >> > > > > >> >>>>>>>>
>> > > > >> > > > > >> >>>>>>>>
>> > > > >> > > > > >> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin
>> wrote:
>> > > > >> > > > > >> >>>>>>>>> Hi all,
>> > > > >> > > > > >> >>>>>>>>>
>> > > > >> > > > > >> >>>>>>>>> We created KIP-112: Handle disk failure for
>> > JBOD.
>> > > > >> Please
>> > > > >> > > find
>> > > > >> > > > > >> the
>> > > > >> > > > > >> >>>>> KIP
>> > > > >> > > > > >> >>>>>>>>> wiki
>> > > > >> > > > > >> >>>>>>>>> in the link https://cwiki.apache.org/confl
>> > > > >> > > > > >> >>>> uence/display/KAFKA/KIP-
>> > > > >> > > > > >> >>>>>>>>> 112%3A+Handle+disk+failure+for+JBOD.
>> > > > >> > > > > >> >>>>>>>>>
>> > > > >> > > > > >> >>>>>>>>> This KIP is related to KIP-113
>> > > > >> > > > > >> >>>>>>>>> <https://cwiki.apache.org/conf
>> > > > >> luence/display/KAFKA/KIP-
>> > > > >> > > > > >> >>>>>>>> 113%3A+Support+replicas+moveme
>> > > > >> nt+between+log+directories>:
>> > > > >> > > > > >> >>>>>>>>> Support replicas movement between log
>> > > directories.
>> > > > >> They
>> > > > >> > > are
>> > > > >> > > > > >> >>>> needed
>> > > > >> > > > > >> >>>>> in
>> > > > >> > > > > >> >>>>>>>>> order
>> > > > >> > > > > >> >>>>>>>>> to support JBOD in Kafka. Please help review
>> > the
>> > > > >> KIP. You
>> > > > >> > > > > >> >>>> feedback
>> > > > >> > > > > >> >>>>> is
>> > > > >> > > > > >> >>>>>>>>> appreciated!
>> > > > >> > > > > >> >>>>>>>>>
>> > > > >> > > > > >> >>>>>>>>> Thanks,
>> > > > >> > > > > >> >>>>>>>>> Dong
>> > > > >> > > > > >> >>>>>>>>
>> > > > >> > > > > >> >>>>>>
>> > > > >> > > > > >> >>>>>
>> > > > >> > > > > >> >>>>
>> > > > >> > > > > >> >>
>> > > > >> > > > > >> >>
>> > > > >> > > > > >>
>> > > > >> > > > > >>
>> > > > >> > > > > >
>> > > > >> > >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to