I'm coming somewhat late to the discussion; apologies for that.

I'm worried about this proposal. It's moving Kafka to a world where it manages 
disks. So in a sense, the scope of the KIP is limited, but the direction it 
sets for Kafka is quite a big step change. Fundamentally this is about 
balancing resources for a Kafka broker. This can be done by a tool, rather than 
by changing Kafka. E.g., the tool would take a bunch of disks together, create 
a volume over them and export that to a Kafka broker (in addition to setting 
the memory limits for that broker or limiting other resources). A different 
bunch of disks can then make up a second volume, and be used by another Kafka 
broker. This is aligned with what Colin is saying (as I understand it). 
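
To make the tool idea concrete, here is a minimal sketch in Python of the 
planning step only. Everything in it is an assumption for illustration: the 
names plan_brokers and render_properties, the /mnt/kafka mount root and the 
port numbering are hypothetical, and actually building the volumes (with 
whatever volume manager is in use) is left out. The only Kafka-specific parts 
are the existing broker.id, listeners and log.dirs settings.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class BrokerPlan:
    broker_id: int
    disks: List[str]  # physical devices grouped into one volume
    log_dir: str      # mount point of that volume, used as the broker's log.dirs
    port: int


def plan_brokers(disks, disks_per_broker, base_port=9092, mount_root="/mnt/kafka"):
    """Group disks into volumes and assign one volume to each broker."""
    if disks_per_broker < 1:
        raise ValueError("disks_per_broker must be >= 1")
    plans = []
    for start in range(0, len(disks), disks_per_broker):
        broker_id = start // disks_per_broker
        plans.append(BrokerPlan(
            broker_id=broker_id,
            disks=disks[start:start + disks_per_broker],
            log_dir=f"{mount_root}/vol{broker_id}",  # hypothetical mount layout
            port=base_port + broker_id,
        ))
    return plans


def render_properties(plan):
    """Render the per-broker settings as server.properties lines."""
    return "\n".join([
        f"broker.id={plan.broker_id}",
        f"listeners=PLAINTEXT://:{plan.port}",
        f"log.dirs={plan.log_dir}",
    ])


if __name__ == "__main__":
    for plan in plan_brokers(["/dev/sdb", "/dev/sdc", "/dev/sdd", "/dev/sde"], 2):
        print(render_properties(plan))
        print()
```

With four disks and two disks per broker this plans two brokers, each seeing a 
single log directory, so the broker itself never has to reason about 
individual disks.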

Disks are not the only resource on a machine; there are several instances where 
multiple NICs are used, for example. Do we want fine-grained management of all 
these resources? I'd argue that opens up the system to a lot of complexity.

Thanks
Eno


> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
> 
> Hi all,
> 
> I am going to initiate the vote if there is no further concern with the KIP.
> 
> Thanks,
> Dong
> 
> 
> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenbl...@gmail.com> wrote:
> 
>> a few extra points:
>> 
>> 1. broker per disk might also incur more client <--> broker sockets:
>> suppose every producer / consumer "talks" to >1 partition, there's a very
>> good chance that partitions that were co-located on a single 10-disk broker
>> would now be split between several single-disk broker processes on the same
>> machine. hard to put a multiplier on this, but likely >x1. sockets are a
>> limited resource at the OS level and incur some memory cost (kernel
>> buffers)
>> 
>> 2. there's a memory overhead to spinning up a JVM (compiled code and byte
>> code objects etc). if we assume this overhead is ~300 MB (order of
>> magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB of
>> RAM. not a ton, but non negligible.
>> 
>> 3. there would also be some overhead downstream of kafka in any management
>> / monitoring / log aggregation system. likely less than x10 though.
>> 
>> 4. (related to above) - added complexity of administration with more
>> running instances.
>> 
>> is anyone running kafka with anywhere near 100GB heaps? i thought the point
>> was to rely on kernel page cache to do the disk buffering ....
>> 
>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com> wrote:
>> 
>>> Hey Colin,
>>> 
>>> Thanks much for the comment. Please see my comments inline.
>>> 
>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org>
>> wrote:
>>> 
>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>>>>> Hey Colin,
>>>>> 
>>>>> Good point! Yeah we have actually considered and tested this
>> solution,
>>>>> which we call one-broker-per-disk. It would work and should require
>> no
>>>>> major change in Kafka as compared to this JBOD KIP. So it would be a
>>> good
>>>>> short term solution.
>>>>> 
>>>>> But it has a few drawbacks which make it less desirable in the long
>>>>> term.
>>>>> Assume we have 10 disks on a machine. Here are the problems:
>>>> 
>>>> Hi Dong,
>>>> 
>>>> Thanks for the thoughtful reply.
>>>> 
>>>>> 
>>>>> 1) Our stress test result shows that one-broker-per-disk has 15%
>> lower
>>>>> throughput
>>>>> 
>>>>> 2) Controller would need to send 10X as many LeaderAndIsrRequest,
>>>>> MetadataUpdateRequest and StopReplicaRequest. This increases the
>> burden
>>>>> on
>>>>> controller which can be the performance bottleneck.
>>>> 
>>>> Maybe I'm misunderstanding something, but there would not be 10x as
>> many
>>>> StopReplicaRequest RPCs, would there?  The other requests would
>> increase
>>>> 10x, but from a pretty low base, right?  We are not reassigning
>>>> partitions all the time, I hope (or else we have bigger problems...)
>>>> 
>>> 
>>> I think the controller will group StopReplicaRequest per broker and send
>>> only one StopReplicaRequest to a broker during controlled shutdown.
>> Anyway,
>>> we don't have to worry about this if we agree that other requests will
>>> increase by 10X. One MetadataRequest has to be sent to each broker in the
>>> cluster every time there is a leadership change. I am not sure this is a real
>>> problem. But in theory this makes the overhead complexity O(number of
>>> broker) and may be a concern in the future. Ideally we should avoid it.
>>> 
>>> 
>>>> 
>>>>> 
>>>>> 3) Less efficient use of physical resources on the machine. The number of
>>>>> sockets on each machine will increase by 10X. The number of connections
>>>>> between any two machines will increase by 100X.
>>>>> 
>>>>> 4) Less efficient way to manage memory and quota.
>>>>> 
>>>>> 5) Rebalance between disks/brokers on the same machine will be less
>>>>> efficient
>>>>> and less flexible. Broker has to read data from another broker on the
>>>>> same
>>>>> machine via socket. It is also harder to do automatic load balance
>>>>> between
>>>>> disks on the same machine in the future.
>>>>> 
>>>>> I will put this and the explanation in the rejected alternative
>>> section.
>>>>> I
>>>>> have a few questions:
>>>>> 
>>>>> - Can you explain why this solution can help avoid scalability
>>>>> bottleneck?
>>>>> I actually think it will exacerbate the scalability problem due to 2)
>>>>> above.
>>>>> - Why can we push more RPC with this solution?
>>>> 
>>>> To really answer this question we'd have to take a deep dive into the
>>>> locking of the broker and figure out how effectively it can parallelize
>>>> truly independent requests.  Almost every multithreaded process is
>> going
>>>> to have shared state, like shared queues or shared sockets, that is
>>>> going to make scaling less than linear when you add disks or
>> processors.
>>>> (And clearly, another option is to improve that scalability, rather
>>>> than going multi-process!)
>>>> 
>>> 
>>> Yeah I also think it is better to improve scalability inside kafka code
>> if
>>> possible. I am not sure we currently have any scalability issue inside
>>> Kafka that can not be removed without using multi-process.
>>> 
>>> 
>>>> 
>>>>> - It is true that a garbage collection in one broker would not affect
>>>>> others. But that is after every broker only uses 1/10 of the memory.
>>> Can
>>>>> we be sure that this will actually help performance?
>>>> 
>>>> The big question is, how much memory do Kafka brokers use now, and how
>>>> much will they use in the future?  Our experience in HDFS was that once
>>>> you start getting more than 100-200GB Java heap sizes, full GCs start
>>>> taking minutes to finish when using the standard JVMs.  That alone is a
>>>> good reason to go multi-process or consider storing more things off the
>>>> Java heap.
>>>> 
>>> 
>>> I see. Now I agree one-broker-per-disk should be more efficient in terms
>> of
>>> GC since each broker probably needs less than 1/10 of the memory
>> available
>>> on a typical machine nowadays. I will remove this from the reason of
>>> rejection.
>>> 
>>> 
>>>> 
>>>> Disk failure is the "easy" case.  The "hard" case, which is
>>>> unfortunately also the much more common case, is disk misbehavior.
>>>> Towards the end of their lives, disks tend to start slowing down
>>>> unpredictably.  Requests that would have completed immediately before
>>>> start taking 20, 100, 500 milliseconds.  Some files may be readable and
>>>> other files may not be.  System calls hang, sometimes forever, and the
>>>> Java process can't abort them, because the hang is in the kernel.  It
>> is
>>>> not fun when threads are stuck in "D state"
>>>> http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state
>>>> .  Even kill -9 cannot abort the thread then.  Fortunately, this is
>>>> rare.
>>>> 
>>> 
>>> I agree it is a harder problem and it is rare. We probably don't have to
>>> worry about it in this KIP since this issue is orthogonal to whether or
>> not
>>> we use JBOD.
>>> 
>>> 
>>>> 
>>>> Another approach we should consider is for Kafka to implement its own
>>>> storage layer that would stripe across multiple disks.  This wouldn't
>>>> have to be done at the block level, but could be done at the file
>> level.
>>>> We could use consistent hashing to determine which disks a file should
>>>> end up on, for example.
>>>> 
>>> 
>>> Are you suggesting that we should distribute log, or log segment, across
>>> disks of brokers? I am not sure if I fully understand this approach. My
>> gut
>>> feel is that this would be a drastic solution that would require
>>> non-trivial design. While this may be useful to Kafka, I would prefer not
>>> to discuss this in detail in this thread unless you believe it is
>> strictly
>>> superior to the design in this KIP in terms of solving our use-case.
>>> 
>>> 
>>>> best,
>>>> Colin
>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Dong
>>>>> 
>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org>
>>>>> wrote:
>>>>> 
>>>>>> Hi Dong,
>>>>>> 
>>>>>> Thanks for the writeup!  It's very interesting.
>>>>>> 
>>>>>> I apologize in advance if this has been discussed somewhere else.
>>> But
>>>> I
>>>>>> am curious if you have considered the solution of running multiple
>>>>>> brokers per node.  Clearly there is a memory overhead with this
>>>> solution
>>>>>> because of the fixed cost of starting multiple JVMs.  However,
>>> running
>>>>>> multiple JVMs would help avoid scalability bottlenecks.  You could
>>>>>> probably push more RPCs per second, for example.  A garbage
>>> collection
>>>>>> in one broker would not affect the others.  It would be interesting
>>> to
>>>>>> see this considered in the "alternate designs" design, even if you
>>> end
>>>>>> up deciding it's not the way to go.
>>>>>> 
>>>>>> best,
>>>>>> Colin
>>>>>> 
>>>>>> 
>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the
>>> KIP
>>>>>>> wiki
>>>>>>> in the link https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>>>>>>> 
>>>>>>> This KIP is related to KIP-113
>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
>>>>>>> Support replicas movement between log directories. They are
>> needed
>>> in
>>>>>>> order
>>>>>>> to support JBOD in Kafka. Please help review the KIP. Your feedback is
>>>>>>> appreciated!
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>> 
>>>> 
>>> 
>> 
