>Reviewed-by: Yan, Zheng <zheng.z....@intel.com>

Thanks very much!
Jianpeng Ma
>
>On 09/12/2013 01:54 PM, majianpeng wrote:
>> For writev/pwritev sync-operatoin, ceph only do the first iov.
>> It don't think other iovs.Now implement this.
>> I divided the write-sync-operation into two functions.One for
>> direct-write,other for none-direct-sync-write.This is because for
>> none-direct-sync-write we can merge iovs to one.But for direct-write,
>> we can't merge iovs.
>> 
>> V4:
>>      reconstruct the code by Yan, Zheng
>> V2:
>>   -using struct iov_iter replace clone iovs in ceph_sync_write.
>> 
>> Signed-off-by: Jianpeng Ma <majianp...@gmail.com>
>> Reviewed-by: Yan, Zheng <zheng.z....@intel.com>
>> ---
>>  fs/ceph/file.c | 273 
>> ++++++++++++++++++++++++++++++++++++++++-----------------
>>  1 file changed, 193 insertions(+), 80 deletions(-)
>> 
>> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
>> index 3de8982..5cf034e 100644
>> --- a/fs/ceph/file.c
>> +++ b/fs/ceph/file.c
>> @@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct 
>> ceph_osd_request *req, bool unsafe)
>>      }
>>  }
>>  
>> +
>>  /*
>> - * Synchronous write, straight from __user pointer or user pages (if
>> - * O_DIRECT).
>> + * Synchronous write, straight from __user pointer or user pages.
>>   *
>>   * If write spans object boundary, just do multiple writes.  (For a
>>   * correct atomic write, we should e.g. take write locks on all
>>   * objects, rollback on failure, etc.)
>>   */
>> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,
>> -                           size_t left, loff_t pos, loff_t *ppos)
>> +static ssize_t
>> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
>> +                   unsigned long nr_segs, size_t count)
>>  {
>> +    struct file *file = iocb->ki_filp;
>>      struct inode *inode = file_inode(file);
>>      struct ceph_inode_info *ci = ceph_inode(inode);
>>      struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>>      struct ceph_snap_context *snapc;
>>      struct ceph_vino vino;
>>      struct ceph_osd_request *req;
>> -    int num_ops = 1;
>>      struct page **pages;
>>      int num_pages;
>> -    u64 len;
>>      int written = 0;
>>      int flags;
>>      int check_caps = 0;
>> -    int page_align, io_align;
>> -    unsigned long buf_align;
>> +    int page_align;
>>      int ret;
>>      struct timespec mtime = CURRENT_TIME;
>> -    bool own_pages = false;
>> +    loff_t pos = iocb->ki_pos;
>> +    struct iov_iter i;
>>  
>>      if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>>              return -EROFS;
>>  
>> -    dout("sync_write on file %p %lld~%u %s\n", file, pos,
>> -         (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
>> +    dout("sync_direct_write on file %p %lld~%u\n", file, pos,
>> +         (unsigned)count);
>>  
>> -    ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
>> +    ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>>      if (ret < 0)
>>              return ret;
>>  
>>      ret = invalidate_inode_pages2_range(inode->i_mapping,
>>                                          pos >> PAGE_CACHE_SHIFT,
>> -                                        (pos + left) >> PAGE_CACHE_SHIFT);
>> +                                        (pos + count) >> PAGE_CACHE_SHIFT);
>>      if (ret < 0)
>>              dout("invalidate_inode_pages2_range returned %d\n", ret);
>>  
>>      flags = CEPH_OSD_FLAG_ORDERSNAP |
>>              CEPH_OSD_FLAG_ONDISK |
>>              CEPH_OSD_FLAG_WRITE;
>> -    if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
>> -            flags |= CEPH_OSD_FLAG_ACK;
>> -    else
>> -            num_ops++;      /* Also include a 'startsync' command. */
>>  
>> -    /*
>> -     * we may need to do multiple writes here if we span an object
>> -     * boundary.  this isn't atomic, unfortunately.  :(
>> -     */
>> -more:
>> -    io_align = pos & ~PAGE_MASK;
>> -    buf_align = (unsigned long)data & ~PAGE_MASK;
>> -    len = left;
>> -
>> -    snapc = ci->i_snap_realm->cached_context;
>> -    vino = ceph_vino(inode);
>> -    req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
>> -                                vino, pos, &len, num_ops,
>> -                                CEPH_OSD_OP_WRITE, flags, snapc,
>> -                                ci->i_truncate_seq, ci->i_truncate_size,
>> -                                false);
>> -    if (IS_ERR(req))
>> -            return PTR_ERR(req);
>> +    iov_iter_init(&i, iov, nr_segs, count, 0);
>> +
>> +    while (iov_iter_count(&i) > 0) {
>> +            void __user *data = i.iov->iov_base + i.iov_offset;
>> +            u64 len = i.iov->iov_len - i.iov_offset;
>> +
>> +            page_align = (unsigned long)data & ~PAGE_MASK;
>> +
>> +            snapc = ci->i_snap_realm->cached_context;
>> +            vino = ceph_vino(inode);
>> +            req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
>> +                                        vino, pos, &len,
>> +                                        2,/*include a 'startsync' command*/
>> +                                        CEPH_OSD_OP_WRITE, flags, snapc,
>> +                                        ci->i_truncate_seq,
>> +                                        ci->i_truncate_size,
>> +                                        false);
>> +            if (IS_ERR(req)) {
>> +                    ret = PTR_ERR(req);
>> +                    goto out;
>> +            }
>>  
>> -    /* write from beginning of first page, regardless of io alignment */
>> -    page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
>> -    num_pages = calc_pages_for(page_align, len);
>> -    if (file->f_flags & O_DIRECT) {
>> +            num_pages = calc_pages_for(page_align, len);
>>              pages = ceph_get_direct_page_vector(data, num_pages, false);
>>              if (IS_ERR(pages)) {
>>                      ret = PTR_ERR(pages);
>> @@ -577,60 +573,175 @@ more:
>>               * may block.
>>               */
>>              truncate_inode_pages_range(inode->i_mapping, pos,
>> -                                       (pos+len) | (PAGE_CACHE_SIZE-1));
>> -    } else {
>> +                               (pos+len) | (PAGE_CACHE_SIZE-1));
>> +            osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
>> +                                            false, false);
>> +
>> +            /* BUG_ON(vino.snap != CEPH_NOSNAP); */
>> +            ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>> +
>> +            ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
>> +            if (!ret)
>> +                    ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>> +
>> +            ceph_put_page_vector(pages, num_pages, false);
>> +
>> +out:
>> +            ceph_osdc_put_request(req);
>> +            if (ret == 0) {
>> +                    pos += len;
>> +                    written += len;
>> +                    iov_iter_advance(&i, (size_t)len);
>> +
>> +                    if (pos > i_size_read(inode)) {
>> +                            check_caps = ceph_inode_set_size(inode, pos);
>> +                            if (check_caps)
>> +                                    ceph_check_caps(ceph_inode(inode),
>> +                                                    CHECK_CAPS_AUTHONLY,
>> +                                                    NULL);
>> +                    }
>> +            } else
>> +                    break;
>> +    }
>> +
>> +    if (ret != -EOLDSNAPC && written > 0) {
>> +            iocb->ki_pos = pos;
>> +            ret = written;
>> +    }
>> +    return ret;
>> +}
>> +
>> +
>> +/*
>> + * Synchronous write, straight from __user pointer or user pages.
>> + *
>> + * If write spans object boundary, just do multiple writes.  (For a
>> + * correct atomic write, we should e.g. take write locks on all
>> + * objects, rollback on failure, etc.)
>> + */
>> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
>> +                           unsigned long nr_segs, size_t count)
>> +{
>> +    struct file *file = iocb->ki_filp;
>> +    struct inode *inode = file_inode(file);
>> +    struct ceph_inode_info *ci = ceph_inode(inode);
>> +    struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>> +    struct ceph_snap_context *snapc;
>> +    struct ceph_vino vino;
>> +    struct ceph_osd_request *req;
>> +    struct page **pages;
>> +    u64 len;
>> +    int num_pages;
>> +    int written = 0;
>> +    int flags;
>> +    int check_caps = 0;
>> +    int ret;
>> +    struct timespec mtime = CURRENT_TIME;
>> +    loff_t pos = iocb->ki_pos;
>> +    struct iov_iter i;
>> +
>> +    if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>> +            return -EROFS;
>> +
>> +    dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
>> +
>> +    ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>> +    if (ret < 0)
>> +            return ret;
>> +
>> +    ret = invalidate_inode_pages2_range(inode->i_mapping,
>> +                                        pos >> PAGE_CACHE_SHIFT,
>> +                                        (pos + count) >> PAGE_CACHE_SHIFT);
>> +    if (ret < 0)
>> +            dout("invalidate_inode_pages2_range returned %d\n", ret);
>> +
>> +    flags = CEPH_OSD_FLAG_ORDERSNAP |
>> +            CEPH_OSD_FLAG_ONDISK |
>> +            CEPH_OSD_FLAG_WRITE |
>> +            CEPH_OSD_FLAG_ACK;
>> +
>> +    iov_iter_init(&i, iov, nr_segs, count, 0);
>> +
>> +    while ((len = iov_iter_count(&i)) > 0) {
>> +            size_t left;
>> +            int n;
>> +
>> +            snapc = ci->i_snap_realm->cached_context;
>> +            vino = ceph_vino(inode);
>> +            req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
>> +                                        vino, pos, &len, 1,
>> +                                        CEPH_OSD_OP_WRITE, flags, snapc,
>> +                                        ci->i_truncate_seq,
>> +                                        ci->i_truncate_size,
>> +                                        false);
>> +            if (IS_ERR(req)) {
>> +                    ret = PTR_ERR(req);
>> +                    goto out;
>> +            }
>> +
>> +            /*
>> +             * write from beginning of first page,
>> +             * regardless of io alignment
>> +             */
>> +            num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
>> +
>>              pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
>>              if (IS_ERR(pages)) {
>>                      ret = PTR_ERR(pages);
>>                      goto out;
>>              }
>> -            ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
>> +
>> +            left = len;
>> +            for (n = 0; n < num_pages; n++) {
>> +                    size_t plen = min(left, PAGE_SIZE);
>> +                    ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
>> +                    if (ret != plen) {
>> +                            ret = -EFAULT;
>> +                            break;
>> +                    }
>> +                    left -= ret;
>> +                    iov_iter_advance(&i, ret);
>> +            }
>> +
>>              if (ret < 0) {
>>                      ceph_release_page_vector(pages, num_pages);
>>                      goto out;
>>              }
>>  
>> -            if ((file->f_flags & O_SYNC) == 0) {
>> -                    /* get a second commit callback */
>> -                    req->r_unsafe_callback = ceph_sync_write_unsafe;
>> -                    req->r_inode = inode;
>> -                    own_pages = true;
>> -            }
>> -    }
>> -    osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
>> -                                    false, own_pages);
>> +            /* get a second commit callback */
>> +            req->r_unsafe_callback = ceph_sync_write_unsafe;
>> +            req->r_inode = inode;
>>  
>> -    /* BUG_ON(vino.snap != CEPH_NOSNAP); */
>> -    ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>> +            osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
>> +                                            false, true);
>>  
>> -    ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
>> -    if (!ret)
>> -            ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>> +            /* BUG_ON(vino.snap != CEPH_NOSNAP); */
>> +            ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>>  
>> -    if (file->f_flags & O_DIRECT)
>> -            ceph_put_page_vector(pages, num_pages, false);
>> -    else if (file->f_flags & O_SYNC)
>> -            ceph_release_page_vector(pages, num_pages);
>> +            ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
>> +            if (!ret)
>> +                    ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>>  
>>  out:
>> -    ceph_osdc_put_request(req);
>> -    if (ret == 0) {
>> -            pos += len;
>> -            written += len;
>> -            left -= len;
>> -            data += len;
>> -            if (left)
>> -                    goto more;
>> +            ceph_osdc_put_request(req);
>> +            if (ret == 0) {
>> +                    pos += len;
>> +                    written += len;
>> +
>> +                    if (pos > i_size_read(inode)) {
>> +                            check_caps = ceph_inode_set_size(inode, pos);
>> +                            if (check_caps)
>> +                                    ceph_check_caps(ceph_inode(inode),
>> +                                                    CHECK_CAPS_AUTHONLY,
>> +                                                    NULL);
>> +                    }
>> +            } else
>> +                    break;
>> +    }
>>  
>> +    if (ret != -EOLDSNAPC && written > 0) {
>>              ret = written;
>> -            *ppos = pos;
>> -            if (pos > i_size_read(inode))
>> -                    check_caps = ceph_inode_set_size(inode, pos);
>> -            if (check_caps)
>> -                    ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
>> -                                    NULL);
>> -    } else if (ret != -EOLDSNAPC && written > 0) {
>> -            ret = written;
>> +            iocb->ki_pos = pos;
>>      }
>>      return ret;
>>  }
>> @@ -772,11 +883,13 @@ retry_snap:
>>           inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
>>  
>>      if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
>> -        (iocb->ki_filp->f_flags & O_DIRECT) ||
>> -        (fi->flags & CEPH_F_SYNC)) {
>> +        (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
>>              mutex_unlock(&inode->i_mutex);
>> -            written = ceph_sync_write(file, iov->iov_base, count,
>> -                                      pos, &iocb->ki_pos);
>> +            if (file->f_flags & O_DIRECT)
>> +                    written = ceph_sync_direct_write(iocb, iov,
>> +                                                     nr_segs, count);
>> +            else
>> +                    written = ceph_sync_write(iocb, iov, nr_segs, count);
>>              if (written == -EOLDSNAPC) {
>>                      dout("aio_write %p %llx.%llx %llu~%u"
>>                              "got EOLDSNAPC, retrying\n",
>> 
>

Reply via email to