23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> Thanks. Please see my comment inline.
>
> On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> wrote:
>
>>  13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
>>  > Hey Alexey,
>>  >
>>  > Thanks for your review and the alternative approach. Here is my
>>  > understanding of your patch. kafka's background threads are used to move
>>  > data between replicas. When data movement is triggered, the log will be
>>  > rolled and the new logs will be put in the new directory, and background
>>  > threads will move segment from old directory to new directory.
>>  >
>>  > It is important to note that KIP-112 is intended to work with KIP-113 to
>>  > support JBOD. I think your solution is definitely simpler and better
>>  > under the current kafka implementation, in which a broker will fail if
>>  > any disk fails. But I am not sure if we want to allow a broker to run
>>  > with a partial disk failure. Let's say a replica is being moved from
>>  > log_dir_old to log_dir_new and then log_dir_old stops working due to disk
>>  > failure. How would your existing patch handle it? To make the scenario a
>>  > bit more
>>
>>  We will lose log_dir_old. After broker restart we can read the data from
>>  log_dir_new.
>
> No, you probably can't. This is because the broker doesn't have *all* the
> data for this partition. For example, say the broker has
> partition_segment_1, partition_segment_50 and partition_segment_100 on
> log_dir_old. partition_segment_100, which has the latest data, has been
> moved to log_dir_new, and log_dir_old fails before partition_segment_50
> and partition_segment_1 are moved to log_dir_new. When the broker restarts,
> it won't have partition_segment_50 or partition_segment_1. This causes a
> problem if the broker is elected leader and a consumer wants to consume
> data in partition_segment_1.

Right.
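
To make the scenario concrete, here is a small Python sketch. It is not Kafka code: the function name is hypothetical, and each segment is modelled only by its base offset (1, 50, 100, as in the example above). It shows which segments stay readable when log_dir_old fails in the middle of a move.

```python
def surviving_segments(old_dir, new_dir, old_dir_failed):
    """Return the sorted base offsets still readable after a restart."""
    readable = set(new_dir)
    if not old_dir_failed:
        readable |= set(old_dir)
    return sorted(readable)


# Move in progress: only the newest segment has reached log_dir_new.
log_dir_old = [1, 50]
log_dir_new = [100]

before = surviving_segments(log_dir_old, log_dir_new, old_dir_failed=False)
after = surviving_segments(log_dir_old, log_dir_new, old_dir_failed=True)
lost = sorted(set(before) - set(after))

print(before)  # [1, 50, 100]
print(after)   # [100]
print(lost)    # [1, 50]
```

A consumer asking for offsets in segment 1 or 50 cannot be served from what is left.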

>
>>  > complicated, let's say the broker is shut down, log_dir_old's disk fails,
>>  > and the broker starts. In this case the broker doesn't even know if
>>  > log_dir_new has all the data needed for this replica. It becomes a
>>  > problem if the broker is elected leader of this partition in this case.
>>
>>  log_dir_new contains the most recent data so we will lose the tail of
>>  the partition.
>>  This is not a big problem for us because we already delete tails by hand
>>  (see https://issues.apache.org/jira/browse/KAFKA-1712).
>>  Also we don't use automatic leader balancing
>>  (auto.leader.rebalance.enable=false),
>>  so this partition becomes the leader with a low probability.
>>  I think my patch can be modified to prohibit the selection of the leader
>>  until the partition has moved completely.
>
> I guess you are saying that you have deleted the tails by hand in your own
> kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am not

No. We just modify the segments' mtime with a cron job. This works with vanilla kafka.

> sure if it is the right solution. How would this solution address the
> problem mentioned above?

If you need only fresh data and you remove old data by hand, this is not a
problem. But in the general case this is a problem, of course.
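
For readers unfamiliar with the mtime trick, a minimal Python sketch follows. It is an illustration only, not the actual cron job: the helper name, the rule "every segment except the newest", and the target timestamp are all assumptions. It relies only on the fact that segment file names are zero-padded base offsets, so sorting by name gives offset order.

```python
import os
from pathlib import Path


def backdate_segments(partition_dir, new_mtime):
    """Set the mtime of every *.log segment except the newest one.

    Hypothetical helper: the selection rule and the target timestamp
    are assumptions, not the cron job mentioned in this thread.
    """
    segments = sorted(Path(partition_dir).glob("*.log"))
    changed = []
    for segment in segments[:-1]:  # skip the last (active) segment
        atime = segment.stat().st_atime
        os.utime(segment, (atime, new_mtime))  # keep atime, replace mtime
        changed.append(segment.name)
    return changed
```

Time-based retention that looks at segment mtime would then treat the backdated segments as old.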

>
> BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to
> address its problem. Now that we have a timestamp in the message we can use
> that to delete old segments instead of relying on the log segment mtime.
> Just an idea; we don't have to discuss this problem here.
>
>>  >
>>  > The solution presented in the KIP attempts to handle it by replacing
>>  > the replica in an atomic fashion after the log in the new dir has
>>  > fully caught up with the log in the old dir. At any time the log can be
>>  > considered to exist on only one log directory.
>>
>>  As I understand it, your solution does not cover quotas.
>>  What happens if someone starts to transfer 100 partitions?
>
> Good point. Quota can be implemented in the future. It is currently
> mentioned as a potential future improvement in KIP-112
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>. Thanks
> for the reminder. I will move it to KIP-113.
>
>>  > If yes, it will read a ByteBufferMessageSet from topicPartition.log and
>>  > append the message set to topicPartition.move
>>
>>  i.e. processPartitionData will read data from the beginning of
>>  topicPartition.log? What is the read size?
>>  A ReplicaFetchThread reads many partitions so if one does some complicated
>>  work (= read a lot of data from disk) everything will slow down.
>>  I think read size should not be very big.
>>
>>  On the other hand at this point (processPartitionData) one can use only
>>  the new data (ByteBufferMessageSet from parameters) and wait until
>>  (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
>>  && topicPartition.move.largestOffset == topicPartition.log.largestOffset).
>>  In this case the write speed to topicPartition.move and topicPartition.log
>>  will be the same so this will allow us to move many partitions to one disk.
>
> The read size of a given partition is configured
> using replica.fetch.max.bytes, which is the same size used by FetchRequest
> from follower to leader. If the broker is moving a replica for which it

OK. Could you mention it in the KIP?

> acts as a follower, the disk write rate for moving this replica is at most
> the rate it fetches from leader (assume it is catching up and has
> sufficient data to read from leader), which is subject to round-trip-time
> between itself and the leader. Thus this part is probably fine even without
> quota.

I think there are 2 problems:
1. Without a speed limiter this will not work well even for 1 partition. In our
production we had a problem, so we added a throughput limiter:
https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713

2. I don't understand how it will work in the case of a big
replica.fetch.wait.max.ms and a partition with irregular flow.
For example, someone could have replica.fetch.wait.max.ms=10minutes and a
partition that has very high data flow from 12:00 to 13:00 and zero flow
otherwise.
In this case processPartitionData could be called once per 10 minutes, so if we
start moving data at 13:01 it will be finished the next day.
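
A rough back-of-the-envelope estimate for problem 2, as a Python sketch. The numbers are assumptions chosen for illustration: 144 MiB accumulated during the busy hour, at most 1 MiB copied per processPartitionData call (1 MiB is, as far as I know, the default of replica.fetch.max.bytes), and one call every 10 minutes. The function name is hypothetical.

```python
import math


def estimate_move_seconds(partition_bytes, bytes_per_call, call_interval_s):
    """Time to copy a partition if each call moves at most bytes_per_call."""
    calls = math.ceil(partition_bytes / bytes_per_call)
    return calls * call_interval_s


MIB = 1024 * 1024
# Assumed numbers: 144 MiB to move, 1 MiB per call, one call every 10 minutes.
seconds = estimate_move_seconds(144 * MIB, 1 * MIB, 10 * 60)
print(seconds / 3600)  # 24.0
```

So even a modest partition needs about a day when the copy is driven only by idle fetch responses.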

>
> But if the broker is moving a replica for which it acts as a leader, as of
> current KIP the broker will keep reading from log_dir_old and append to
> log_dir_new without having to wait for round-trip-time. We probably need
> quota for this in the future.
>
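
As an illustration of what such a quota could look like in the leader case, here is a minimal throttle sketch in Python. It is not the limiter from the commit linked above and not anything proposed in the KIP; the class name, its parameters, and the injectable clock and sleep functions are hypothetical.

```python
import time


class Throttle:
    """Limit a copy loop to roughly max_bytes_per_s (hypothetical helper)."""

    def __init__(self, max_bytes_per_s, clock=time.monotonic, sleep=time.sleep):
        self.max_bytes_per_s = max_bytes_per_s
        self.clock = clock
        self.sleep = sleep
        self.start = clock()
        self.total_bytes = 0

    def record(self, nbytes):
        """Account for nbytes just copied; sleep if we are ahead of the limit."""
        self.total_bytes += nbytes
        # Earliest moment at which total_bytes is allowed to have been copied.
        earliest = self.start + self.total_bytes / self.max_bytes_per_s
        delay = earliest - self.clock()
        if delay > 0:
            self.sleep(delay)
        return max(delay, 0.0)
```

The copy loop would call throttle.record(len(chunk)) after each append to the log in log_dir_new, so the average rate since the start of the move stays at or below the limit.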
>>  >
>>  > And to answer your question, yes topicpartition.log refers to
>>  > topic-partition/segment.log.
>>  >
>>  > Thanks,
>>  > Dong
>>  >
>>  > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
>>  > wrote:
>>  >
>>  >> Hi,
>>  >>
>>  >> We have a similar solution that has been working in production since
>>  >> 2014. You can see it here:
>>  >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
>>  >> The idea is very simple
>>  >> 1. Disk balancer runs in a separate thread inside scheduler pool.
>>  >> 2. It does not touch empty partitions
>>  >> 3. Before it moves a partition it forcibly creates new segment on a
>>  >> destination disk
>>  >> 4. It moves segment by segment from new to old.
>>  >> 5. Log class works with segments on both disks
>>  >>
>>  >> Your approach seems too complicated; moreover, it means that you have to
>>  >> patch different components of the system.
>>  >> Could you clarify what you mean by topicPartition.log? Is it
>>  >> topic-partition/segment.log?
>>  >>
>>  >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
>>  >> > Hi all,
>>  >> >
>>  >> > We created KIP-113: Support replicas movement between log directories.
>>  >> > Please find the KIP wiki in the link
>>  >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
>>  >> >
>>  >> > This KIP is related to KIP-112
>>  >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
>>  >> > Handle disk failure for JBOD. They are needed in order to support JBOD in
>>  >> > Kafka. Please help review the KIP. Your feedback is appreciated!
>>  >> >
>>  >> > Thanks,
>>  >> > Dong
