Hi Jun,

Thanks for the reply. These comments are very helpful. Let me answer them
inline.


On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the reply. A few more replies and new comments below.
>
> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Thanks for the detailed comments. Please see answers inline:
> >
> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the updated wiki. A few comments below.
> > >
> > > 1. Topics get created
> > > 1.1 Instead of storing successfully created replicas in ZK, could we
> > store
> > > unsuccessfully created replicas in ZK? Since the latter is less common,
> > it
> > > probably reduces the load on ZK.
> > >
> >
> > We can store unsuccessfully created replicas in ZK. But I am not sure if
> > that can reduce write load on ZK.
> >
> > If we want to reduce write load on ZK by storing unsuccessfully
> created
> > replicas in ZK, then broker should not write to ZK if all replicas are
> > successfully created. It means that if /broker/topics/[topic]/partiti
> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a
> given
> > partition, we have to assume all replicas of this partition have been
> > successfully created and send LeaderAndIsrRequest with create = false.
> This
> > becomes a problem if controller crashes before receiving
> > LeaderAndIsrResponse to validate whether a replica has been created.
> >
> > I think this approach can reduce the number of bytes stored in ZK. But I
> am
> > not sure if this is a concern.
> >
> >
> >
> I was mostly concerned about the controller failover time. Currently, the
> controller failover is likely dominated by the cost of reading
> topic/partition level information from ZK. If we add another partition
> level path in ZK, it probably will double the controller failover time. If
> the approach of representing the non-created replicas doesn't work, have
> you considered just adding the created flag in the leaderAndIsr path in ZK?
>
>
Yes, I have considered adding the created flag in the leaderAndIsr path in
ZK. If we were to add a created flag per replica in the LeaderAndIsrRequest,
it would require a lot of changes in the code base.

If we don't add a created flag per replica in the LeaderAndIsrRequest, then
the information in the leaderAndIsr path in ZK and in LeaderAndIsrRequest
would be different. Further, the procedure for a broker to update ISR in ZK
would be a bit complicated. When the leader updates the leaderAndIsr path in
ZK, it would have to first read the created flags from ZK, change the ISR,
and write leaderAndIsr back to ZK. It would also need to check the znode
version and retry the write if the controller has updated ZK in the
meantime. This is in contrast to the current implementation, where the
leader either gets all the information from the LeaderAndIsrRequest sent by
the controller or determines the information by itself (e.g. ISR) before
writing to the leaderAndIsr path in ZK.
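
To make the extra work on the leader concrete, here is a rough Python sketch
of that read-modify-write loop. FakeZnode, VersionConflict and update_isr are
names I made up for illustration; this is an in-memory stand-in, not the
ZooKeeper client or Kafka code, and the layout of the znode data is assumed.

```python
class VersionConflict(Exception):
    """Raised when the znode version changed since it was read."""


class FakeZnode:
    """In-memory stand-in for one znode (hypothetical, not a ZooKeeper client)."""

    def __init__(self, data):
        self.data = dict(data)
        self.version = 0

    def read(self):
        # Return a copy of the data together with the version it was read at.
        return dict(self.data), self.version

    def conditional_write(self, data, expected_version):
        # Succeed only if nobody has written since expected_version was read.
        if expected_version != self.version:
            raise VersionConflict()
        self.data = dict(data)
        self.version += 1


def update_isr(znode, new_isr, max_retries=5):
    """Leader-side ISR update that keeps the controller-owned 'created' flags."""
    for _ in range(max_retries):
        state, version = znode.read()        # 1. read state, incl. created flags
        state["isr"] = list(new_isr)         # 2. change only the ISR
        try:
            znode.conditional_write(state, version)  # 3. write back if unchanged
            return True
        except VersionConflict:
            continue                         # controller wrote in between; retry
    return False
```

The point of the sketch is only that the leader now needs a read, a version
check and a retry path for every ISR change.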

It seems to me that the above solution is a bit complicated and not clean.
Thus I came up with the design in this KIP, which stores this created flag
in a separate ZK path. The path is named controller_managed_state to indicate
that we can store in this znode all information that is managed by the
controller only, as opposed to ISR.

I agree with your concern of increased ZK read time during controller
failover. How about we store the "created" information in the
znode /brokers/topics/[topic]? We can change that znode to have the
following data format:

{
  "version" : 2,
  "created" : {
    "1" : [1, 2, 3],
    ...
  },
  "partition" : {
    "1" : [1, 2, 3],
    ...
  }
}

We won't need an extra ZK read with this solution. It also seems reasonable
to put the partition assignment information together with the replica
creation information. The latter is only changed once after the partition is
created or re-assigned.
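
As a rough illustration of how the controller could use this format, here is
a small Python sketch. It assumes that "created" maps each partition id to
the brokers on which the replica has already been created, and that any
assigned replica missing from that list still needs create = true. The
function name replicas_needing_create is hypothetical.

```python
import json


def replicas_needing_create(znode_json):
    """Map partition id -> assigned replicas not yet recorded as created."""
    state = json.loads(znode_json)
    assigned = state["partition"]       # key name as in the format above
    created = state.get("created", {})  # a missing key means nothing created yet
    pending = {}
    for partition_id, replicas in assigned.items():
        done = set(created.get(partition_id, []))
        missing = [r for r in replicas if r not in done]
        if missing:
            pending[partition_id] = missing
    return pending


example = json.dumps({
    "version": 2,
    "created": {"1": [1, 2]},
    "partition": {"1": [1, 2, 3], "2": [4, 5]},
})
print(replicas_needing_create(example))
```

Both pieces of information come from a single read of /brokers/topics/[topic].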


>
>
> >
> > > 1.2 If an error is received for a follower, does the controller eagerly
> > > remove it from ISR or do we just let the leader removes it after
> timeout?
> > >
> >
> > No, Controller will not actively remove it from ISR. But controller will
> > recognize it as offline replica and propagate this information to all
> > brokers via UpdateMetadataRequest. Each leader can use this information
> to
> > actively remove offline replica from ISR set. I have updated the wiki to
> > clarify it.
> >
> >
>
> That seems inconsistent with how the controller deals with offline replicas
> due to broker failures. When that happens, the broker will (1) select a new
> leader if the offline replica is the leader; (2) remove the replica from
> ISR if the offline replica is the follower. So, intuitively, it seems that
> we should be doing the same thing when dealing with offline replicas due to
> disk failure.
>

My bad. I misunderstood how the controller currently handles broker failure
and ISR change. Yes, we should do the same thing when dealing with offline
replicas here. I have updated the KIP to specify that, when an offline
replica is discovered by the controller, the controller removes it from the
ISR in ZK and sends LeaderAndIsrRequest with the updated ISR to be used by
partition leaders.
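
A simplified Python sketch of that controller-side rule, for one partition.
The function name is hypothetical, and two details are my own simplifying
assumptions rather than something from the KIP: the new leader is the first
remaining ISR member, and None stands for "no leader available".

```python
def handle_offline_replica(leader, isr, offline_broker):
    """Return (new_leader, new_isr) after one replica goes offline."""
    # Drop the offline replica from the ISR in either case.
    new_isr = [b for b in isr if b != offline_broker]
    if leader != offline_broker:
        # Offline replica is a follower: the leader stays, only the ISR shrinks.
        return leader, new_isr
    # Offline replica is the leader: elect a new one from the remaining ISR.
    new_leader = new_isr[0] if new_isr else None
    return new_leader, new_isr
```

The result would then be written to ZK and sent out in LeaderAndIsrRequest,
the same as for a broker failure.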


>
>
>
> >
> > > 1.3 Similar, if an error is received for a leader, should the
> controller
> > > trigger leader election again?
> > >
> >
> > Yes, controller will trigger leader election if leader replica is
> offline.
> > I have updated the wiki to clarify it.
> >
> >
> > >
> > > 2. A log directory stops working on a broker during runtime:
> > > 2.1 It seems the broker remembers the failed directory after hitting an
> > > IOException and the failed directory won't be used for creating new
> > > partitions until the broker is restarted? If so, could you add that to
> > the
> > > wiki.
> > >
> >
> > Right, broker assumes a log directory to be good after it starts, and
> mark
> > log directory as bad once there is IOException when broker attempts to
> > access the log directory. New replicas will only be created on good log
> > directory. I just added this to the KIP.
> >
> >
> > > 2.2 Could you be a bit more specific on how and during which operation
> > the
> > > broker detects directory failure? Is it when the broker hits an
> > IOException
> > > during writes, or both reads and writes?  For example, during broker
> > > startup, it only reads from each of the log directories, if it hits an
> > > IOException there, does the broker immediately mark the directory as
> > > offline?
> > >
> >
> > Broker marks log directory as bad once there is IOException when broker
> > attempts to access the log directory. This includes read and write. These
> > operations include log append, log read, log cleaning, watermark
> checkpoint
> > etc. If broker hits IOException when it reads from each of the log
> > directory during startup, it immediately mark the directory as offline.
> >
> > I just updated the KIP to clarify it.
> >
> >
> > > 3. Partition reassignment: If we know a replica is offline, do we still
> > > want to send StopReplicaRequest to it?
> > >
> >
> > No, controller doesn't send StopReplicaRequest for an offline replica.
> > Controller treats this scenario in the same way that existing Kafka
> > implementation does when the broker of this replica is offline.
> >
> >
> > >
> > > 4. UpdateMetadataRequestPartitionState: For offline_replicas, do they
> > only
> > > include offline replicas due to log directory failures or do they also
> > > include offline replicas due to broker failure?
> > >
> >
> > UpdateMetadataRequestPartitionState's offline_replicas include offline
> > replicas due to both log directory failure and broker failure. This is to
> > make the semantics of this field easier to understand. Broker can
> > distinguish whether a replica is offline due to broker failure or disk
> > failure by checking whether a broker is live in the
> UpdateMetadataRequest.
> >
> >
> > >
> > > 5. Tools: Could we add some kind of support in the tool to list offline
> > > directories?
> > >
> >
> > In KIP-112 we don't have tools to list offline directories because we
> have
> > intentionally avoided exposing log directory information (e.g. log
> > directory path) to user or other brokers. I think we can add this feature
> > in KIP-113, in which we will have DescribeDirsRequest to list log
> directory
> > information (e.g. partition assignment, path, size) needed for rebalance.
> >
> >
> Since we are introducing a new failure mode, if a replica becomes offline
> due to failure in log directories, the first thing an admin wants to know
> is which log directories are offline from the broker's perspective.  So,
> including such a tool will be useful. Do you plan to do KIP-112 and KIP-113
>  in the same release?
>
>
Yes, I agree that including such a tool is useful. It is probably better
to add it in KIP-113 because we need DescribeDirsRequest to get this
information. I will update KIP-113 to include this tool.

I plan to do KIP-112 and KIP-113 separately to make each KIP and its
patch easier to review. I don't have a plan yet for which release should
include these KIPs. My plan is to do both of them ASAP. Is there a particular
timeline you prefer for the code of these two KIPs to be checked in?


> >
> > >
> > > 6. Metrics: Could we add some metrics to show offline directories?
> > >
> >
> > Sure. I think it makes sense to have each broker report its number of
> > offline replicas and offline log directories. The previous metric was put
> > in KIP-113. I just added both metrics in KIP-112.
> >
> >
> > >
> > > 7. There are still references to kafka-log-dirs.sh. Are they valid?
> > >
> >
> > My bad. I just removed this from "Changes in Operational Procedures" and
> > "Test Plan" in the KIP.
> >
> >
> > >
> > > 8. Do you think KIP-113 is ready for review? One thing that KIP-113
> > > mentions during partition reassignment is to first send
> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It seems it's
> > > better if the replicas are created in the right log directory in the
> > first
> > > place? The reason that I brought it up here is because it may affect
> the
> > > protocol of LeaderAndIsrRequest.
> > >
> >
> > Yes, KIP-113 is ready for review. The advantage of the current design is
> > that we can keep LeaderAndIsrRequest log-directory-agnostic. The
> > implementation would be much easier to read if all log related logic
> (e.g.
> > various errors) are put in ChangeReplicaDirRequest and the code path of
> > handling replica movement is separated from leadership handling.
> >
> > In other words, I think Kafka may be easier to develop in the long term
> if
> > we separate these two requests.
> >
> > I agree that ideally we want to create replicas in the right log
> directory
> > in the first place. But I am not sure if there is any performance or
> > correctness concern with the existing way of moving it after it is
> created.
> > Besides, does this decision affect the change proposed in KIP-112?
> >
> >
> I am just wondering if you have considered including the log directory for
> the replicas in the LeaderAndIsrRequest.
>
>
Yeah I have thought about this idea, but only briefly. I rejected this idea
because log directory is broker's local information and I prefer not to
expose local config information to the cluster through LeaderAndIsrRequest.


> 9. Could you describe when the offline replicas due to log directory
> failure are removed from the replica fetch threads?
>

Yes. If the offline replica is a leader, either a new leader is elected or
all follower brokers stop fetching for this partition. If the offline
replica is a follower, the broker stops fetching for this replica
immediately. A broker stops fetching data for a replica by removing the
replica from the replica fetch threads. I have updated the KIP to clarify
it.


>
> 10. The wiki mentioned changing the log directory to a file for simulating
> disk failure in system tests. Could we just change the permission of the
> log directory to 000 to simulate that?
>


Sure,


>
> Thanks,
>
> Jun
>
>
> > > Jun
> > >
> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Can I replace zookeeper access with direct RPC for both ISR
> > notification
> > > > and disk failure notification in a future KIP, or do you feel we
> should
> > > do
> > > > it in this KIP?
> > > >
> > > > Hi Eno, Grant and everyone,
> > > >
> > > > Is there further improvement you would like to see with this KIP?
> > > >
> > > > Thanks you all for the comments,
> > > >
> > > > Dong
> > > >
> > > >
> > > >
> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > >
> > > > >
> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > > > >
> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote:
> > > > >> > Thanks for all the comments Colin!
> > > > >> >
> > > > >> > To answer your questions:
> > > > >> > - Yes, a broker will shutdown if all its log directories are
> bad.
> > > > >>
> > > > >> That makes sense.  Can you add this to the writeup?
> > > > >>
> > > > >
> > > > > Sure. This has already been added. You can find it here
> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=9&selectedPageVersions=10>.
> > > > >
> > > > >
> > > > >>
> > > > >> > - I updated the KIP to explicitly state that a log directory
> will
> > be
> > > > >> > assumed to be good until broker sees IOException when it tries
> to
> > > > access
> > > > >> > the log directory.
> > > > >>
> > > > >> Thanks.
> > > > >>
> > > > >> > - Controller doesn't explicitly know whether there is new log
> > > > directory
> > > > >> > or
> > > > >> > not. All controller knows is whether replicas are online or
> > offline
> > > > >> based
> > > > >> > on LeaderAndIsrResponse. According to the existing Kafka
> > > > implementation,
> > > > >> > controller will always send LeaderAndIsrRequest to a broker
> after
> > it
> > > > >> > bounces.
> > > > >>
> > > > >> I thought so.  It's good to clarify, though.  Do you think it's
> > worth
> > > > >> adding a quick discussion of this on the wiki?
> > > > >>
> > > > >
> > > > > Personally I don't think it is needed. If broker starts with no bad
> > log
> > > > > directory, everything should work it is and we should not need to
> > > clarify
> > > > > it. The KIP has already covered the scenario when a broker starts
> > with
> > > > bad
> > > > > log directory. Also, the KIP doesn't claim or hint that we support
> > > > dynamic
> > > > > addition of new log directories. I think we are good.
> > > > >
> > > > >
> > > > >> best,
> > > > >> Colin
> > > > >>
> > > > >> >
> > > > >> > Please see this
> > > > >> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=9&selectedPageVersions=10>
> > > > >> > for the change of the KIP.
> > > > >> >
> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe <
> cmcc...@apache.org
> > >
> > > > >> wrote:
> > > > >> >
> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote:
> > > > >> > > > Thanks, Dong L.
> > > > >> > > >
> > > > >> > > > Do we plan on bringing down the broker process when all log
> > > > >> directories
> > > > >> > > > are offline?
> > > > >> > > >
> > > > >> > > > Can you explicitly state on the KIP that the log dirs are
> all
> > > > >> considered
> > > > >> > > > good after the broker process is bounced?  It seems like an
> > > > >> important
> > > > >> > > > thing to be clear about.  Also, perhaps discuss how the
> > > controller
> > > > >> > > > becomes aware of the newly good log directories after a
> broker
> > > > >> bounce
> > > > >> > > > (and whether this triggers re-election).
> > > > >> > >
> > > > >> > > I meant to write, all the log dirs where the broker can still
> > read
> > > > the
> > > > >> > > index and some other files.  Clearly, log dirs that are
> > completely
> > > > >> > > inaccessible will still be considered bad after a broker
> process
> > > > >> bounce.
> > > > >> > >
> > > > >> > > best,
> > > > >> > > Colin
> > > > >> > >
> > > > >> > > >
> > > > >> > > > +1 (non-binding) aside from that
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote:
> > > > >> > > > > Hi all,
> > > > >> > > > >
> > > > >> > > > > Thank you all for the helpful suggestion. I have updated
> the
> > > KIP
> > > > >> to
> > > > >> > > > > address
> > > > >> > > > > the comments received so far. See here
> > > > >> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=8&selectedPageVersions=9>
> > > > >> > > > > to read the changes of the KIP. Here is a summary of change:
> > > > >> > > > >
> > > > >> > > > > - Updated the Proposed Change section to change the
> recovery
> > > > >> steps.
> > > > >> > > After
> > > > >> > > > > this change, broker will also create replica as long as
> all
> > > log
> > > > >> > > > > directories
> > > > >> > > > > are working.
> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since user no
> > longer
> > > > >> needs to
> > > > >> > > > > use
> > > > >> > > > > it for recovery from bad disks.
> > > > >> > > > > - Explained how the znode controller_managed_state is
> > managed
> > > in
> > > > >> the
> > > > >> > > > > Public
> > > > >> > > > > interface section.
> > > > >> > > > > - Explained what happens during controller failover,
> > partition
> > > > >> > > > > reassignment
> > > > >> > > > > and topic deletion in the Proposed Change section.
> > > > >> > > > > - Updated Future Work section to include the following
> > > potential
> > > > >> > > > > improvements
> > > > >> > > > >   - Let broker notify controller of ISR change and disk
> > state
> > > > >> change
> > > > >> > > via
> > > > >> > > > > RPC instead of using zookeeper
> > > > >> > > > >   - Handle various failure scenarios (e.g. slow disk) on a
> > > > >> case-by-case
> > > > >> > > > > basis. For example, we may want to detect slow disk and
> > > consider
> > > > >> it as
> > > > >> > > > > offline.
> > > > >> > > > >   - Allow admin to mark a directory as bad so that it will
> > not
> > > > be
> > > > >> used.
> > > > >> > > > >
> > > > >> > > > > Thanks,
> > > > >> > > > > Dong
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > >> wrote:
> > > > >> > > > >
> > > > >> > > > > > Hey Eno,
> > > > >> > > > > >
> > > > >> > > > > > Thanks much for the comment!
> > > > >> > > > > >
> > > > >> > > > > > I still think the complexity added to Kafka is justified
> > by
> > > > its
> > > > >> > > benefit.
> > > > >> > > > > > Let me provide my reasons below.
> > > > >> > > > > >
> > > > >> > > > > > 1) The additional logic is easy to understand and thus
> its
> > > > >> complexity
> > > > >> > > > > > should be reasonable.
> > > > >> > > > > >
> > > > >> > > > > > On the broker side, it needs to catch exception when
> > access
> > > > log
> > > > >> > > directory,
> > > > >> > > > > > mark log directory and all its replicas as offline,
> notify
> > > > >> > > controller by
> > > > >> > > > > > writing the zookeeper notification path, and specify
> error
> > > in
> > > > >> > > > > > LeaderAndIsrResponse. On the controller side, it will
> > > listener
> > > > >> to
> > > > >> > > > > > zookeeper for disk failure notification, learn about
> > offline
> > > > >> > > replicas in
> > > > >> > > > > > the LeaderAndIsrResponse, and take offline replicas into
> > > > >> > > consideration when
> > > > >> > > > > > electing leaders. It also mark replica as created in
> > > zookeeper
> > > > >> and
> > > > >> > > use it
> > > > >> > > > > > to determine whether a replica is created.
> > > > >> > > > > >
> > > > >> > > > > > That is all the logic we need to add in Kafka. I
> > personally
> > > > feel
> > > > >> > > this is
> > > > >> > > > > > easy to reason about.
> > > > >> > > > > >
> > > > >> > > > > > 2) The additional code is not much.
> > > > >> > > > > >
> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 lines
> new
> > > > code.
> > > > >> > > Previously
> > > > >> > > > > > I have implemented a prototype of a slightly different
> > > design
> > > > >> (see
> > > > >> > > here
> > > > >> > > > > > <https://docs.google.com/document/d/1Izza0SBmZMVUBUt9s_
> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>)
> > > > >> > > > > > and uploaded it to github (see here
> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD>). The
> > patch
> > > > >> changed
> > > > >> > > 33
> > > > >> > > > > > files, added 1185 lines and deleted 183 lines. The size
> of
> > > > >> prototype
> > > > >> > > patch
> > > > >> > > > > > is actually smaller than patch of KIP-107 (see here
> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>) which is
> > > already
> > > > >> > > accepted.
> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349 lines and
> > > > >> deleted 141
> > > > >> > > lines.
> > > > >> > > > > >
> > > > >> > > > > > 3) Comparison with one-broker-per-multiple-volumes
> > > > >> > > > > >
> > > > >> > > > > > This KIP can improve the availability of Kafka in this
> > case
> > > > such
> > > > >> > > that one
> > > > >> > > > > > failed volume doesn't bring down the entire broker.
> > > > >> > > > > >
> > > > >> > > > > > 4) Comparison with one-broker-per-volume
> > > > >> > > > > >
> > > > >> > > > > > If each volume maps to multiple disks, then we still
> have
> > > > >> similar
> > > > >> > > problem
> > > > >> > > > > > such that the broker will fail if any disk of the volume
> > > > failed.
> > > > >> > > > > >
> > > > >> > > > > > If each volume maps to one disk, it means that we need
> to
> > > > >> deploy 10
> > > > >> > > > > > brokers on a machine if the machine has 10 disks. I will
> > > > >> explain the
> > > > >> > > > > > concern with this approach in order of their importance.
> > > > >> > > > > >
> > > > >> > > > > > - It is weird if we were to tell kafka user to deploy 50
> > > > >> brokers on a
> > > > >> > > > > > machine of 50 disks.
> > > > >> > > > > >
> > > > >> > > > > > - Either when user deploys Kafka on a commercial cloud
> > > > platform
> > > > >> or
> > > > >> > > when
> > > > >> > > > > > user deploys their own cluster, the size or largest disk
> > is
> > > > >> usually
> > > > >> > > > > > limited. There will be scenarios where user want to
> > increase
> > > > >> broker
> > > > >> > > > > > capacity by having multiple disks per broker. This JBOD
> > KIP
> > > > >> makes it
> > > > >> > > > > > feasible without hurting availability due to single disk
> > > > >> failure.
> > > > >> > > > > >
> > > > >> > > > > > - Automatic load rebalance across disks will be easier
> and
> > > > more
> > > > >> > > flexible
> > > > >> > > > > > if one broker has multiple disks. This can be future
> work.
> > > > >> > > > > >
> > > > >> > > > > > - There is performance concern when you deploy 10 broker
> > > vs. 1
> > > > >> > > broker on
> > > > >> > > > > > one machine. The metadata the cluster, including
> > > FetchRequest,
> > > > >> > > > > > ProduceResponse, MetadataRequest and so on will all be
> 10X
> > > > >> more. The
> > > > >> > > > > > packet-per-second will be 10X higher which may limit
> > > > >> performance if
> > > > >> > > pps is
> > > > >> > > > > > the performance bottleneck. The number of socket on the
> > > > machine
> > > > >> is
> > > > >> > > 10X
> > > > >> > > > > > higher. And the number of replication thread will be
> 100X
> > > > more.
> > > > >> The
> > > > >> > > impact
> > > > >> > > > > > will be more significant with increasing number of disks
> > per
> > > > >> > > machine. Thus
> > > > >> > > > > > it will limit Kafka's scalability in the long term.
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Dong
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska <
> > > > >> eno.there...@gmail.com
> > > > >> > > >
> > > > >> > > > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > >> Hi Dong,
> > > > >> > > > > >>
> > > > >> > > > > >> To simplify the discussion today, on my part I'll zoom
> > into
> > > > one
> > > > >> > > thing
> > > > >> > > > > >> only:
> > > > >> > > > > >>
> > > > >> > > > > >> - I'll discuss the options called below :
> > > > >> "one-broker-per-disk" or
> > > > >> > > > > >> "one-broker-per-few-disks".
> > > > >> > > > > >>
> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments so there
> is
> > > no
> > > > >> need to
> > > > >> > > > > >> discuss that part for me. I buy it that JBODs are good.
> > > > >> > > > > >>
> > > > >> > > > > >> I find the terminology can be improved a bit. Ideally
> > we'd
> > > be
> > > > >> > > talking
> > > > >> > > > > >> about volumes, not disks. Just to make it clear that
> > Kafka
> > > > >> > > understand
> > > > >> > > > > >> volumes/directories, not individual raw disks. So by
> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is that the
> admin
> > > can
> > > > >> pool a
> > > > >> > > few
> > > > >> > > > > >> disks together to create a volume/directory and give
> that
> > > to
> > > > >> Kafka.
> > > > >> > > > > >>
> > > > >> > > > > >>
> > > > >> > > > > >> The kernel of my question will be that the admin
> already
> > > has
> > > > >> tools
> > > > >> > > to 1)
> > > > >> > > > > >> create volumes/directories from a JBOD and 2) start a
> > > broker
> > > > >> on a
> > > > >> > > desired
> > > > >> > > > > >> machine and 3) assign a broker resources like a
> > directory.
> > > I
> > > > >> claim
> > > > >> > > that
> > > > >> > > > > >> those tools are sufficient to optimise resource
> > allocation.
> > > > I
> > > > >> > > understand
> > > > >> > > > > >> that a broker could manage point 3) itself, ie juggle
> the
> > > > >> > > directories. My
> > > > >> > > > > >> question is whether the complexity added to Kafka is
> > > > justified.
> > > > >> > > > > >> Operationally it seems to me an admin will still have
> to
> > do
> > > > >> all the
> > > > >> > > three
> > > > >> > > > > >> items above.
> > > > >> > > > > >>
> > > > >> > > > > >> Looking forward to the discussion
> > > > >> > > > > >> Thanks
> > > > >> > > > > >> Eno
> > > > >> > > > > >>
> > > > >> > > > > >>
> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin <
> lindon...@gmail.com
> > >
> > > > >> wrote:
> > > > >> > > > > >> >
> > > > >> > > > > >> > Hey Eno,
> > > > >> > > > > >> >
> > > > >> > > > > >> > Thanks much for the review.
> > > > >> > > > > >> >
> > > > >> > > > > >> > I think your suggestion is to split disks of a
> machine
> > > into
> > > > >> > > multiple
> > > > >> > > > > >> disk
> > > > >> > > > > >> > sets and run one broker per disk set. Yeah this is
> > > similar
> > > > to
> > > > >> > > Colin's
> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we have
> > > evaluated
> > > > at
> > > > >> > > LinkedIn
> > > > >> > > > > >> and
> > > > >> > > > > >> > considered it to be a good short term approach.
> > > > >> > > > > >> >
> > > > >> > > > > >> > As of now I don't think any of these approach is a
> > better
> > > > >> > > alternative in
> > > > >> > > > > >> > the long term. I will summarize these here. I have
> put
> > > > these
> > > > >> > > reasons in
> > > > >> > > > > >> the
> > > > >> > > > > >> > KIP's motivation section and rejected alternative
> > > section.
> > > > I
> > > > >> am
> > > > >> > > happy to
> > > > >> > > > > >> > discuss more and I would certainly like to use an
> > > > alternative
> > > > >> > > solution
> > > > >> > > > > >> that
> > > > >> > > > > >> > is easier to do with better performance.
> > > > >> > > > > >> >
> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from RAID-10 with
> > > > >> > > > > >> replication-factor=2 to
> > > > >> > > > > >> > JBOD with replication-factor=3, we get 25% reduction
> in
> > > disk
> > > > >> usage
> > > > >> > > and
> > > > >> > > > > >> > doubles the tolerance of broker failure before data
> > > > >> > > unavailability from
> > > > >> > > > > >> 1
> > > > >> > > > > >> > to 2. This is pretty huge gain for any company that
> > uses
> > > > >> Kafka at
> > > > >> > > large
> > > > >> > > > > >> > scale.
> > > > >> > > > > >> >
> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit of
> > > > >> > > one-broker-per-disk is
> > > > >> > > > > >> that
> > > > >> > > > > >> > no major code change is needed in Kafka. Among the
> > > > >> disadvantage of
> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP and
> previous
> > > > email
> > > > >> with
> > > > >> > > Colin,
> > > > >> > > > > >> > the biggest one is the 15% throughput loss compared
> to
> > > JBOD
> > > > >> and
> > > > >> > > less
> > > > >> > > > > >> > flexibility to balance across disks. Further, it
> > probably
> > > > >> requires
> > > > >> > > > > >> change
> > > > >> > > > > >> > to internal deployment tools at various companies to
> > deal
> > > > >> with
> > > > >> > > > > >> > one-broker-per-disk setup.
> > > > >> > > > > >> >
> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that used at
> > > > Microsoft.
> > > > >> The
> > > > >> > > > > >> problem is
> > > > >> > > > > >> > that a broker becomes unavailable if any disk fail.
> > > Suppose
> > > > >> > > > > >> > replication-factor=2 and there are 10 disks per
> > machine.
> > > > >> Then the
> > > > >> > > > > >> > probability of any message becoming unavailable due
> > to
> > > > disk
> > > > >> > > failure
> > > > >> > > > > >> with
> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD.
> > > > >> > > > > >> >
> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks:
> > > > one-broker-per-few-disk
> > > > >> is
> > > > >> > > > > >> somewhere
> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So it carries
> > an
> > > > >> averaged
> > > > >> > > > > >> > disadvantages of these two approaches.
> > > > >> > > > > >> >
> > > > >> > > > > >> > To answer your question regarding, I think it is
> > > reasonable
> > > > >> to
> > > > >> > > mange
> > > > >> > > > > >> disk
> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the management
> of
> > > > >> > > assignment of
> > > > >> > > > > >> > replicas across disks. Here are my reasons in more
> > > detail:
> > > > >> > > > > >> >
> > > > >> > > > > >> > - I don't think this KIP is a big step change. By
> > > allowing
> > > > >> user to
> > > > >> > > > > >> > configure Kafka to run multiple log directories or
> > disks
> > > as
> > > > >> of
> > > > >> > > now, it
> > > > >> > > > > >> is
> > > > >> > > > > >> > implicit that Kafka manages disks. It is just not a
> > > > complete
> > > > >> > > feature.
> > > > >> > > > > >> > Microsoft and probably other companies are using this
> > > > feature
> > > > >> > > under the
> > > > >> > > > > >> > undesirable effect that a broker will fail any if any
> > > disk
> > > > >> fail.
> > > > >> > > It is
> > > > >> > > > > >> good
> > > > >> > > > > >> > to complete this feature.
> > > > >> > > > > >> >
> > > > >> > > > > >> > - I think it is reasonable to manage disk in Kafka.
> One
> > > of
> > > > >> the
> > > > >> > > most
> > > > >> > > > > >> > important work that Kafka is doing is to determine
> the
> > > > >> replica
> > > > >> > > > > >> assignment
> > > > >> > > > > >> > across brokers and make sure enough copies of a given
> > > > >> replica is
> > > > >> > > > > >> available.
> > > > >> > > > > >> > I would argue that it is not much different than
> > > > determining
> > > > >> the
> > > > >> > > replica
> > > > >> > > > > >> > assignment across disk conceptually.
> > > > >> > > > > >> >
> > > > > >> > > > > >> > - I would agree that this KIP improves the performance
> > > > > >> > > > > >> > of Kafka at the cost of more complexity inside Kafka,
> > > > > >> > > > > >> > by switching from RAID-10 to JBOD. I would argue that
> > > > > >> > > > > >> > this is the right direction. If we can gain 20%+
> > > > > >> > > > > >> > performance by managing NICs in Kafka as compared to the
> > > > > >> > > > > >> > existing approach and other alternatives, I would say
> > > > > >> > > > > >> > we should just do it. Such a gain in performance, or
> > > > > >> > > > > >> > equivalently a reduction in cost, can save millions of
> > > > > >> > > > > >> > dollars per year for any company running Kafka at
> > > > > >> > > > > >> > large scale.
> > > > >> > > > > >> >
> > > > >> > > > > >> > Thanks,
> > > > >> > > > > >> > Dong
> > > > >> > > > > >> >
> > > > >> > > > > >> >
> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <
> > > > >> > > eno.there...@gmail.com>
> > > > >> > > > > >> wrote:
> > > > >> > > > > >> >
> > > > >> > > > > >> >> I'm coming somewhat late to the discussion,
> apologies
> > > for
> > > > >> that.
> > > > >> > > > > >> >>
> > > > >> > > > > >> >> I'm worried about this proposal. It's moving Kafka
> to
> > a
> > > > >> world
> > > > >> > > where it
> > > > >> > > > > >> >> manages disks. So in a sense, the scope of the KIP
> is
> > > > >> limited,
> > > > >> > > but the
> > > > >> > > > > >> >> direction it sets for Kafka is quite a big step
> > change.
> > > > >> > > Fundamentally
> > > > >> > > > > >> this
> > > > >> > > > > >> >> is about balancing resources for a Kafka broker.
> This
> > > can
> > > > be
> > > > >> > > done by a
> > > > >> > > > > >> >> tool, rather than by changing Kafka. E.g., the tool
> > > would
> > > > >> take a
> > > > >> > > bunch
> > > > >> > > > > >> of
> > > > >> > > > > >> >> disks together, create a volume over them and export
> > > that
> > > > >> to a
> > > > >> > > Kafka
> > > > >> > > > > >> broker
> > > > >> > > > > >> >> (in addition to setting the memory limits for that
> > > broker
> > > > or
> > > > >> > > limiting
> > > > >> > > > > >> other
> > > > >> > > > > >> >> resources). A different bunch of disks can then make
> > up
> > > a
> > > > >> second
> > > > >> > > > > >> volume,
> > > > >> > > > > >> >> and be used by another Kafka broker. This is aligned
> > > with
> > > > >> what
> > > > >> > > Colin is
> > > > >> > > > > >> >> saying (as I understand it).
> > > > >> > > > > >> >>
> > > > > >> > > > > >> >> Disks are not the only resource on a machine; there are
> > > > > >> > > > > >> >> several instances where multiple NICs are used, for
> > > > > >> > > > > >> >> example. Do we want fine-grained management of all these
> > > > > >> > > > > >> >> resources? I'd argue that opens the system up to a lot
> > > > > >> > > > > >> >> of complexity.
> > > > >> > > > > >> >>
> > > > >> > > > > >> >> Thanks
> > > > >> > > > > >> >> Eno
> > > > >> > > > > >> >>
> > > > >> > > > > >> >>
> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > >> wrote:
> > > > >> > > > > >> >>>
> > > > >> > > > > >> >>> Hi all,
> > > > >> > > > > >> >>>
> > > > > >> > > > > >> >>> I am going to initiate the vote if there are no further
> > > > > >> > > > > >> >>> concerns with the KIP.
> > > > >> > > > > >> >>>
> > > > >> > > > > >> >>> Thanks,
> > > > >> > > > > >> >>> Dong
> > > > >> > > > > >> >>>
> > > > >> > > > > >> >>>
> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai <
> > > > >> > > radai.rosenbl...@gmail.com>
> > > > >> > > > > >> >> wrote:
> > > > >> > > > > >> >>>
> > > > >> > > > > >> >>>> a few extra points:
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>>> 1. broker per disk might also incur more client
> <-->
> > > > >> broker
> > > > >> > > sockets:
> > > > >> > > > > >> >>>> suppose every producer / consumer "talks" to >1
> > > > partition,
> > > > >> > > there's a
> > > > >> > > > > >> >> very
> > > > >> > > > > >> >>>> good chance that partitions that were co-located
> on
> > a
> > > > >> single
> > > > >> > > 10-disk
> > > > >> > > > > >> >> broker
> > > > >> > > > > >> >>>> would now be split between several single-disk
> > broker
> > > > >> > > processes on
> > > > >> > > > > >> the
> > > > >> > > > > >> >> same
> > > > >> > > > > >> >>>> machine. hard to put a multiplier on this, but
> > likely
> > > > >x1.
> > > > >> > > sockets
> > > > >> > > > > >> are a
> > > > >> > > > > >> >>>> limited resource at the OS level and incur some
> > memory
> > > > >> cost
> > > > >> > > (kernel
> > > > >> > > > > >> >>>> buffers)
> > > > >> > > > > >> >>>>
> > > > > >> > > > > >> >>>> 2. there's a memory overhead to spinning up a JVM
> > > > > >> > > > > >> >>>> (compiled code and byte code objects etc). if we assume
> > > > > >> > > > > >> >>>> this overhead is ~300 MB (order of magnitude, specifics
> > > > > >> > > > > >> >>>> vary) then spinning up 10 JVMs would lose you 3 GB of
> > > > > >> > > > > >> >>>> RAM. not a ton, but non negligible.
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>>> 3. there would also be some overhead downstream of
> > > kafka
> > > > >> in any
> > > > >> > > > > >> >> management
> > > > >> > > > > >> >>>> / monitoring / log aggregation system. likely less
> > > than
> > > > >> x10
> > > > >> > > though.
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>>> 4. (related to above) - added complexity of
> > > > administration
> > > > >> > > with more
> > > > >> > > > > >> >>>> running instances.
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>>> is anyone running kafka with anywhere near 100GB
> > > heaps?
> > > > i
> > > > >> > > thought the
> > > > >> > > > > >> >> point
> > > > >> > > > > >> >>>> was to rely on kernel page cache to do the disk
> > > > buffering
> > > > >> ....
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <
> > > > >> > > lindon...@gmail.com>
> > > > >> > > > > >> wrote:
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>>>> Hey Colin,
> > > > >> > > > > >> >>>>>
> > > > > >> > > > > >> >>>>> Thanks much for the comment. Please see my comments
> > > > > >> > > > > >> >>>>> inline.
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <
> > > > >> > > cmcc...@apache.org>
> > > > >> > > > > >> >>>> wrote:
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
> > > > >> > > > > >> >>>>>>> Hey Colin,
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>> Good point! Yeah we have actually considered
> and
> > > > >> tested this
> > > > >> > > > > >> >>>> solution,
> > > > >> > > > > >> >>>>>>> which we call one-broker-per-disk. It would
> work
> > > and
> > > > >> should
> > > > >> > > > > >> require
> > > > >> > > > > >> >>>> no
> > > > >> > > > > >> >>>>>>> major change in Kafka as compared to this JBOD
> > KIP.
> > > > So
> > > > >> it
> > > > >> > > would
> > > > >> > > > > >> be a
> > > > >> > > > > >> >>>>> good
> > > > >> > > > > >> >>>>>>> short term solution.
> > > > >> > > > > >> >>>>>>>
> > > > > >> > > > > >> >>>>>>> But it has a few drawbacks which make it less desirable
> > > > > >> > > > > >> >>>>>>> in the long term.
> > > > > >> > > > > >> >>>>>>> Assume we have 10 disks on a machine. Here are the
> > > > > >> > > > > >> >>>>>>> problems:
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> Hi Dong,
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply.
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>> 1) Our stress test result shows that
> > > > >> one-broker-per-disk
> > > > >> > > has 15%
> > > > >> > > > > >> >>>> lower
> > > > >> > > > > >> >>>>>>> throughput
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>> 2) Controller would need to send 10X as many
> > > > >> > > LeaderAndIsrRequest,
> > > > >> > > > > >> >>>>>>> MetadataUpdateRequest and StopReplicaRequest.
> > This
> > > > >> > > increases the
> > > > >> > > > > >> >>>> burden
> > > > >> > > > > >> >>>>>>> on
> > > > >> > > > > >> >>>>>>> controller which can be the performance
> > bottleneck.
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something, but there
> > > would
> > > > >> not be
> > > > >> > > 10x as
> > > > >> > > > > >> >>>> many
> > > > >> > > > > >> >>>>>> StopReplicaRequest RPCs, would there?  The other
> > > > >> requests
> > > > >> > > would
> > > > >> > > > > >> >>>> increase
> > > > >> > > > > >> >>>>>> 10x, but from a pretty low base, right?  We are
> > not
> > > > >> > > reassigning
> > > > >> > > > > >> >>>>>> partitions all the time, I hope (or else we have
> > > > bigger
> > > > >> > > > > >> problems...)
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>
> > > > > >> > > > > >> >>>>> I think the controller will group StopReplicaRequest
> > > > > >> > > > > >> >>>>> per broker and send only one StopReplicaRequest to a
> > > > > >> > > > > >> >>>>> broker during controlled shutdown. Anyway, we don't
> > > > > >> > > > > >> >>>>> have to worry about this if we agree that other
> > > > > >> > > > > >> >>>>> requests will increase by 10X. One MetadataRequest has
> > > > > >> > > > > >> >>>>> to be sent to each broker in the cluster every time
> > > > > >> > > > > >> >>>>> there is a leadership change. I am not sure this is a
> > > > > >> > > > > >> >>>>> real problem. But in theory this makes the overhead
> > > > > >> > > > > >> >>>>> complexity O(number of brokers) and may be a concern in
> > > > > >> > > > > >> >>>>> the future. Ideally we should avoid it.
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>>>
> > > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical resources on the
> > > > > >> > > > > >> >>>>>>> machine. The number of sockets on each machine will
> > > > > >> > > > > >> >>>>>>> increase by 10X. The number of connections between any
> > > > > >> > > > > >> >>>>>>> two machines will increase by 100X.
> > > > >> > > > > >> >>>>>>>
> > > > > >> > > > > >> >>>>>>> 4) Less efficient way to manage memory and quota.
> > > > >> > > > > >> >>>>>>>
> > > > > >> > > > > >> >>>>>>> 5) Rebalance between disks/brokers on the same machine
> > > > > >> > > > > >> >>>>>>> will be less efficient and less flexible. A broker has
> > > > > >> > > > > >> >>>>>>> to read data from another broker on the same machine
> > > > > >> > > > > >> >>>>>>> via socket. It is also harder to do automatic load
> > > > > >> > > > > >> >>>>>>> balancing between disks on the same machine in the
> > > > > >> > > > > >> >>>>>>> future.
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>> I will put this and the explanation in the
> > rejected
> > > > >> > > alternative
> > > > >> > > > > >> >>>>> section.
> > > > >> > > > > >> >>>>>>> I
> > > > >> > > > > >> >>>>>>> have a few questions:
> > > > >> > > > > >> >>>>>>>
> > > > > >> > > > > >> >>>>>>> - Can you explain why this solution can help avoid
> > > > > >> > > > > >> >>>>>>> scalability bottlenecks?
> > > > > >> > > > > >> >>>>>>> I actually think it will exacerbate the scalability
> > > > > >> > > > > >> >>>>>>> problem due to 2) above.
> > > > > >> > > > > >> >>>>>>> - Why can we push more RPCs with this solution?
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> To really answer this question we'd have to
> take a
> > > > deep
> > > > >> dive
> > > > >> > > into
> > > > >> > > > > >> the
> > > > >> > > > > >> >>>>>> locking of the broker and figure out how
> > effectively
> > > > it
> > > > >> can
> > > > >> > > > > >> >> parallelize
> > > > >> > > > > >> >>>>>> truly independent requests.  Almost every
> > > > multithreaded
> > > > >> > > process is
> > > > >> > > > > >> >>>> going
> > > > >> > > > > >> >>>>>> to have shared state, like shared queues or
> shared
> > > > >> sockets,
> > > > >> > > that is
> > > > >> > > > > >> >>>>>> going to make scaling less than linear when you
> > add
> > > > >> disks or
> > > > >> > > > > >> >>>> processors.
> > > > >> > > > > >> >>>>>> (And clearly, another option is to improve that
> > > > >> scalability,
> > > > >> > > rather
> > > > >> > > > > >> >>>>>> than going multi-process!)
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>> Yeah I also think it is better to improve
> > scalability
> > > > >> inside
> > > > >> > > kafka
> > > > >> > > > > >> code
> > > > >> > > > > >> >>>> if
> > > > >> > > > > >> >>>>> possible. I am not sure we currently have any
> > > > scalability
> > > > >> > > issue
> > > > >> > > > > >> inside
> > > > >> > > > > >> >>>>> Kafka that can not be removed without using
> > > > >> multi-process.
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>>> - It is true that a garbage collection in one
> > > broker
> > > > >> would
> > > > >> > > not
> > > > >> > > > > >> affect
> > > > >> > > > > >> >>>>>>> others. But that is after every broker only
> uses
> > > 1/10
> > > > >> of the
> > > > >> > > > > >> memory.
> > > > >> > > > > >> >>>>> Can
> > > > >> > > > > >> >>>>>>> we be sure that this will actually help
> > > performance?
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> The big question is, how much memory do Kafka
> > > brokers
> > > > >> use
> > > > >> > > now, and
> > > > >> > > > > >> how
> > > > >> > > > > >> >>>>>> much will they use in the future?  Our
> experience
> > in
> > > > >> HDFS
> > > > >> > > was that
> > > > >> > > > > >> >> once
> > > > >> > > > > >> >>>>>> you start getting more than 100-200GB Java heap
> > > sizes,
> > > > >> full
> > > > >> > > GCs
> > > > >> > > > > >> start
> > > > >> > > > > >> >>>>>> taking minutes to finish when using the standard
> > > JVMs.
> > > > >> That
> > > > >> > > alone
> > > > >> > > > > >> is
> > > > >> > > > > >> >> a
> > > > >> > > > > >> >>>>>> good reason to go multi-process or consider
> > storing
> > > > more
> > > > >> > > things off
> > > > >> > > > > >> >> the
> > > > >> > > > > >> >>>>>> Java heap.
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>
> > > > > >> > > > > >> >>>>> I see. Now I agree one-broker-per-disk should be more
> > > > > >> > > > > >> >>>>> efficient in terms of GC since each broker probably
> > > > > >> > > > > >> >>>>> needs less than 1/10 of the memory available on a
> > > > > >> > > > > >> >>>>> typical machine nowadays. I will remove this from the
> > > > > >> > > > > >> >>>>> reasons for rejection.
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case.  The "hard"
> case,
> > > > >> which is
> > > > >> > > > > >> >>>>>> unfortunately also the much more common case, is
> > > disk
> > > > >> > > misbehavior.
> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks tend to
> > start
> > > > >> slowing
> > > > >> > > down
> > > > >> > > > > >> >>>>>> unpredictably.  Requests that would have
> completed
> > > > >> > > immediately
> > > > >> > > > > >> before
> > > > > >> > > > > >> >>>>>> start taking 20, 100, or 500 milliseconds.  Some
> files
> > > may
> > > > >> be
> > > > >> > > readable
> > > > >> > > > > >> and
> > > > >> > > > > >> >>>>>> other files may not be.  System calls hang,
> > > sometimes
> > > > >> > > forever, and
> > > > >> > > > > >> the
> > > > >> > > > > >> >>>>>> Java process can't abort them, because the hang
> is
> > > in
> > > > >> the
> > > > >> > > kernel.
> > > > >> > > > > >> It
> > > > >> > > > > >> >>>> is
> > > > > >> > > > > >> >>>>>> not fun when threads are stuck in "D state"
> > > > > >> > > > > >> >>>>>> http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state
> > > > > >> > > > > >> >>>>>> .  Even kill -9 cannot abort the thread then.
> > > > > >> > > > > >> >>>>>> Fortunately, this is rare.
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>> I agree it is a harder problem and it is rare. We
> > > > >> probably
> > > > >> > > don't
> > > > >> > > > > >> have
> > > > >> > > > > >> >> to
> > > > >> > > > > >> >>>>> worry about it in this KIP since this issue is
> > > > >> orthogonal to
> > > > >> > > > > >> whether or
> > > > >> > > > > >> >>>> not
> > > > >> > > > > >> >>>>> we use JBOD.
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>> Another approach we should consider is for Kafka
> > to
> > > > >> > > implement its
> > > > >> > > > > >> own
> > > > >> > > > > >> >>>>>> storage layer that would stripe across multiple
> > > disks.
> > > > >> This
> > > > >> > > > > >> wouldn't
> > > > >> > > > > >> >>>>>> have to be done at the block level, but could be
> > > done
> > > > >> at the
> > > > >> > > file
> > > > >> > > > > >> >>>> level.
> > > > >> > > > > >> >>>>>> We could use consistent hashing to determine
> which
> > > > >> disks a
> > > > >> > > file
> > > > >> > > > > >> should
> > > > >> > > > > >> >>>>>> end up on, for example.
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>> Are you suggesting that we should distribute log,
> > or
> > > > log
> > > > >> > > segment,
> > > > >> > > > > >> >> across
> > > > >> > > > > >> >>>>> disks of brokers? I am not sure if I fully
> > understand
> > > > >> this
> > > > >> > > > > >> approach. My
> > > > >> > > > > >> >>>> gut
> > > > >> > > > > >> >>>>> feel is that this would be a drastic solution
> that
> > > > would
> > > > >> > > require
> > > > >> > > > > >> >>>>> non-trivial design. While this may be useful to
> > > Kafka,
> > > > I
> > > > >> would
> > > > >> > > > > >> prefer
> > > > >> > > > > >> >> not
> > > > >> > > > > >> >>>>> to discuss this in detail in this thread unless
> you
> > > > >> believe
> > > > >> > > it is
> > > > >> > > > > >> >>>> strictly
> > > > >> > > > > >> >>>>> superior to the design in this KIP in terms of
> > > solving
> > > > >> our
> > > > >> > > use-case.
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>>> best,
> > > > >> > > > > >> >>>>>> Colin
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>> Thanks,
> > > > >> > > > > >> >>>>>>> Dong
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe
> <
> > > > >> > > > > >> cmcc...@apache.org>
> > > > >> > > > > >> >>>>>>> wrote:
> > > > >> > > > > >> >>>>>>>
> > > > >> > > > > >> >>>>>>>> Hi Dong,
> > > > >> > > > > >> >>>>>>>>
> > > > >> > > > > >> >>>>>>>> Thanks for the writeup!  It's very
> interesting.
> > > > >> > > > > >> >>>>>>>>
> > > > >> > > > > >> >>>>>>>> I apologize in advance if this has been
> > discussed
> > > > >> > > somewhere else.
> > > > >> > > > > >> >>>>> But
> > > > >> > > > > >> >>>>>> I
> > > > >> > > > > >> >>>>>>>> am curious if you have considered the solution
> > of
> > > > >> running
> > > > >> > > > > >> multiple
> > > > >> > > > > >> >>>>>>>> brokers per node.  Clearly there is a memory
> > > > overhead
> > > > >> with
> > > > >> > > this
> > > > >> > > > > >> >>>>>> solution
> > > > >> > > > > >> >>>>>>>> because of the fixed cost of starting multiple
> > > JVMs.
> > > > >> > > However,
> > > > >> > > > > >> >>>>> running
> > > > >> > > > > >> >>>>>>>> multiple JVMs would help avoid scalability
> > > > >> bottlenecks.
> > > > >> > > You
> > > > >> > > > > >> could
> > > > >> > > > > >> >>>>>>>> probably push more RPCs per second, for
> example.
> > > A
> > > > >> garbage
> > > > >> > > > > >> >>>>> collection
> > > > >> > > > > >> >>>>>>>> in one broker would not affect the others.  It
> > > would
> > > > >> be
> > > > >> > > > > >> interesting
> > > > >> > > > > >> >>>>> to
> > > > > >> > > > > >> >>>>>>>> see this considered in the "alternate designs" section,
> > > > > >> > > > > >> >>>>>>>> even if you end up deciding it's not the way to go.
> > > > >> > > > > >> >>>>>>>>
> > > > >> > > > > >> >>>>>>>> best,
> > > > >> > > > > >> >>>>>>>> Colin
> > > > >> > > > > >> >>>>>>>>
> > > > >> > > > > >> >>>>>>>>
> > > > >> > > > > >> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin
> wrote:
> > > > >> > > > > >> >>>>>>>>> Hi all,
> > > > >> > > > > >> >>>>>>>>>
> > > > > >> > > > > >> >>>>>>>>> We created KIP-112: Handle disk failure for JBOD.
> > > > > >> > > > > >> >>>>>>>>> Please find the KIP wiki in the link
> > > > > >> > > > > >> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
> > > > > >> > > > > >> >>>>>>>>>
> > > > > >> > > > > >> >>>>>>>>> This KIP is related to KIP-113
> > > > > >> > > > > >> >>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> > > > > >> > > > > >> >>>>>>>>> Support replicas movement between log directories. They
> > > > > >> > > > > >> >>>>>>>>> are needed in order to support JBOD in Kafka. Please
> > > > > >> > > > > >> >>>>>>>>> help review the KIP. Your feedback is appreciated!
> > > > >> > > > > >> >>>>>>>>>
> > > > >> > > > > >> >>>>>>>>> Thanks,
> > > > >> > > > > >> >>>>>>>>> Dong
> > > > >> > > > > >> >>>>>>>>
> > > > >> > > > > >> >>>>>>
> > > > >> > > > > >> >>>>>
> > > > >> > > > > >> >>>>
> > > > >> > > > > >> >>
> > > > >> > > > > >> >>
> > > > >> > > > > >>
> > > > >> > > > > >>
> > > > >> > > > > >
> > > > >> > >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
