From 15ce4a0c1ce0d5e288398cb9e5493fd4e55e2025 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Mon, 20 Mar 2006 13:44:34 -0500 Subject: [PATCH] NFS: Replace atomic_t variables in nfs_direct_req with a single spin lock Three atomic_t variables cause a lot of bus locking. Because they are all used in the same places in the code, just use a single spin lock. Now that the atomic_t variables are gone, we can remove the request size limitation since the code no longer depends on the limited width of atomic_t on some platforms. Test plan: Compile with CONFIG_NFS and CONFIG_NFS_DIRECTIO enabled. Millions of fsx operations, iozone, OraSim. Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- fs/nfs/direct.c | 81 ++++++++++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c index bcbc213b403..3de7c4b0796 100644 --- a/fs/nfs/direct.c +++ b/fs/nfs/direct.c @@ -58,7 +58,6 @@ #include "iostat.h" #define NFSDBG_FACILITY NFSDBG_VFS -#define MAX_DIRECTIO_SIZE (4096UL << PAGE_SHIFT) static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty); static kmem_cache_t *nfs_direct_cachep; @@ -68,6 +67,8 @@ static kmem_cache_t *nfs_direct_cachep; */ struct nfs_direct_req { struct kref kref; /* release manager */ + + /* I/O parameters */ struct list_head list; /* nfs_read/write_data structs */ struct file * filp; /* file descriptor */ struct kiocb * iocb; /* controlling i/o request */ @@ -75,12 +76,14 @@ struct nfs_direct_req { struct inode * inode; /* target file of i/o */ struct page ** pages; /* pages in our buffer */ unsigned int npages; /* count of pages */ - atomic_t complete, /* i/os we're waiting for */ - count, /* bytes actually processed */ + + /* completion state */ + spinlock_t lock; /* protect completion state */ + int outstanding; /* i/os we're waiting for */ + ssize_t count, /* bytes actually processed */ error; /* any reported error */ }; - /** * nfs_direct_IO - NFS address space operation for direct I/O * @rw: direction (read or write) @@ -110,12 +113,6 @@ static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t siz unsigned long page_count; size_t array_size; - /* set an arbitrary limit to prevent type overflow */ - if (size > MAX_DIRECTIO_SIZE) { - *pages = NULL; - return -EFBIG; - } - page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT; page_count -= user_addr >> PAGE_SHIFT; @@ -164,8 +161,10 @@ static inline struct nfs_direct_req *nfs_direct_req_alloc(void) init_waitqueue_head(&dreq->wait); INIT_LIST_HEAD(&dreq->list); dreq->iocb = NULL; - atomic_set(&dreq->count, 0); - atomic_set(&dreq->error, 0); + spin_lock_init(&dreq->lock); + dreq->outstanding = 0; + dreq->count = 0; + dreq->error = 0; return dreq; } @@ -181,19 +180,18 @@ static void nfs_direct_req_release(struct kref *kref) */ static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq) { - int result = -EIOCBQUEUED; + ssize_t result = -EIOCBQUEUED; /* Async requests don't wait here */ if (dreq->iocb) goto out; - result = wait_event_interruptible(dreq->wait, - (atomic_read(&dreq->complete) == 0)); + result = wait_event_interruptible(dreq->wait, (dreq->outstanding == 0)); if (!result) - result = atomic_read(&dreq->error); + result = dreq->error; if (!result) - result = atomic_read(&dreq->count); + result = dreq->count; out: kref_put(&dreq->kref, nfs_direct_req_release); @@ -214,9 +212,9 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq) nfs_free_user_pages(dreq->pages, dreq->npages, 1); if (dreq->iocb) { - long res = atomic_read(&dreq->error); + long res = (long) dreq->error; if (!res) - res = atomic_read(&dreq->count); + res = (long) dreq->count; aio_complete(dreq->iocb, res, 0); } else wake_up(&dreq->wait); @@ -233,7 +231,6 @@ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize) { struct list_head *list; struct nfs_direct_req *dreq; - unsigned int reads = 0; unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; dreq = nfs_direct_req_alloc(); @@ -259,13 +256,12 @@ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize) list_add(&data->pages, list); data->req = (struct nfs_page *) dreq; - reads++; + dreq->outstanding++; if (nbytes <= rsize) break; nbytes -= rsize; } kref_get(&dreq->kref); - atomic_set(&dreq->complete, reads); return dreq; } @@ -276,13 +272,21 @@ static void nfs_direct_read_result(struct rpc_task *task, void *calldata) if (nfs_readpage_result(task, data) != 0) return; + + spin_lock(&dreq->lock); + if (likely(task->tk_status >= 0)) - atomic_add(data->res.count, &dreq->count); + dreq->count += data->res.count; else - atomic_set(&dreq->error, task->tk_status); + dreq->error = task->tk_status; + + if (--dreq->outstanding) { + spin_unlock(&dreq->lock); + return; + } - if (unlikely(atomic_dec_and_test(&dreq->complete))) - nfs_direct_complete(dreq); + spin_unlock(&dreq->lock); + nfs_direct_complete(dreq); } static const struct rpc_call_ops nfs_read_direct_ops = { @@ -388,7 +392,6 @@ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize { struct list_head *list; struct nfs_direct_req *dreq; - unsigned int writes = 0; unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; dreq = nfs_direct_req_alloc(); @@ -414,16 +417,19 @@ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize list_add(&data->pages, list); data->req = (struct nfs_page *) dreq; - writes++; + dreq->outstanding++; if (nbytes <= wsize) break; nbytes -= wsize; } kref_get(&dreq->kref); - atomic_set(&dreq->complete, writes); return dreq; } +/* + * NB: Return the value of the first error return code. Subsequent + * errors after the first one are ignored. + */ static void nfs_direct_write_result(struct rpc_task *task, void *calldata) { struct nfs_write_data *data = calldata; @@ -436,15 +442,22 @@ static void nfs_direct_write_result(struct rpc_task *task, void *calldata) if (unlikely(data->res.verf->committed != NFS_FILE_SYNC)) status = -EIO; + spin_lock(&dreq->lock); + if (likely(status >= 0)) - atomic_add(data->res.count, &dreq->count); + dreq->count += data->res.count; else - atomic_set(&dreq->error, status); + dreq->error = status; - if (unlikely(atomic_dec_and_test(&dreq->complete))) { - nfs_end_data_update(data->inode); - nfs_direct_complete(dreq); + if (--dreq->outstanding) { + spin_unlock(&dreq->lock); + return; } + + spin_unlock(&dreq->lock); + + nfs_end_data_update(data->inode); + nfs_direct_complete(dreq); } static const struct rpc_call_ops nfs_write_direct_ops = { -- 2.41.1