On 21/01/2013 12:39, Kevin Wolf wrote:
> On 16.01.2013 18:31, Paolo Bonzini wrote:
>> There is really no change in the behavior of the job here, since
>> there is still a maximum of one in-flight I/O operation between
>> the source and the target.  However, this patch already introduces
>> the AIO callbacks (which are unmodified in the next patch)
>> and some of the logic to count in-flight operations and only
>> complete the job when there is none.
>>
>> Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
>> ---
>>  block/mirror.c |  155 ++++++++++++++++++++++++++++++++++++++++++--------------
>>  trace-events   |    2 +
>>  2 files changed, 119 insertions(+), 38 deletions(-)
>>
>> diff --git a/block/mirror.c b/block/mirror.c
>> index ab41340..75c550a 100644
>> --- a/block/mirror.c
>> +++ b/block/mirror.c
>> @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob {
>>      unsigned long *cow_bitmap;
>>      HBitmapIter hbi;
>>      uint8_t *buf;
>> +
>> +    int in_flight;
>> +    int ret;
>>  } MirrorBlockJob;
>>  
>> +typedef struct MirrorOp {
>> +    MirrorBlockJob *s;
>> +    QEMUIOVector qiov;
>> +    struct iovec iov;
>> +    int64_t sector_num;
>> +    int nb_sectors;
>> +} MirrorOp;
>> +
>>  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>                                              int error)
>>  {
>> @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>      }
>>  }
>>  
>> -static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
>> -                                         BlockErrorAction *p_action)
>> +static void mirror_iteration_done(MirrorOp *op)
>> +{
>> +    MirrorBlockJob *s = op->s;
>> +
>> +    s->in_flight--;
>> +    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
>> +    g_slice_free(MirrorOp, op);
>> +    qemu_coroutine_enter(s->common.co, NULL);
> 
> This doesn't check if the job coroutine is actually in a state where
> it's valid to reenter.
> 
> Technically it might even be okay because reentering during a sleep is
> allowed and as good as reentering during the new yield, and bdrv_flush()
> is only called if s->in_flight == 0. Most other calls _should_ be okay
> as well, but I'm not so sure about bdrv_drain_all(), especially once
> .bdrv_drain exists.

bdrv_drain_all is also called only if s->in_flight == 0, but I see
your point.  It is indeed quite subtle, but it's okay.

> As you can see, this is becoming very subtle, so I would prefer adding
> some explicit bool s->may_reenter or something like that.

The right boolean to test is already there: it's job->busy.  I can add a
new API block_job_yield/block_job_enter (where block_job_yield clears
busy before the yield and sets it again afterwards, and block_job_enter
only enters if !job->busy), but that would be a separate series IMO.
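
A minimal sketch of how such a pair could behave, using a mock job type
rather than the real BlockJob.  Everything named mock_* or Mock* is
hypothetical, and the yield is simulated with a callback because this is
plain C, not a real coroutine; it only illustrates the busy-flag guard:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for BlockJob; only the busy flag matters here. */
typedef struct MockJob {
    bool busy;    /* true while the job coroutine is running */
    int enters;   /* number of times the job was actually reentered */
} MockJob;

/* Something that happens while the job is yielded, e.g. an AIO callback. */
typedef void (*MockEvent)(MockJob *job);

/* Reenter the job only if it is parked in a yield (busy is false). */
static bool mock_block_job_enter(MockJob *job)
{
    if (job->busy) {
        return false;   /* not at a safe reentry point: do nothing */
    }
    job->enters++;      /* stands in for qemu_coroutine_enter() */
    return true;
}

/* Clear busy across the yield, then set it again once the job resumes. */
static void mock_block_job_yield(MockJob *job, MockEvent while_yielded)
{
    job->busy = false;
    if (while_yielded != NULL) {
        while_yielded(job);   /* stands in for qemu_coroutine_yield() */
    }
    job->busy = true;
}

/* Stand-in for a completion callback such as mirror_iteration_done(). */
static void mock_aio_completion(MockJob *job)
{
    mock_block_job_enter(job);
}

/* One completion while busy (ignored), one while yielded (accepted). */
static int mock_demo(void)
{
    MockJob job = { .busy = true, .enters = 0 };

    mock_aio_completion(&job);
    mock_block_job_yield(&job, mock_aio_completion);
    assert(job.busy);
    return job.enters;
}
```

With this shape the completion callback would not need to know which
yield point the job is parked at; it would just call the enter helper
and let the flag decide.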

>> +}
> 
>> @@ -177,28 +233,43 @@ static void coroutine_fn mirror_run(void *opaque)
>>      }
>>  
>>      bdrv_dirty_iter_init(bs, &s->hbi);
>> +    last_pause_ns = qemu_get_clock_ns(rt_clock);
>>      for (;;) {
>>          uint64_t delay_ns;
>>          int64_t cnt;
>>          bool should_complete;
>>  
>> +        if (s->ret < 0) {
>> +            ret = s->ret;
>> +            break;
>> +        }
>> +
>>          cnt = bdrv_get_dirty_count(bs);
>> -        if (cnt != 0) {
>> -            BlockErrorAction action = BDRV_ACTION_REPORT;
>> -            ret = mirror_iteration(s, &action);
>> -            if (ret < 0 && action == BDRV_ACTION_REPORT) {
>> -                goto immediate_exit;
>> +
>> +        /* Note that even when no rate limit is applied we need to yield
>> +         * periodically with no pending I/O so that qemu_aio_flush() returns.
>> +         * We do so every SLICE_TIME milliseconds, or when there is an error,
> 
> s/milli/nano/
> 
>> +         * or when the source is clean, whichever comes first.
>> +         */
>> +        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
>> +            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
>> +            if (s->in_flight > 0) {
>> +                trace_mirror_yield(s, s->in_flight, cnt);
>> +                qemu_coroutine_yield();
>> +                continue;
>> +            } else if (cnt != 0) {
>> +                mirror_iteration(s);
>> +                continue;
>>              }
>> -            cnt = bdrv_get_dirty_count(bs);
>>          }
>>  
>>          should_complete = false;
>> -        if (cnt == 0) {
>> +        if (s->in_flight == 0 && cnt == 0) {
>>              trace_mirror_before_flush(s);
>>              ret = bdrv_flush(s->target);
>>              if (ret < 0) {
>>                  if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) {
>> -                    goto immediate_exit;
>> +                    break;
> 
> Is this an unrelated change?

Seems like a rebase hiccup.  It makes no semantic difference, so I'll drop it.

>>                  }
>>              } else {
>>                  /* We're out of the streaming phase.  From now on, if the job
>> @@ -244,15 +315,12 @@ static void coroutine_fn mirror_run(void *opaque)
>>                  delay_ns = 0;
>>              }
>>  
>> -            /* Note that even when no rate limit is applied we need to yield
>> -             * with no pending I/O here so that bdrv_drain_all() returns.
>> -             */
>>              block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>>              if (block_job_is_cancelled(&s->common)) {
>>                  break;
>>              }
>>          } else if (!should_complete) {
>> -            delay_ns = (cnt == 0 ? SLICE_TIME : 0);
>> +            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
>>              block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>>          } else if (cnt == 0) {
> 
> Why don't we need to check s->in_flight == 0 here?

As above, should_complete is only set to true within an
if (s->in_flight == 0 && cnt == 0).

Paolo

>>              /* The two disks are in sync.  Exit and report successful
>> @@ -262,9 +330,20 @@ static void coroutine_fn mirror_run(void *opaque)
>>              s->common.cancelled = false;
>>              break;
>>          }
>> +        last_pause_ns = qemu_get_clock_ns(rt_clock);
>>      }
>>  
>>  immediate_exit:
>> +    if (s->in_flight > 0) {
>> +        /* We get here only if something went wrong.  Either the job failed,
>> +         * or it was cancelled prematurely so that we do not guarantee that
>> +         * the target is a copy of the source.
>> +         */
>> +        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
>> +        mirror_drain(s);
>> +    }
>> +
>> +    assert(s->in_flight == 0);
>>      qemu_vfree(s->buf);
>>      g_free(s->cow_bitmap);
>>      bdrv_set_dirty_tracking(bs, 0);
> 
> Kevin
> 
> 

