Hey Jun,

Thanks for all the comments. I should have written the summary earlier
but got delayed. I think Grant has summarized pretty much all the major
issues we discussed in the KIP meeting, and I have provided an answer to
each issue. Let me try to address your questions here.

I will update the KIP to explain how that zookeeper path is managed and
used. I will also describe in the KIP what happens during (a) controller
failover, (b) partition reassignment, (c) topic deletion. I will let you
and everyone know once the change has been made in the KIP.

I actually think there should be no performance issue with having 5 RPCs
per disk failure in the cluster, given that we don't have a performance
issue with having 5 RPCs per partition ISR change. My gut feel is that the
frequency of ISR changes should be 100X higher than that of disk failures,
so the RPC cost should be even less of an issue for disk failures.
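
As a rough back-of-envelope illustration of that comparison, here is a small
sketch. The per-operation RPC counts are the ones from Jun's message (1 RPC
per ZK read, at least 2 per ZK write or delete); the event rates are made-up
placeholders for illustration, not measurements from any cluster.

```python
# RPC costs per ZooKeeper operation, as stated in the thread.
# Writes and deletes are "at least 2", so the totals are lower bounds.
ZK_READ_RPCS = 1
ZK_WRITE_RPCS = 2
ZK_DELETE_RPCS = 2


def rpcs_per_zk_notification():
    """One notification = sender write + receiver read + receiver delete."""
    return ZK_WRITE_RPCS + ZK_READ_RPCS + ZK_DELETE_RPCS


def daily_rpcs(events_per_day, rpcs_per_event):
    """Total RPCs per day for a given event rate."""
    return events_per_day * rpcs_per_event


# Hypothetical rates: assume ISR changes are 100X as frequent as disk failures.
disk_failures_per_day = 1
isr_changes_per_day = 100 * disk_failures_per_day

per_event = rpcs_per_zk_notification()
print("RPCs per ZK-based notification:", per_event)
print("Daily RPCs for disk failures:", daily_rpcs(disk_failures_per_day, per_event))
print("Daily RPCs for ISR changes:", daily_rpcs(isr_changes_per_day, per_event))
```

Under these assumed rates, the notification traffic from disk failures is
about 1% of what ISR change notifications already generate.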

It seems fine if the broker doesn't receive a response when notifying the
controller of the disk failure, as long as the controller is guaranteed to
send a LeaderAndIsrRequest to query the replica state on the broker.

I agree it is useful to replace zookeeper access with direct RPC for both
notification events. But I am wondering if we can do it in a future KIP.
Notification via zookeeper is pretty straightforward because we are already
doing this with no performance concern. On the other hand, letting the broker
notify the controller via RPC requires non-trivial design. We need to decide
the wire protocol(s), whether the broker should retry the notification, which
thread should send this RPC to the controller, etc. Currently the broker
doesn't have a very well-established way to send RPCs to the controller. When
the broker sends a ControlledShutdownRequest to the controller, it creates a
NetworkClient on the fly. This means that non-trivial work needs to be done
to support a long term solution for the broker to send notifications to the
controller.

Thanks,
Dong

On Tue, Feb 7, 2017 at 2:23 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the discussion in the KIP meeting today. A few comments inlined
> below.
>
> On Mon, Feb 6, 2017 at 7:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks for the review! Please see reply inline.
> >
> > On Mon, Feb 6, 2017 at 6:21 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the proposal. A few quick questions/comments.
> > >
> > > 1. Do you know why your stress test loses 15% of the throughput with the
> > > one-broker-per-disk setup?
> > >
> >
> > I think it is probably related to thread scheduling and socket management,
> > though I haven't validated this theory.
> >
> > With one-broker-per-disk setup, each broker has 16 io threads, 12 network
> > threads, 14 replica fetcher threads.
> > With one-broker-per-machine setup, each broker has 160 io threads, 120
> > network threads, 140 replica fetcher threads.
> >
> > I can test this theory by increasing the number of threads per broker by
> > 10X in an existing cluster and seeing if throughput capacity changes. It
> > would not be surprising if performance does degrade with 10X threads. But I
> > haven't validated this yet.
> >
>
>
> > > 2. In the KIP, it wasn't super clear to me what
> > > /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state
> > > represents and when it's updated. It seems that this path should be
> > > updated every time the disk associated with one of the replicas goes bad
> > > or is fixed. However, the wiki only mentions updating this path when a
> > > new topic is created. It would be useful to provide a high level
> > > description of what this path stores, when it's updated and by whom.
> > >
> >
> > This path will only be updated by the controller. When a replica is
> > successfully created on a broker, the controller adds this replica id to
> > the "created" list of the corresponding partition. When a replica needs to
> > be re-created because the bad disk is replaced with an empty good disk, the
> > user executes kafka-log-dirs.sh so that the controller will remove this
> > replica id from the "created" list of the corresponding partition.
> >
> > The first part is described in the "Topic gets created" scenario. The second
> > part is kind of mentioned in the "The disk (or log directory) gets fixed"
> > scenario, but it is not clear because it doesn't mention the full zookeeper
> > path. I have made this change
> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=7&selectedPageVersions=8>
> > in the KIP to clarify the second part.
> >
> > Currently I have used steps per scenario to describe how the KIP works.
> > Would you like me to have a section to describe how this ZK path is
> > managed?
> >
> >
> >
> So, it seems that
> /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state
> is a reflection of the log directory state in the broker. It would be
> useful to describe how the broker maintains the directory state and whether
> that state is reset during broker restart.
>
> For completeness, it would be useful to also describe what happens during
> (a) controller failover, (b) partition reassignment, (c) topic deletion
> (for example, what happens when a replica to be deleted is on a failed log
> directory).
>
>
> > > 3. The proposal uses ZK to propagate disk failure/recovery from the
> > > broker to the controller. Not sure if this is the best approach in the
> > > long term. It may be better for the broker to send RPC requests directly
> > > to the controller?
> > >
> >
> > I chose to propagate this information via ZK for simplicity of the design
> > and implementation, since ISR notification is passed via ZK and most events
> > (e.g. broker offline, partition reassignment) are triggered in the controller
> > via ZK listeners. Yes, it can be implemented using RPC. But I am not very
> > sure what we gain by using RPC instead of ZK. Should we have a separate KIP
> > in the future to migrate all existing notifications to RPC?
> >
> >
> My concern with ZK-based communication is efficiency. To send a message
> from the broker to the controller in this approach, the sender needs to do
> 1 write to ZK and the receiver needs to do 1 read from ZK, followed by 1
> delete to ZK. So, we will need a total of 5 RPCs (a read from ZK requires 1
> RPC and a write/delete to ZK requires at least 2 RPCs). If the broker can
> send a message directly to the controller, it just needs 1 RPC. Another
> potential issue with the ZK-based approach is that it's hard for the sender
> to receive a response. We made an exception for using ZK-based notification
> for ISR propagation since it's a quicker way to fix an existing problem.
> Since we are adding a new feature, it would be useful to think through
> what's the best way for the broker to communicate with the controller in
> the long term.
>
>
> Thanks,
>
> Jun
>
>
> > >
> > > Jun
> > >
> > >
> > > On Wed, Jan 25, 2017 at 1:50 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Colin,
> > > >
> > > > Good point! Yeah, we have actually considered and tested this solution,
> > > > which we call one-broker-per-disk. It would work and should require no
> > > > major change in Kafka as compared to this JBOD KIP. So it would be a good
> > > > short term solution.
> > > >
> > > > But it has a few drawbacks which make it less desirable in the long
> > > > term. Assume we have 10 disks on a machine. Here are the problems:
> > > >
> > > > 1) Our stress test result shows that one-broker-per-disk has 15% lower
> > > > throughput.
> > > >
> > > > 2) Controller would need to send 10X as many LeaderAndIsrRequest,
> > > > UpdateMetadataRequest and StopReplicaRequest. This increases the burden
> > > > on the controller, which can be the performance bottleneck.
> > > >
> > > > 3) Less efficient use of physical resources on the machine. The number
> > > > of sockets on each machine will increase by 10X. The number of
> > > > connections between any two machines will increase by 100X.
> > > >
> > > > 4) Less efficient way to manage memory and quota.
> > > >
> > > > 5) Rebalance between disks/brokers on the same machine will be less
> > > > efficient and less flexible. A broker has to read data from another
> > > > broker on the same machine via socket. It is also harder to do automatic
> > > > load balancing between disks on the same machine in the future.
> > > >
> > > > I will put this and the explanation in the rejected alternative
> > > > section. I have a few questions:
> > > >
> > > > - Can you explain why this solution can help avoid scalability
> > > > bottlenecks? I actually think it will exacerbate the scalability problem
> > > > due to 2) above.
> > > > - Why can we push more RPCs with this solution?
> > > > - It is true that a garbage collection in one broker would not affect
> > > > others. But that is after every broker only uses 1/10 of the memory. Can
> > > > we be sure that this will actually help performance?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for the writeup!  It's very interesting.
> > > > >
> > > > > I apologize in advance if this has been discussed somewhere else. But I
> > > > > am curious if you have considered the solution of running multiple
> > > > > brokers per node. Clearly there is a memory overhead with this solution
> > > > > because of the fixed cost of starting multiple JVMs. However, running
> > > > > multiple JVMs would help avoid scalability bottlenecks. You could
> > > > > probably push more RPCs per second, for example. A garbage collection
> > > > > in one broker would not affect the others. It would be interesting to
> > > > > see this considered in the "alternate designs" design, even if you end
> > > > > up deciding it's not the way to go.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > We created KIP-112: Handle disk failure for JBOD. Please find the KIP
> > > > > > wiki in the link
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
> > > > > >
> > > > > > This KIP is related to KIP-113
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> > > > > > Support replicas movement between log directories. They are needed in
> > > > > > order to support JBOD in Kafka. Please help review the KIP. Your feedback
> > > > > > is appreciated!
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > >
> > > >
> > >
> >
>
