Hi, Dong,

Thanks for the KIP. A few comments below.

1. For moving data across directories
1.1 I am not sure why we want to use ReplicaFetcherThread to move data
around in the leader. ReplicaFetcherThread fetches data from a socket. For
moving data locally, it seems that we want to avoid the socket overhead.
1.2 I am also not sure about moving data in the ReplicaFetcherThread in the
follower. For example, I am not sure setting replica.fetch.wait.max.ms to 0
is ideal. It may not always be effective since a fetch request in the
ReplicaFetcherThread could be arbitrarily delayed due to replication
throttling on the leader. In general, the data movement logic across disks
seems different from that in ReplicaFetcherThread. So, I am not sure why
they need to be coupled.
1.3 Could you add a bit more details on how we swap the replicas when the
new ones are fully caught up? For example, what happens when the new
replica in the new log directory is caught up, but when we want to do the
swap, some new data has arrived?
1.4 Do we need to do the .move at the log segment level or could we just do
that at the replica directory level? Renaming just a directory is much
faster than renaming the log segments.
1.5 Could you also describe a bit what happens when either the source or
the target log directory fails while the data moving is in progress?
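
To make the trade-off in 1.4 concrete, here is a minimal Python sketch. It is
not the KIP's or Kafka's implementation. It assumes a hypothetical on-disk
layout in which the in-progress copy carries a `.move` suffix, either on the
replica directory or on each segment file, and all function names are invented
for illustration. It only counts the rename calls each approach needs.

```python
import os
import tempfile


def make_segments(directory, names):
    """Create a directory holding empty files that stand in for log segments."""
    os.makedirs(directory)
    for name in names:
        open(os.path.join(directory, name), "w").close()


def promote_by_directory(log_dir, partition):
    """Assumed layout: <log_dir>/<partition>.move/ holds the caught-up copy.

    A single rename of the directory promotes every segment at once.
    Returns the number of rename calls issued.
    """
    src = os.path.join(log_dir, partition + ".move")
    dst = os.path.join(log_dir, partition)
    os.rename(src, dst)  # one call, regardless of how many segments exist
    return 1


def promote_by_segment(log_dir, partition):
    """Assumed layout: <log_dir>/<partition>/<segment>.move per segment file.

    Every segment needs its own rename, so the cost grows with segment count.
    Returns the number of rename calls issued.
    """
    part_dir = os.path.join(log_dir, partition)
    suffix = ".move"
    renames = 0
    for name in sorted(os.listdir(part_dir)):
        if name.endswith(suffix):
            os.rename(os.path.join(part_dir, name),
                      os.path.join(part_dir, name[:-len(suffix)]))
            renames += 1
    return renames


if __name__ == "__main__":
    root = tempfile.mkdtemp()
    make_segments(os.path.join(root, "topic-0.move"), ["0.log", "50.log", "100.log"])
    print("directory-level renames:", promote_by_directory(root, "topic-0"))
```

The per-segment variant also leaves a window in which some segments are
renamed and others are not, which is one more reason to ask how a failure in
the middle of the swap would be handled.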

2. For partition reassignment.
2.1 I am not sure if the controller can block on ChangeReplicaDirRequest.
Data movement may take a long time to complete. If there is an outstanding
request from the controller to a broker, that broker won't be able to
process any new request from the controller. So if another event (e.g.
broker failure) happens while the data movement is in progress, subsequent
LeaderAndIsrRequests will be delayed.
2.2 In the KIP, the partition reassignment tool is also used for cases
where an admin just wants to balance the existing data across log
directories in the broker. In this case, it seems like overkill to
have the process go through the controller. A simpler approach is to issue
an RPC request to the broker directly.
2.3 When using the partition reassignment tool to move replicas across
brokers, it makes sense to be able to specify the log directory of the newly
created replicas. The KIP does that in two separate requests
ChangeReplicaDirRequest and LeaderAndIsrRequest, and tracks the progress of
each independently. An alternative is to do that just in LeaderAndIsrRequest.
That way, the new replicas will be created in the right log dir in the
first place and the controller just needs to track the progress of
partition reassignment in the current way.

3. /admin/reassign_partitions: Including the log dir in every replica may
not be efficient. We could include a list of log directories and reference
the index of the log directory in each replica.
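
As a rough illustration of point 3, the sketch below converts a verbose
reassignment document into an index-based one and back. The field names
(`log_dir`, `log_dirs`, `log_dir_index`) and the overall JSON shape are
assumptions made for this example; they are not the format defined in the KIP.

```python
def compact(reassignment):
    """Replace each replica's log dir path with an index into a shared list."""
    log_dirs = []   # unique directories, in first-seen order
    index_of = {}   # directory path -> position in log_dirs
    partitions = []
    for p in reassignment["partitions"]:
        replicas = []
        for r in p["replicas"]:
            path = r["log_dir"]
            if path not in index_of:
                index_of[path] = len(log_dirs)
                log_dirs.append(path)
            replicas.append({"broker": r["broker"],
                             "log_dir_index": index_of[path]})
        partitions.append({"topic": p["topic"],
                           "partition": p["partition"],
                           "replicas": replicas})
    return {"log_dirs": log_dirs, "partitions": partitions}


def expand(compacted):
    """Inverse of compact(): resolve each index back to its directory path."""
    dirs = compacted["log_dirs"]
    return {"partitions": [
        {"topic": p["topic"],
         "partition": p["partition"],
         "replicas": [{"broker": r["broker"],
                       "log_dir": dirs[r["log_dir_index"]]}
                      for r in p["replicas"]]}
        for p in compacted["partitions"]]}
```

Each distinct path is stored once, so the saving grows with the number of
replicas that share a directory path.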

4. DescribeDirsRequest: The stats in the request are already available from
JMX. Do we need the new request?

5. We want to be consistent on ChangeReplicaDirRequest vs
ChangeReplicaRequest.

Thanks,

Jun


On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Alexey,
>
> Thanks for all the comments!
>
> I have updated the KIP to specify how we enforce quota. I also updated the
> section "The thread model and broker logic for moving replica data between
> log directories" to make it easier to read. You can find the exact change here
> <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>.
> The idea is to use the same replication quota mechanism introduced in
> KIP-73.
>
> Thanks,
> Dong
>
>
>
> On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> wrote:
>
> >
> >
> > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
> > > Hey Alexey,
> > >
> > > Thanks. I think we agreed that the suggested solution doesn't work in
> > > general for kafka users. To answer your questions:
> > >
> > > 1. I agree we need a quota to rate limit replica movement when a broker is
> > > moving a "leader" replica. I will come up with a solution, probably re-using
> > > the replication quota config introduced in KIP-73.
> > >
> > > 2. Good point. I agree that this is a problem in general. If there is no new
> > > data on that broker, with the current default values of
> > > replica.fetch.wait.max.ms and replica.fetch.max.bytes, the replica will be
> > > moved at only 2 MBps throughput. I think the solution is for the broker to set
> > > replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding
> > > ReplicaFetcherThread needs to move some replica to another disk.
> > >
> > > 3. I have updated the KIP to mention that the read size of a given
> > > partition is configured using replica.fetch.max.bytes when we move
> > > replicas between disks.
> > >
> > > Please see this
> > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5>
> > > for the change to the KIP. I will come up with a solution to throttle
> > > replica movement when a broker is moving a "leader" replica.
> >
> > Thanks. It looks great.
> >
> > >
> > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> > > wrote:
> > >
> > >>  23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> > >>  > Thanks. Please see my comment inline.
> > >>  >
> > >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> > >>  > wrote:
> > >>  >
> > >>  >> 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
> > >>  >> > Hey Alexey,
> > >>  >> >
> > >>  >> > Thanks for your review and the alternative approach. Here is my
> > >>  >> > understanding of your patch. Kafka's background threads are used to move
> > >>  >> > data between replicas. When data movement is triggered, the log will be
> > >>  >> > rolled and the new logs will be put in the new directory, and background
> > >>  >> > threads will move segments from the old directory to the new directory.
> > >>  >> >
> > >>  >> > It is important to note that KIP-112 is intended to work with KIP-113 to
> > >>  >> > support JBOD. I think your solution is definitely simpler and better
> > >>  >> > under the current Kafka implementation, in which a broker will fail if any
> > >>  >> > disk fails. But I am not sure if we want to allow a broker to run with
> > >>  >> > partial disk failure. Let's say a replica is being moved from log_dir_old
> > >>  >> > to log_dir_new and then log_dir_old stops working due to disk failure.
> > >>  >> > How would your existing patch handle it? To make the scenario a bit more
> > >>  >>
> > >>  >> We will lose log_dir_old. After broker restart we can read the data
> > >>  >> from log_dir_new.
> > >>  >
> > >>  > No, you probably can't. This is because the broker doesn't have *all* the
> > >>  > data for this partition. For example, say the broker has
> > >>  > partition_segment_1, partition_segment_50 and partition_segment_100 on
> > >>  > log_dir_old. partition_segment_100, which has the latest data, has been
> > >>  > moved to log_dir_new, and log_dir_old fails before partition_segment_50
> > >>  > and partition_segment_1 are moved to log_dir_new. When the broker restarts,
> > >>  > it won't have partition_segment_50. This causes a problem if the broker is
> > >>  > elected leader and a consumer wants to consume data in partition_segment_1.
> > >>
> > >>  Right.
> > >>
> > >>  >
> > >>  >> > complicated, let's say the broker is shut down, log_dir_old's disk fails,
> > >>  >> > and the broker starts. In this case the broker doesn't even know if
> > >>  >> > log_dir_new has all the data needed for this replica. It becomes a problem
> > >>  >> > if the broker is elected leader of this partition in this case.
> > >>  >>
> > >>  >> log_dir_new contains the most recent data so we will lose the tail of
> > >>  >> partition.
> > >>  >> This is not a big problem for us because we already delete tails by hand
> > >>  >> (see https://issues.apache.org/jira/browse/KAFKA-1712).
> > >>  >> Also we don't use automatic leader balancing
> > >>  >> (auto.leader.rebalance.enable=false),
> > >>  >> so this partition becomes the leader with a low probability.
> > >>  >> I think my patch can be modified to prohibit the selection of the
> > >>  >> leader until the partition has moved completely.
> > >>  >
> > >>  > I guess you are saying that you have deleted the tails by hand in your own
> > >>  > Kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am not
> > >>
> > >>  No. We just modify segments mtime by cron job. This works with vanilla
> > >>  kafka.
> > >>
> > >>  > sure if it is the right solution. How would this solution address the
> > >>  > problem mentioned above?
> > >>
> > >>  If you need only fresh data and if you remove old data by hand this is
> > >>  not a problem. But in the general case
> > >>  this is a problem of course.
> > >>
> > >>  >
> > >>  > BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way
> > >>  > to address its problem. Now that we have a timestamp in the message we can
> > >>  > use that to delete old segments instead of relying on the log segment mtime.
> > >>  > Just an idea; we don't have to discuss this problem here.
> > >>  >
> > >>  >> >
> > >>  >> > The solution presented in the KIP attempts to handle it by replacing the
> > >>  >> > replica in an atomic fashion after the log in the new dir has fully
> > >>  >> > caught up with the log in the old dir. At any time the log can be
> > >>  >> > considered to exist in only one log directory.
> > >>  >>
> > >>  >> As I understand your solution does not cover quotas.
> > >>  >> What happens if someone starts to transfer 100 partitions ?
> > >>  >
> > >>  > Good point. Quota can be implemented in the future. It is currently
> > >>  > mentioned as a potential future improvement in KIP-112
> > >>  > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>.
> > >>  > Thanks for the reminder. I will move it to KIP-113.
> > >>  >
> > >>  >> > If yes, it will read a ByteBufferMessageSet from topicPartition.log and
> > >>  >> > append the message set to topicPartition.move
> > >>  >>
> > >>  >> i.e. processPartitionData will read data from the beginning of
> > >>  >> topicPartition.log? What is the read size?
> > >>  >> A ReplicaFetcherThread reads many partitions so if one does some complicated
> > >>  >> work (= read a lot of data from disk) everything will slow down.
> > >>  >> I think read size should not be very big.
> > >>  >>
> > >>  >> On the other hand at this point (processPartitionData) one can use only
> > >>  >> the new data (ByteBufferMessageSet from parameters) and wait until
> > >>  >> (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
> > >>  >> && topicPartition.log.largestOffset == topicPartition.log.largestOffset).
> > >>  >> In this case the write speed to topicPartition.move and topicPartition.log
> > >>  >> will be the same so this will allow us to move many partitions to one disk.
> > >>  >
> > >>  > The read size of a given partition is configured
> > >>  > using replica.fetch.max.bytes, which is the same size used by FetchRequest
> > >>  > from follower to leader. If the broker is moving a replica for which it
> > >>
> > >>  OK. Could you mention it in KIP?
> > >>
> > >>  > acts as a follower, the disk write rate for moving this replica is at most
> > >>  > the rate it fetches from the leader (assuming it is catching up and has
> > >>  > sufficient data to read from the leader), which is subject to the
> > >>  > round-trip time between itself and the leader. Thus this part is probably
> > >>  > fine even without quota.
> > >>
> > >>  I think there are 2 problems
> > >>  1. Without a speed limiter this will not work well even for 1 partition. In
> > >>  our production we had a problem so we added a throughput limiter:
> > >>  https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> > >>
> > >>  2. I don't understand how it will work in the case of a big
> > >>  replica.fetch.wait.max.ms and a partition with irregular flow.
> > >>  For example someone could have replica.fetch.wait.max.ms=10minutes and a
> > >>  partition that has very high data flow from 12:00 to 13:00 and zero flow
> > >>  otherwise.
> > >>  In this case processPartitionData could be called once per 10minutes so if
> > >>  we start data moving at 13:01 it will be finished next day.
> > >>
> > >>  >
> > >>  > But if the broker is moving a replica for which it acts as a leader, as of
> > >>  > the current KIP the broker will keep reading from log_dir_old and appending
> > >>  > to log_dir_new without having to wait for the round-trip time. We probably
> > >>  > need quota for this in the future.
> > >>  >
> > >>  >> >
> > >>  >> > And to answer your question, yes topicPartition.log refers to
> > >>  >> > topic-partition/segment.log.
> > >>  >> >
> > >>  >> > Thanks,
> > >>  >> > Dong
> > >>  >> >
> > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> > >>  >> > wrote:
> > >>  >> >
> > >>  >> >> Hi,
> > >>  >> >>
> > >>  >> >> We have a similar solution that has been working in production since
> > >>  >> >> 2014. You can see it here:
> > >>  >> >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> > >>  >> >> The idea is very simple
> > >>  >> >> 1. Disk balancer runs in a separate thread inside the scheduler pool.
> > >>  >> >> 2. It does not touch empty partitions
> > >>  >> >> 3. Before it moves a partition it forcibly creates a new segment on a
> > >>  >> >> destination disk
> > >>  >> >> 4. It moves segment by segment from new to old.
> > >>  >> >> 5. Log class works with segments on both disks
> > >>  >> >>
> > >>  >> >> Your approach seems too complicated; moreover it means that you have to
> > >>  >> >> patch different components of the system.
> > >>  >> >> Could you clarify what you mean by topicPartition.log? Is it
> > >>  >> >> topic-partition/segment.log ?
> > >>  >> >>
> > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> > >>  >> >> > Hi all,
> > >>  >> >> >
> > >>  >> >> > We created KIP-113: Support replicas movement between log directories.
> > >>  >> >> > Please find the KIP wiki in the link
> > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> > >>  >> >> >
> > >>  >> >> > This KIP is related to KIP-112
> > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> > >>  >> >> > Handle disk failure for JBOD. They are needed in order to support JBOD in
> > >>  >> >> > Kafka. Please help review the KIP. Your feedback is appreciated!
> > >>  >> >> >
> > >>  >> >> > Thanks,
> > >>  >> >> > Dong
> >
>
