On Monday 10 December 2007 13:31, Jonathan Corbet wrote:
> Hey, Daniel,
>
> I'm just getting around to looking at this.  One thing jumped out at me:
> > +   if (bio->bi_throttle) {
> > +           struct request_queue *q = bio->bi_queue;
> > +           bio->bi_throttle = 0; /* or detect multiple endio and err? */
> > +           atomic_add(bio->bi_throttle, &q->available);
> > +           wake_up(&q->throttle_wait);
> > +   }
>
> I'm feeling like I must be really dumb, but...how can that possibly
> work?  You're zeroing >bi_throttle before adding it back into
> q->available, so the latter will never increase...

Hi Jon,

Don't you know?  These days we optimize all our code for modern
processors with tunnelling instructions and metaphysical cache.
On such processors, setting a register to zero does not entirely
destroy all the data that used to be in the register, so subsequent
instructions can make further use of the overwritten data by
reconstructing it from remnants of bits left attached to the edges of
the register.

Um, yeah, that's it.

Actually, I fat-fingered it in the merge to -mm.  Thanks for the catch,
corrected patch attached.

The offending line isn't even a functional part of the algorithm, it is
just supposed to defend against the possibility that, somehow,
->bi_endio gets called multiple times.  Probably it should really be
something like:

                BUG_ON(bio->bi_throttle == -1);
                if (bio->bi_throttle) {
                        ...
                        bio->bi_throttle = -1;

Or perhaps we should just rely on nobody ever making that mistake
and let somebody else catch it if it does.

Regards,

Daniel
--- 2.6.24-rc3-mm.clean/block/ll_rw_blk.c	2007-12-04 14:45:25.000000000 -0800
+++ 2.6.24-rc3-mm/block/ll_rw_blk.c	2007-12-10 04:49:56.000000000 -0800
@@ -3210,9 +3210,9 @@ static inline int bio_check_eod(struct b
  */
 static inline void __generic_make_request(struct bio *bio)
 {
-	struct request_queue *q;
+	struct request_queue *q = bdev_get_queue(bio->bi_bdev);
 	sector_t old_sector;
-	int ret, nr_sectors = bio_sectors(bio);
+	int nr_sectors = bio_sectors(bio);
 	dev_t old_dev;
 	int err = -EIO;
 
@@ -3221,6 +3221,13 @@ static inline void __generic_make_reques
 	if (bio_check_eod(bio, nr_sectors))
 		goto end_io;
 
+	if (q && q->metric && !bio->bi_queue) {
+		int need = bio->bi_throttle = q->metric(bio);
+		bio->bi_queue = q;
+		/* FIXME: potential race if atomic_sub is called in the middle of condition check */
+		wait_event(q->throttle_wait, atomic_read(&q->available) >= need);
+		atomic_sub(need, &q->available);
+	}
 	/*
 	 * Resolve the mapping until finished. (drivers are
 	 * still free to implement/resolve their own stacking
@@ -3231,10 +3238,9 @@ static inline void __generic_make_reques
 	 */
 	old_sector = -1;
 	old_dev = 0;
-	do {
+	while (1) {
 		char b[BDEVNAME_SIZE];
 
-		q = bdev_get_queue(bio->bi_bdev);
 		if (!q) {
 			printk(KERN_ERR
 			       "generic_make_request: Trying to access "
@@ -3282,8 +3288,10 @@ end_io:
 			goto end_io;
 		}
 
-		ret = q->make_request_fn(q, bio);
-	} while (ret);
+		if (!q->make_request_fn(q, bio))
+			return;
+		q = bdev_get_queue(bio->bi_bdev);
+	}
 }
 
 /*
--- 2.6.24-rc3-mm.clean/drivers/md/dm.c	2007-12-04 14:46:04.000000000 -0800
+++ 2.6.24-rc3-mm/drivers/md/dm.c	2007-12-04 23:31:41.000000000 -0800
@@ -889,6 +889,11 @@ static int dm_any_congested(void *conges
 	return r;
 }
 
+static unsigned dm_metric(struct bio *bio)
+{
+	return bio->bi_vcnt;
+}
+
 /*-----------------------------------------------------------------
  * An IDR is used to keep track of allocated minor numbers.
  *---------------------------------------------------------------*/
@@ -967,6 +972,7 @@ out:
 
 static struct block_device_operations dm_blk_dops;
 
+#define DEFAULT_THROTTLE_CAPACITY 1000
 /*
  * Allocate and initialise a blank device with a given minor.
  */
@@ -1009,6 +1015,11 @@ static struct mapped_device *alloc_dev(i
 		goto bad1_free_minor;
 
 	md->queue->queuedata = md;
+	md->queue->metric = dm_metric;
+	/* A dm device constructor may change the throttle capacity */
+	atomic_set(&md->queue->available, md->queue->capacity = DEFAULT_THROTTLE_CAPACITY);
+	init_waitqueue_head(&md->queue->throttle_wait);
+
 	md->queue->backing_dev_info.congested_fn = dm_any_congested;
 	md->queue->backing_dev_info.congested_data = md;
 	blk_queue_make_request(md->queue, dm_request);
--- 2.6.24-rc3-mm.clean/fs/bio.c	2007-12-04 14:38:47.000000000 -0800
+++ 2.6.24-rc3-mm/fs/bio.c	2007-12-04 23:31:41.000000000 -0800
@@ -1007,6 +1007,13 @@ void bio_endio(struct bio *bio, int erro
 	else if (!test_bit(BIO_UPTODATE, &bio->bi_flags))
 		error = -EIO;
 
+	if (bio->bi_throttle) {
+		struct request_queue *q = bio->bi_queue;
+		atomic_add(bio->bi_throttle, &q->available);
+		bio->bi_throttle = 0; /* or detect multiple endio and err? */
+		wake_up(&q->throttle_wait);
+	}
+
 	if (bio->bi_end_io)
 		bio->bi_end_io(bio, error);
 }
--- 2.6.24-rc3-mm.clean/include/linux/bio.h	2007-12-04 14:39:31.000000000 -0800
+++ 2.6.24-rc3-mm/include/linux/bio.h	2007-12-04 23:31:41.000000000 -0800
@@ -111,6 +111,9 @@ struct bio {
 	bio_end_io_t		*bi_end_io;
 	atomic_t		bi_cnt;		/* pin count */
 
+	struct request_queue	*bi_queue;	/* for throttling */
+	unsigned		bi_throttle;	/* throttle metric */
+
 	void			*bi_private;
 
 	bio_destructor_t	*bi_destructor;	/* destructor */
--- 2.6.24-rc3-mm.clean/include/linux/blkdev.h	2007-12-04 14:47:18.000000000 -0800
+++ 2.6.24-rc3-mm/include/linux/blkdev.h	2007-12-04 23:31:41.000000000 -0800
@@ -383,6 +383,10 @@ struct request_queue
 	struct work_struct	unplug_work;
 
 	struct backing_dev_info	backing_dev_info;
+	unsigned (*metric)(struct bio *bio);	/* bio throttle metric */
+	wait_queue_head_t	throttle_wait;
+	atomic_t		available;
+	unsigned		capacity;
 
 	/*
 	 * The queue owner gets to use this for whatever they like.

Reply via email to