+ceph-devel.

Thanks,
Guang

On Jul 29, 2014, at 10:20 PM, Guang Yang <yguan...@outlook.com> wrote:

> Hi Yehuda,
> Per your review comment about IO throttling for bucket index operations,
> I prototyped the code below (details still need polishing). Could you take a
> look and tell me whether this is the right way to go?
> 
> Another problem I came across is that ClsBucketIndexOpCtx::handle_completion
> was not called for the bucket index init op (below). Is there anything
> obvious that I missed here?
> 
> Thanks,
> Guang
> 
> 
> // Base class for issuing asynchronous bucket index requests and waiting
> // until all of them have finished.
> //
> // Subclasses implement do_next() (issue the next request, if any) and
> // is_completed() (report whether the whole batch is finished). Completion
> // callbacks report each finished request through complete().
> class ClsBucketIndexAioThrottler {
> protected:
>  // Number of requests reported through complete(); guarded by 'lock'.
>  int completed;
>  // Last negative value passed to complete(), or 0 if there was none;
>  // guarded by 'lock'.
>  int ret_code;
>  IoCtx& io_ctx;
>  // Guards 'completed' and 'ret_code'.
>  Mutex lock;
>  // Mutex/condition pair wait_completion() sleeps on until complete()
>  // signals it.
>  struct LockCond {
>    Mutex lock;
>    Cond cond;
>    LockCond() : lock("LockCond"), cond() {}
>  } lock_cond;
> public:
>  ClsBucketIndexAioThrottler(IoCtx& _io_ctx)
>    : completed(0), ret_code(0), io_ctx(_io_ctx),
>    lock("ClsBucketIndexAioThrottler"), lock_cond() {}
> 
>  virtual ~ClsBucketIndexAioThrottler() {}
> 
>  // Issue the next pending request, if there is one.
>  virtual void do_next() = 0;
> 
>  // Return true once the batch is finished. Called by wait_completion()
>  // with lock_cond.lock held; implementations may take 'lock'.
>  virtual bool is_completed () = 0;
> 
>  // Record that one request finished with return code 'ret' and wake up
>  // the thread blocked in wait_completion().
>  void complete(int ret) {
>    // The counters are updated while lock_cond.lock is held so that a
>    // waiter cannot observe the final state and return before Signal()
>    // has been issued. Lock order is lock_cond.lock -> lock, matching
>    // wait_completion() -> is_completed().
>    Mutex::Locker wl(lock_cond.lock);
>    {
>      Mutex::Locker l(lock);
>      if (ret < 0)
>        ret_code = ret;
>      ++completed;
>    }
>    lock_cond.cond.Signal();
>  }
> 
>  // Return the recorded error code (0 if no request failed).
>  int get_ret_code () {
>    Mutex::Locker l(lock);
>    return ret_code;
>  }
> 
>  // Block until is_completed() returns true, then return the recorded
>  // error code (0 if no request failed).
>  virtual int wait_completion() {
>    Mutex::Locker wl(lock_cond.lock);
>    // Cond::Wait() returns with the mutex held again, so the mutex must
>    // not be locked a second time after waking up.
>    while (!is_completed()) {
>      lock_cond.cond.Wait(lock_cond.lock);
>    }
>    return get_ret_code();
>  }
> };
> 
> // Throttler that works through a fixed list of bucket index object names.
> // 'iter_pos' marks the boundary between objects that are counted as sent
> // (everything before it) and objects still waiting to be sent.
> class ClsBucketIndexListAioThrottler : public ClsBucketIndexAioThrottler {
> protected:
>  // Private copy of the object names to process.
>  vector<string> bucket_objects;
>  // Next object to send; guarded by 'lock'.
>  vector<string>::iterator iter_pos;
> public:
>  // Takes the list by const reference; it is copied once into
>  // 'bucket_objects' so 'iter_pos' always refers to storage owned here.
>  ClsBucketIndexListAioThrottler(IoCtx& _io_ctx,
>      const vector<string>& _bucket_objs)
>    : ClsBucketIndexAioThrottler(_io_ctx), bucket_objects(_bucket_objs),
>    iter_pos(bucket_objects.begin()) {}
> 
>  // The batch is finished when every request sent so far has completed
>  // and either the list is exhausted (success) or an error was recorded
>  // (failure).
>  virtual bool is_completed() {
>    Mutex::Locker l(lock);
>    // Vector iterators are random access, so the number of sent requests
>    // is a plain iterator difference.
>    const int sent = static_cast<int>(iter_pos - bucket_objects.begin());
>    const bool exhausted = (iter_pos == bucket_objects.end());
>    const bool failed = (ret_code < 0);
>    return sent == completed && (exhausted || failed);
>  }
> };
> 
> // Completion context for a single asynchronous bucket index operation.
> // On completion it optionally decodes the reply into 'data', stores the
> // result in 'ret_code', asks the throttler to send the next request and
> // reports this request as finished.
> template<typename T>
> class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
> private:
>  // Destination for the decoded reply; NULL when no payload is wanted.
>  T* data;
>  // Where to store the return code of the operation; may be NULL.
>  int* ret_code;
> 
>  // The Aio completion object associated with this Op; it is released
>  // from within the completion handler.
>  librados::AioCompletion* completion;
>  // Throttler to notify when this operation finishes; may be NULL.
>  ClsBucketIndexAioThrottler* throttler;
> public:
>  ClsBucketIndexOpCtx(T* _data, int* _ret_code,
>          librados::AioCompletion* _completion,
>          ClsBucketIndexAioThrottler* _throttler)
>    : data(_data), ret_code(_ret_code), completion(_completion),
>    throttler(_throttler) {}
>  ~ClsBucketIndexOpCtx() {}
> 
>  // The completion callback: fill in the response data and return code,
>  // then hand control back to the throttler.
>  void handle_completion(int r, bufferlist& outbl) {
>    if (r >= 0 && data) {
>      try {
>        bufferlist::iterator iter = outbl.begin();
>        ::decode((*data), iter);
>      } catch (const buffer::error&) {
>        // A reply that cannot be decoded is reported as an I/O error.
>        r = -EIO;
>      }
>    }
>    // Publish the final result (including a decode failure) to the caller.
>    if (ret_code)
>      *ret_code = r;
>    if (throttler) {
>      // Queue the next request (if any), then account for this one.
>      throttler->do_next();
>      throttler->complete(r);
>    }
>    if (completion) {
>      completion->release();
>    }
>  }
> };
> 
> 
> // Throttler that sends an exclusive create plus the "rgw"
> // "bucket_init_index" class call to each bucket index object in the list.
> class ClsBucketIndexInitAioThrottler : public ClsBucketIndexListAioThrottler {
> public:
>  ClsBucketIndexInitAioThrottler(IoCtx& _io_ctx,
>      const vector<string>& _bucket_objs) :
>    ClsBucketIndexListAioThrottler(_io_ctx, _bucket_objs) {}
> 
>  // Send the init operation for the next object in the list; does nothing
>  // once the list is exhausted.
>  virtual void do_next() {
>    string oid;
>    {
>      // Claim the next object name under the lock; the request itself is
>      // submitted after the lock has been dropped.
>      Mutex::Locker l(lock);
>      if (iter_pos == bucket_objects.end())
>        return;
>      oid = *iter_pos;
>      ++iter_pos;
>    }
>    AioCompletion* c =
>      librados::Rados::aio_create_completion(NULL, NULL, NULL);
>    // Empty placeholder input for the class call.
>    bufferlist in;
>    librados::ObjectWriteOperation op;
>    op.create(true);
>    op.exec("rgw", "bucket_init_index", in,
>        new ClsBucketIndexOpCtx<int>(NULL, NULL, c, this));
>    // The write-operation overload of aio_operate() takes no output
>    // buffer argument.
>    int r = io_ctx.aio_operate(oid, c, &op);
>    if (r < 0) {
>      // This object has already been counted as sent, so it must also be
>      // counted as completed or wait_completion() would block forever.
>      // NOTE(review): assumes librados does not run the completion
>      // handler when aio_operate() itself fails -- confirm.
>      c->release();
>      complete(r);
>    }
>  }
> };
> 
> 
> // Initialize every bucket index object named in 'bucket_objs', keeping at
> // most 'max_aio' requests in flight.
> //
> // Returns 0 on success (including an empty list), -EINVAL if 'max_aio'
> // is 0 while there is work to do, or the error code reported by
> // wait_completion() if a request failed.
> int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
>        const vector<string>& bucket_objs, uint32_t max_aio)
> {
>   if (bucket_objs.empty())
>     return 0;
>   // With no request allowed in flight nothing would ever complete and
>   // the wait below would never return.
>   if (max_aio == 0)
>     return -EINVAL;
> 
>   ClsBucketIndexAioThrottler* throttler =
>     new ClsBucketIndexInitAioThrottler(io_ctx, bucket_objs);
>   // Send the first window of requests; each completion then triggers
>   // the next request through do_next().
>   const size_t total = bucket_objs.size();
>   for (size_t issued = 0; issued < total && issued < max_aio; ++issued) {
>       throttler->do_next();
>   }
>   // Propagate the batch result instead of unconditionally returning 0.
>   const int ret = throttler->wait_completion();
>   // NOTE(review): the throttler is not freed here. Deleting it is only
>   // safe if no completion callback can still be inside
>   // ClsBucketIndexAioThrottler::complete() once wait_completion() has
>   // returned -- confirm that before adding a delete.
>   return ret;
> }
> 
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to