Hi Dong,

The KIP meetings are traditionally held at 11am. Would that also work? So 
Tuesday 7th at 11am?

Thanks
Eno

> On 2 Feb 2017, at 02:53, Dong Lin <lindon...@gmail.com> wrote:
> 
> Hey Eno, Colin,
> 
> Would you have time next Tuesday morning to discuss the KIP? How about 10 -
> 11 am?
> 
> To make best use of our time, can you please invite one or more committers
> from Confluent to join the meeting? I hope the KIP can receive one or more
> +1s from committers at Confluent if we have no concerns about the KIP after
> the KIP meeting.
> 
> In the meantime, please feel free to provide comments in the thread so
> that discussion in the KIP meeting can be more efficient.
> 
> Thanks,
> Dong
> 
> On Wed, Feb 1, 2017 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> 
>> Hey Colin,
>> 
>> Thanks much for the comment. Please see my reply inline.
>> 
>> On Wed, Feb 1, 2017 at 1:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
>> 
>>> On Wed, Feb 1, 2017, at 11:35, Dong Lin wrote:
>>>> Hey Grant, Colin,
>>>> 
>>>> My bad, I misunderstood Grant's suggestion initially. Indeed this is a very
>>>> interesting idea to just wait for replica.max.lag.ms for the replica on the
>>>> bad disk to drop out of ISR instead of having broker actively reporting
>>>> this to the controller.
>>>> 
>>>> I have several concerns with this approach.
>>>> 
>>>> - Broker needs to persist the information of all partitions that it has
>>>> created, in a file on each disk. This is needed for broker to know the
>>>> replicas already created on the bad disks that it cannot access. If we
>>>> don't do it, then when controller sends LeaderAndIsrRequest to a broker to
>>>> become follower for a partition on the bad disk, the broker will create
>>>> the partition on a good disk. The good disks may be overloaded as a
>>>> cascading effect.
>>>> 
>>>> While it is possible to let broker keep track of the replicas that it has
>>>> created, I think it is less clean than the approach in the current KIP for
>>>> the reasons provided in the rejected alternatives section.
>>>> 
>>>> - Change is needed in the controller logic to handle failure to make a
>>>> broker leader when controller receives LeaderAndIsrResponse. Otherwise,
>>>> things go wrong if a partition on the bad disk is requested to become
>>>> leader. As of now, broker doesn't handle error in LeaderAndIsrResponse.
>>>> 
>>>> - We still need tools and a mechanism for administrators to know whether a
>>>> replica is offline due to a bad disk. I am worried that asking
>>>> administrators to log into a machine and get this information from the log
>>>> is not scalable when the number of brokers is large. Although each company
>>>> can develop its own internal tools to get this information, it is a waste
>>>> of developer time to reinvent the wheel. Reading this information from the
>>>> log also seems less reliable than getting it from a Kafka request/response.
>>>> 
>>>> I guess the goal of this alternative approach is to avoid making major
>>>> change in Kafka at the cost of increased disk failure discovery time etc.
>>>> But I think the changes required for fixing the problems above won't be
>>>> much less.
>>> 
>>> Thanks for the thoughtful replies, Dong L.
>>> 
>>> Instead of having an "offline" state, how about having a "creating"
>>> state for replicas and a "created" state?  Then if a replica was not
>>> accessible on any disk, but still in "created" state, the broker could
>>> know that something had gone wrong.  This also would catch issues like
>>> the broker being started without all log directories configured, or
>>> disks not being correctly mounted at the expected mount points, leading
>>> to empty directories.
>>> 
>> 
>> Indeed, we need to have an additional state per replica to solve this
>> problem. The current KIP design addresses the problem by putting the
>> "created" state in zookeeper, as you can see in the public interface change
>> of the KIP. Are you suggesting to solve the problem by storing this
>> information in local disk of the broker instead of zookeeper? I have two
>> concerns with this approach:
>> 
>> - It requires broker to keep track of the replicas it has created. This
>> solution will split the task of determining offline replicas among
>> controller and brokers as opposed to the current Kafka design, where the
>> controller determines states of replicas and propagates this information to
>> brokers. We think it is less error-prone to still let controller be the
>> only entity that maintains metadata (e.g. replica state) of Kafka cluster.
>> 
>> - If we store this information in local disk, then we need to have
>> additional request/response protocol in order to request broker to reset
>> this information, e.g. after a bad disk is replaced with good disk, so that
>> the replica can be re-created on a good disk. Things would be easier if we
>> store this information in zookeeper.
>> 
>> 
>>> 
>>>> 
>>>> To answer Colin's questions:
>>>> 
>>>> - There is no action required on the side of administrator in case of log
>>>> directory failure.
>>>> 
>>>> - Broker itself is going to discover log directory failure and declare
>>>> offline replicas. Broker doesn't explicitly declare log directory failure.
>>>> But administrator can learn from the MetadataResponse that a replica is
>>>> offline due to disk failure, i.e. if the replica is offline but the broker
>>>> is online.
>>> 
>>> Can you expand on this a little bit?  It sounds like you are considering
>>> dealing with failures on a replica-by-replica basis, rather than a
>>> disk-by-disk basis.  But it's disks that fail, not really individual
>>> files or directories on disks.  This decision interacts poorly with the
>>> lack of a periodic scanner.  It's easy to imagine a scenario where an
>>> infrequently used replica sits on a dead disk for a long time without us
>>> declaring it dead.
>>> 
>> 
>> Sure. The broker will deal with failures on a disk-by-disk basis. All
>> replicas on a disk are marked as failed if there is any disk-related
>> exception when broker accesses that disk. It means that if any replica on
>> that disk cannot be read, then all replicas on that disk are considered
>> offline. Since controller doesn't know the disk names of replicas, it has to
>> learn the liveness of replicas on a replica-by-replica basis in order to do
>> leader election.
>> 
>> Besides, I don't think we will have problem with the scenario you
>> described. If a replica is indeed not touched for a long time, then it
>> doesn't matter whether it is considered dead or not. The moment it is
>> needed, either for read or for write, the KIP makes sure that we know its
>> state and make leader election accordingly.
>> 
>> 
>>>> 
>>>> - This KIP does not handle cases where a few disks on a broker are full,
>>>> but the others have space. If a disk is full and can not be written then
>>>> the disk is considered to have failed. The imbalance across disks is an
>>>> existing problem and will be handled in KIP-113.
>>> 
>>> OK.
>>> 
>>>> 
>>>> - This KIP does not include a disk scanner that will periodically check for
>>>> error conditions. It doesn't handle any performance degradation of disks.
>>>> We wait for a failure to happen before declaring a disk bad.
>>>> 
>>>> Yes, this KIP requires us to fix cases in the code where we are suppressing
>>>> disk errors or ignoring their root cause. But the decision of which
>>>> exceptions should be considered disk failures and how to handle each of
>>>> them is more of an implementation detail. I hope we can focus on the detail
>>>> and high level idea of this KIP and only worry about specific exceptions
>>>> when the patch is being reviewed.
>>> 
>>> Hmm... I think we should discuss how we are going to harden the code
>>> against disk failures, and verify that it has been hardened.  Or maybe
>>> we could do this in a follow-up KIP.
>>> 
>> 
>> By "harden the code against disk errors" do you mean that we should make a
>> full list of disk-related exceptions we may see and decide if we should
>> treat each of these differently? I agree it is useful in the long term. But
>> this is actually out of the scope of this KIP. The KIP will re-use the
>> existing KafkaStorageException without having to change what exceptions are
>> considered KafkaStorageException. The goal is to fail replicas on a disk
>> instead of crashing the broker when KafkaStorageException is observed.
>> 
>> Thanks,
>> Dong
>> 
>> 
>>>> After all we probably only know the list of
>>>> exceptions and ways to handle them when we start to implement the KIP. And
>>>> we need to improve this list over time as we discover various failures in
>>>> the deployment.
>>>> 
>>>> 
>>>> Hey Eno,
>>>> 
>>>> Sure thing. Thanks for offering time to have a KIP meeting to discuss this.
>>>> I will ask other Kafka developers at LinkedIn about their availability.
>>> 
>>> Yeah, it would be nice to talk about this.
>> 
>> 
>>> regards,
>>> Colin
>>> 
>>> 
>>>> 
>>>> Thanks,
>>>> Dong
>>>> 
>>>> 
>>>> On Wed, Feb 1, 2017 at 10:37 AM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Dong,
>>>>> 
>>>>> Would it make sense to do a discussion over video/voice about this? I
>>>>> think it's sufficiently complex that we can probably make quicker progress
>>>>> that way? So shall we do a KIP meeting soon? I can do this week (Thu/Fri)
>>>>> or next week.
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>>> On 1 Feb 2017, at 18:29, Colin McCabe <cmcc...@apache.org> wrote:
>>>>>> 
>>>>>> Hmm.  Maybe I misinterpreted, but I got the impression that Grant was
>>>>>> suggesting that we avoid introducing this concept of "offline replicas"
>>>>>> for now.  Is that feasible?
>>>>>> 
>>>>>> What is the strategy for declaring a log directory bad?  Is it an
>>>>>> administrative action?  Or is the broker itself going to be responsible
>>>>>> for this?  How do we handle cases where a few disks on a broker are
>>>>>> full, but the others have space?
>>>>>> 
>>>>>> Are we going to have a disk scanner that will periodically check for
>>>>>> error conditions (similar to the background checks that RAID controllers
>>>>>> do)?  Or will we wait for a failure to happen before declaring a disk
>>>>>> bad?
>>>>>> 
>>>>>> It seems to me that if we want this to work well we will need to fix
>>>>>> cases in the code where we are suppressing disk errors or ignoring their
>>>>>> root cause.  For example, any place where we are using the old Java APIs
>>>>>> that just return a boolean on failure will need to be fixed, since the
>>>>>> failure could now be disk full, permission denied, or IOE, and we will
>>>>>> need to handle those cases differently.  Also, we will need to harden
>>>>>> the code against disk errors.  Formerly it was OK to just crash on a
>>>>>> disk error; now it is not.  It would be nice to see more in the test
>>>>>> plan about injecting IOExceptions into disk handling code and verifying
>>>>>> that we can handle it correctly.
>>>>>> 
>>>>>> regards,
>>>>>> Colin
>>>>>> 
>>>>>> 
>>>>>> On Wed, Feb 1, 2017, at 10:02, Dong Lin wrote:
>>>>>>> Hey Grant,
>>>>>>> 
>>>>>>> Yes, this KIP does exactly what you described:)
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>> 
>>>>>>> On Wed, Feb 1, 2017 at 9:45 AM, Grant Henke <ghe...@cloudera.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Dong,
>>>>>>>> 
>>>>>>>> Thanks for putting this together.
>>>>>>>> 
>>>>>>>> Since we are discussing alternative/simplified options, have you
>>>>>>>> considered handling the disk failures broker side to prevent a crash,
>>>>>>>> marking the disk as "bad" to that individual broker, and continuing as
>>>>>>>> normal? I imagine the broker would then fall out of sync for the replicas
>>>>>>>> hosted on the bad disk and the ISR would shrink. This would allow people
>>>>>>>> using min.isr to keep their data safe and the cluster operators would see
>>>>>>>> a shrink in many ISRs and hopefully an obvious log message leading to a
>>>>>>>> quick fix. I haven't thought through this idea in depth though, so there
>>>>>>>> could be some shortfalls.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Grant
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Wed, Feb 1, 2017 at 11:21 AM, Dong Lin <lindon...@gmail.com>
>>> wrote:
>>>>>>>> 
>>>>>>>>> Hey Eno,
>>>>>>>>> 
>>>>>>>>> Thanks much for the review.
>>>>>>>>> 
>>>>>>>>> I think your suggestion is to split disks of a machine into multiple disk
>>>>>>>>> sets and run one broker per disk set. Yeah this is similar to Colin's
>>>>>>>>> suggestion of one-broker-per-disk, which we have evaluated at LinkedIn and
>>>>>>>>> considered to be a good short term approach.
>>>>>>>>> 
>>>>>>>>> As of now I don't think any of these approaches is a better alternative in
>>>>>>>>> the long term. I will summarize the reasons here. I have put these reasons
>>>>>>>>> in the KIP's motivation section and rejected alternatives section. I am
>>>>>>>>> happy to discuss more and I would certainly like to use an alternative
>>>>>>>>> solution that is easier to do with better performance.
>>>>>>>>> 
>>>>>>>>> - JBOD vs. RAID-10: if we switch from RAID-10 with replication-factor=2 to
>>>>>>>>> JBOD with replication-factor=3, we get a 25% reduction in disk usage and
>>>>>>>>> double the tolerance of broker failures before data unavailability from 1
>>>>>>>>> to 2. This is a pretty huge gain for any company that uses Kafka at large
>>>>>>>>> scale.
>>>>>>>>> 
>>>>>>>>> - JBOD vs. one-broker-per-disk: The benefit of one-broker-per-disk is that
>>>>>>>>> no major code change is needed in Kafka. Among the disadvantages of
>>>>>>>>> one-broker-per-disk summarized in the KIP and previous email with Colin,
>>>>>>>>> the biggest ones are the 15% throughput loss compared to JBOD and less
>>>>>>>>> flexibility to balance across disks. Further, it probably requires changes
>>>>>>>>> to internal deployment tools at various companies to deal with the
>>>>>>>>> one-broker-per-disk setup.
>>>>>>>>> 
>>>>>>>>> - JBOD vs. RAID-0: This is the setup that is used at Microsoft. The problem
>>>>>>>>> is that a broker becomes unavailable if any disk fails. Suppose
>>>>>>>>> replication-factor=2 and there are 10 disks per machine. Then the
>>>>>>>>> probability of any message becoming unavailable due to disk failure with
>>>>>>>>> RAID-0 is 100X higher than that with JBOD.
>>>>>>>>> 
>>>>>>>>> - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is somewhere
>>>>>>>>> between one-broker-per-disk and RAID-0. So it carries an average of the
>>>>>>>>> disadvantages of these two approaches.
>>>>>>>>> 
>>>>>>>>> To answer your question, I think it is reasonable to manage disks
>>>>>>>>> in Kafka. By "managing disks" we mean the management of assignment of
>>>>>>>>> replicas across disks. Here are my reasons in more detail:
>>>>>>>>> 
>>>>>>>>> - I don't think this KIP is a big step change. By allowing users to
>>>>>>>>> configure Kafka to run multiple log directories or disks as of now, it is
>>>>>>>>> implicit that Kafka manages disks. It is just not a complete feature.
>>>>>>>>> Microsoft and probably other companies are using this feature despite the
>>>>>>>>> undesirable effect that a broker will fail if any disk fails. It is good
>>>>>>>>> to complete this feature.
>>>>>>>>> 
>>>>>>>>> - I think it is reasonable to manage disks in Kafka. One of the most
>>>>>>>>> important things that Kafka does is to determine the replica assignment
>>>>>>>>> across brokers and make sure enough copies of a given replica are
>>>>>>>>> available. I would argue that it is not much different conceptually from
>>>>>>>>> determining the replica assignment across disks.
>>>>>>>>> 
>>>>>>>>> - I would agree that this KIP improves the performance of Kafka at the cost
>>>>>>>>> of more complexity inside Kafka, by switching from RAID-10 to JBOD. I would
>>>>>>>>> argue that this is the right direction. If we can gain 20%+ performance by
>>>>>>>>> managing NIC in Kafka as compared to the existing approach and other
>>>>>>>>> alternatives, I would say we should just do it. Such a gain in performance,
>>>>>>>>> or equivalently reduction in cost, can save millions of dollars per year
>>>>>>>>> for any company running Kafka at large scale.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Dong
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <
>>> eno.there...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> I'm coming somewhat late to the discussion, apologies for that.
>>>>>>>>>> 
>>>>>>>>>> I'm worried about this proposal. It's moving Kafka to a world where it
>>>>>>>>>> manages disks. So in a sense, the scope of the KIP is limited, but the
>>>>>>>>>> direction it sets for Kafka is quite a big step change. Fundamentally this
>>>>>>>>>> is about balancing resources for a Kafka broker. This can be done by a
>>>>>>>>>> tool, rather than by changing Kafka. E.g., the tool would take a bunch of
>>>>>>>>>> disks together, create a volume over them and export that to a Kafka broker
>>>>>>>>>> (in addition to setting the memory limits for that broker or limiting other
>>>>>>>>>> resources). A different bunch of disks can then make up a second volume,
>>>>>>>>>> and be used by another Kafka broker. This is aligned with what Colin is
>>>>>>>>>> saying (as I understand it).
>>>>>>>>>> 
>>>>>>>>>> Disks are not the only resource on a machine; there are several instances
>>>>>>>>>> where multiple NICs are used, for example. Do we want fine grained
>>>>>>>>>> management of all these resources? I'd argue that opens up the system to a
>>>>>>>>>> lot of complexity.
>>>>>>>>>> 
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> I am going to initiate the vote if there is no further concern with the
>>>>>>>>>>> KIP.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dong
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Jan 27, 2017 at 8:08 PM, radai <
>>> radai.rosenbl...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> a few extra points:
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. broker per disk might also incur more client <--> broker sockets:
>>>>>>>>>>>> suppose every producer / consumer "talks" to >1 partition, there's a very
>>>>>>>>>>>> good chance that partitions that were co-located on a single 10-disk broker
>>>>>>>>>>>> would now be split between several single-disk broker processes on the same
>>>>>>>>>>>> machine. hard to put a multiplier on this, but likely >x1. sockets are a
>>>>>>>>>>>> limited resource at the OS level and incur some memory cost (kernel
>>>>>>>>>>>> buffers)
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. there's a memory overhead to spinning up a JVM (compiled code and byte
>>>>>>>>>>>> code objects etc). if we assume this overhead is ~300 MB (order of
>>>>>>>>>>>> magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB of
>>>>>>>>>>>> RAM. not a ton, but non negligible.
>>>>>>>>>>>> 
>>>>>>>>>>>> 3. there would also be some overhead downstream of kafka in any management
>>>>>>>>>>>> / monitoring / log aggregation system. likely less than x10 though.
>>>>>>>>>>>> 
>>>>>>>>>>>> 4. (related to above) - added complexity of administration with more
>>>>>>>>>>>> running instances.
>>>>>>>>>>>> 
>>>>>>>>>>>> is anyone running kafka with anywhere near 100GB heaps? i thought the point
>>>>>>>>>>>> was to rely on kernel page cache to do the disk buffering ....
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <
>>> lindon...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks much for the comment. Please see my comment inline.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <
>>>>> cmcc...@apache.org
>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Good point! Yeah we have actually considered and tested this solution,
>>>>>>>>>>>>>>> which we call one-broker-per-disk. It would work and should require no
>>>>>>>>>>>>>>> major change in Kafka as compared to this JBOD KIP. So it would be a good
>>>>>>>>>>>>>>> short term solution.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> But it has a few drawbacks which make it less desirable in the long term.
>>>>>>>>>>>>>>> Assume we have 10 disks on a machine. Here are the problems:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for the thoughtful reply.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1) Our stress test result shows that one-broker-per-disk has 15% lower
>>>>>>>>>>>>>>> throughput
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 2) Controller would need to send 10X as many LeaderAndIsrRequest,
>>>>>>>>>>>>>>> MetadataUpdateRequest and StopReplicaRequest. This increases the burden on
>>>>>>>>>>>>>>> controller which can be the performance bottleneck.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Maybe I'm misunderstanding something, but there would not be 10x as many
>>>>>>>>>>>>>> StopReplicaRequest RPCs, would there?  The other requests would increase
>>>>>>>>>>>>>> 10x, but from a pretty low base, right?  We are not reassigning
>>>>>>>>>>>>>> partitions all the time, I hope (or else we have bigger problems...)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I think the controller will group StopReplicaRequest per broker and send
>>>>>>>>>>>>> only one StopReplicaRequest to a broker during controlled shutdown. Anyway,
>>>>>>>>>>>>> we don't have to worry about this if we agree that other requests will
>>>>>>>>>>>>> increase by 10X. One MetadataRequest is sent to each broker in the cluster
>>>>>>>>>>>>> every time there is a leadership change. I am not sure this is a real
>>>>>>>>>>>>> problem. But in theory this makes the overhead complexity O(number of
>>>>>>>>>>>>> brokers) and may be a concern in the future. Ideally we should avoid it.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 3) Less efficient use of physical resources on the machine. The number of
>>>>>>>>>>>>>>> sockets on each machine will increase by 10X. The number of connections
>>>>>>>>>>>>>>> between any two machines will increase by 100X.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 4) Less efficient way to manage memory and quota.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 5) Rebalance between disks/brokers on the same machine will be less
>>>>>>>>>>>>>>> efficient and less flexible. Broker has to read data from another broker on
>>>>>>>>>>>>>>> the same machine via socket. It is also harder to do automatic load balance
>>>>>>>>>>>>>>> between disks on the same machine in the future.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I will put this and the explanation in the rejected alternative section. I
>>>>>>>>>>>>>>> have a few questions:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - Can you explain why this solution can help avoid scalability bottleneck?
>>>>>>>>>>>>>>> I actually think it will exacerbate the scalability problem due to 2)
>>>>>>>>>>>>>>> above.
>>>>>>>>>>>>>>> - Why can we push more RPC with this solution?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> To really answer this question we'd have to take a deep dive into the
>>>>>>>>>>>>>> locking of the broker and figure out how effectively it can parallelize
>>>>>>>>>>>>>> truly independent requests.  Almost every multithreaded process is going
>>>>>>>>>>>>>> to have shared state, like shared queues or shared sockets, that is
>>>>>>>>>>>>>> going to make scaling less than linear when you add disks or processors.
>>>>>>>>>>>>>> (And clearly, another option is to improve that scalability, rather
>>>>>>>>>>>>>> than going multi-process!)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Yeah I also think it is better to improve scalability inside kafka code if
>>>>>>>>>>>>> possible. I am not sure we currently have any scalability issue inside
>>>>>>>>>>>>> Kafka that can not be removed without using multi-process.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - It is true that a garbage collection in one broker would not affect
>>>>>>>>>>>>>>> others. But that is after every broker only uses 1/10 of the memory. Can
>>>>>>>>>>>>>>> we be sure that this will actually help performance?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The big question is, how much memory do Kafka brokers use now, and how
>>>>>>>>>>>>>> much will they use in the future?  Our experience in HDFS was that once
>>>>>>>>>>>>>> you start getting more than 100-200GB Java heap sizes, full GCs start
>>>>>>>>>>>>>> taking minutes to finish when using the standard JVMs.  That alone is a
>>>>>>>>>>>>>> good reason to go multi-process or consider storing more things off the
>>>>>>>>>>>>>> Java heap.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I see. Now I agree one-broker-per-disk should be more efficient in terms of
>>>>>>>>>>>>> GC since each broker probably needs less than 1/10 of the memory available
>>>>>>>>>>>>> on a typical machine nowadays. I will remove this from the reason of
>>>>>>>>>>>>> rejection.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Disk failure is the "easy" case.  The "hard" case, which is
>>>>>>>>>>>>>> unfortunately also the much more common case, is disk misbehavior.
>>>>>>>>>>>>>> Towards the end of their lives, disks tend to start slowing down
>>>>>>>>>>>>>> unpredictably.  Requests that would have completed immediately before
>>>>>>>>>>>>>> start taking 20, 100, 500 milliseconds.  Some files may be readable and
>>>>>>>>>>>>>> other files may not be.  System calls hang, sometimes forever, and the
>>>>>>>>>>>>>> Java process can't abort them, because the hang is in the kernel.  It is
>>>>>>>>>>>>>> not fun when threads are stuck in "D state"
>>>>>>>>>>>>>> http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state
>>>>>>>>>>>>>> .  Even kill -9 cannot abort the thread then.  Fortunately, this is
>>>>>>>>>>>>>> rare.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I agree it is a harder problem and it is rare. We probably don't have to
>>>>>>>>>>>>> worry about it in this KIP since this issue is orthogonal to whether or not
>>>>>>>>>>>>> we use JBOD.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Another approach we should consider is for Kafka to implement its own
>>>>>>>>>>>>>> storage layer that would stripe across multiple disks.  This wouldn't
>>>>>>>>>>>>>> have to be done at the block level, but could be done at the file level.
>>>>>>>>>>>>>> We could use consistent hashing to determine which disks a file should
>>>>>>>>>>>>>> end up on, for example.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Are you suggesting that we should distribute log, or log segment, across
>>>>>>>>>>>>> disks of brokers? I am not sure if I fully understand this approach. My gut
>>>>>>>>>>>>> feel is that this would be a drastic solution that would require
>>>>>>>>>>>>> non-trivial design. While this may be useful to Kafka, I would prefer not
>>>>>>>>>>>>> to discuss this in detail in this thread unless you believe it is strictly
>>>>>>>>>>>>> superior to the design in this KIP in terms of solving our use-case.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <
>>>>>>>> cmcc...@apache.org
>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for the writeup!  It's very interesting.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I apologize in advance if this has been discussed somewhere else.  But I
>>>>>>>>>>>>>>>> am curious if you have considered the solution of running multiple
>>>>>>>>>>>>>>>> brokers per node.  Clearly there is a memory overhead with this solution
>>>>>>>>>>>>>>>> because of the fixed cost of starting multiple JVMs.  However, running
>>>>>>>>>>>>>>>> multiple JVMs would help avoid scalability bottlenecks.  You could
>>>>>>>>>>>>>>>> probably push more RPCs per second, for example.  A garbage collection
>>>>>>>>>>>>>>>> in one broker would not affect the others.  It would be interesting to
>>>>>>>>>>>>>>>> see this considered in the "alternate designs" design, even if you end
>>>>>>>>>>>>>>>> up deciding it's not the way to go.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the KIP
>>>>>>>>>>>>>>>>> wiki in the link
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> This KIP is related to KIP-113
>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
>>>>>>>>>>>>>>>>> Support replicas movement between log directories. They are needed in
>>>>>>>>>>>>>>>>> order to support JBOD in Kafka. Please help review the KIP. Your feedback
>>>>>>>>>>>>>>>>> is appreciated!
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Grant Henke
>>>>>>>> Software Engineer | Cloudera
>>>>>>>> gr...@cloudera.com | twitter.com/gchenke |
>>> linkedin.com/in/granthenke
>>>>>>>> 
>>>>> 
>>>>> 
>>> 
>> 
>> 
