# include <linux/freezer.h>
#include "async-thread.h"
+#define WORK_QUEUED_BIT 0
+#define WORK_DONE_BIT 1
+#define WORK_ORDER_DONE_BIT 2
+
/*
* container for the kthread task pointer and the list of pending work
* One of these is allocated per thread.
}
}
+/*
+ * Mark 'work' as finished and then run the ordered completion callbacks
+ * for every finished item at the head of workers->order_list, in list
+ * order.  Does nothing when the queue is not in ordered mode.
+ *
+ * ordered_func is called with workers->lock dropped; ordered_free is
+ * called with the lock held.  Always returns 0.
+ */
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+ struct btrfs_work *work)
+{
+ unsigned long flags;
+
+ if (!workers->ordered)
+ return 0;
+
+ set_bit(WORK_DONE_BIT, &work->flags);
+
+ spin_lock_irqsave(&workers->lock, flags);
+
+ while(!list_empty(&workers->order_list)) {
+ /*
+ * from here on 'work' is the item at the head of the order
+ * list, which is not necessarily the one passed in
+ */
+ work = list_entry(workers->order_list.next,
+ struct btrfs_work, order_list);
+
+ /* the oldest item has not finished func yet, nothing can complete */
+ if (!test_bit(WORK_DONE_BIT, &work->flags))
+ break;
+
+ /* we are going to call the ordered done function, but
+ * we leave the work item on the list as a barrier so
+ * that later work items that are done don't have their
+ * functions called before this one returns
+ */
+ if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
+ break;
+
+ spin_unlock_irqrestore(&workers->lock, flags);
+
+ work->ordered_func(work);
+
+ /* now take the lock again and call the freeing code */
+ spin_lock_irqsave(&workers->lock, flags);
+ list_del(&work->order_list);
+ work->ordered_free(work);
+ }
+
+ spin_unlock_irqrestore(&workers->lock, flags);
+ return 0;
+}
+
/*
* main loop for servicing work items
*/
cur = worker->pending.next;
work = list_entry(cur, struct btrfs_work, list);
list_del(&work->list);
- clear_bit(0, &work->flags);
+ clear_bit(WORK_QUEUED_BIT, &work->flags);
work->worker = worker;
spin_unlock_irq(&worker->lock);
work->func(work);
atomic_dec(&worker->num_pending);
+ /*
+ * unless this is an ordered work queue,
+ * 'work' was probably freed by func above.
+ */
+ run_ordered_completions(worker->workers, work);
+
spin_lock_irq(&worker->lock);
check_idle_worker(worker);
+
}
worker->working = 0;
if (freezing(current)) {
workers->num_workers = 0;
INIT_LIST_HEAD(&workers->worker_list);
INIT_LIST_HEAD(&workers->idle_list);
+ INIT_LIST_HEAD(&workers->order_list);
spin_lock_init(&workers->lock);
workers->max_workers = max;
workers->idle_thresh = 32;
workers->name = name;
+ workers->ordered = 0;
}
/*
struct btrfs_worker_thread *worker = work->worker;
unsigned long flags;
- if (test_and_set_bit(0, &work->flags))
+ if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
goto out;
spin_lock_irqsave(&worker->lock, flags);
int wake = 0;
/* don't requeue something already on a list */
- if (test_and_set_bit(0, &work->flags))
+ if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
goto out;
worker = find_worker(workers);
+ if (workers->ordered) {
+ spin_lock_irqsave(&workers->lock, flags);
+ list_add_tail(&work->order_list, &workers->order_list);
+ spin_unlock_irqrestore(&workers->lock, flags);
+ } else {
+ INIT_LIST_HEAD(&work->order_list);
+ }
spin_lock_irqsave(&worker->lock, flags);
atomic_inc(&worker->num_pending);
*/
struct btrfs_work {
/*
- * only func should be set to the function you want called
+ * func should be set to the function you want called
* your work struct is passed as the only arg
+ *
+ * ordered_func and ordered_free must be set for work sent to an ordered
+ * work queue. ordered_func is called to complete work items in the
+ * same order they were sent to the queue, and ordered_free is called
+ * afterwards to release the item.
*/
void (*func)(struct btrfs_work *work);
+ void (*ordered_func)(struct btrfs_work *work);
+ void (*ordered_free)(struct btrfs_work *work);
/*
* flags should be set to zero. It is used to make sure the
/* don't touch these */
struct btrfs_worker_thread *worker;
struct list_head list;
+ struct list_head order_list;
};
struct btrfs_workers {
/* once a worker has this many requests or fewer, it is idle */
int idle_thresh;
+ /* force completions in the order they were queued */
+ int ordered;
+
/* list with all the work threads. The workers on the idle thread
* may be actively servicing jobs, but they haven't yet hit the
* idle thresh limit above.
struct list_head worker_list;
struct list_head idle_list;
+ /*
+ * when operating in ordered mode, this maintains the list
+ * of work items waiting for completion
+ */
+ struct list_head order_list;
+
/* lock for finding the next worker thread to queue on */
spinlock_t lock;
struct inode *inode;
struct bio *bio;
struct list_head list;
- extent_submit_bio_hook_t *submit_bio_hook;
+ extent_submit_bio_hook_t *submit_bio_start;
+ extent_submit_bio_hook_t *submit_bio_done;
int rw;
int mirror_num;
unsigned long bio_flags;
btrfs_async_submit_limit(info);
}
-static void run_one_async_submit(struct btrfs_work *work)
+/*
+ * First stage of an async bio submission, installed as work->func by
+ * btrfs_wq_submit_bio.  Recovers the async_submit_bio that embeds 'work'
+ * and calls its submit_bio_start hook; the hook's return value is ignored.
+ */
+static void run_one_async_start(struct btrfs_work *work)
+{
+ struct async_submit_bio *async;
+
+ async = container_of(work, struct async_submit_bio, work);
+ async->submit_bio_start(async->inode, async->rw, async->bio,
+ async->mirror_num, async->bio_flags);
+}
+
+static void run_one_async_done(struct btrfs_work *work)
{
struct btrfs_fs_info *fs_info;
struct async_submit_bio *async;
waitqueue_active(&fs_info->async_submit_wait))
wake_up(&fs_info->async_submit_wait);
- async->submit_bio_hook(async->inode, async->rw, async->bio,
+ async->submit_bio_done(async->inode, async->rw, async->bio,
async->mirror_num, async->bio_flags);
+}
+
+/*
+ * Free the async_submit_bio that embeds 'work'.  Installed as
+ * work->ordered_free by btrfs_wq_submit_bio.
+ */
+static void run_one_async_free(struct btrfs_work *work)
+{
+ struct async_submit_bio *async;
+
+ async = container_of(work, struct async_submit_bio, work);
kfree(async);
}
int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
int rw, struct bio *bio, int mirror_num,
unsigned long bio_flags,
- extent_submit_bio_hook_t *submit_bio_hook)
+ extent_submit_bio_hook_t *submit_bio_start,
+ extent_submit_bio_hook_t *submit_bio_done)
{
struct async_submit_bio *async;
int limit = btrfs_async_submit_limit(fs_info);
async->rw = rw;
async->bio = bio;
async->mirror_num = mirror_num;
- async->submit_bio_hook = submit_bio_hook;
- async->work.func = run_one_async_submit;
+ async->submit_bio_start = submit_bio_start;
+ async->submit_bio_done = submit_bio_done;
+
+ async->work.func = run_one_async_start;
+ async->work.ordered_func = run_one_async_done;
+ async->work.ordered_free = run_one_async_free;
+
async->work.flags = 0;
async->bio_flags = bio_flags;
return 0;
}
-static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
- int mirror_num, unsigned long bio_flags)
+/*
+ * Start hook handed to btrfs_wq_submit_bio for btree bios: checksums
+ * the bio and returns 0.  inode, rw, mirror_num and bio_flags are not
+ * used here.
+ */
+static int __btree_submit_bio_start(struct inode *inode, int rw,
+ struct bio *bio, int mirror_num,
+ unsigned long bio_flags)
{
- struct btrfs_root *root = BTRFS_I(inode)->root;
- int ret;
-
/*
* when we're called for a write, we're already in the async
* submission context. Just jump into btrfs_map_bio
*/
- if (rw & (1 << BIO_RW)) {
- btree_csum_one_bio(bio);
- return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
- mirror_num, 1);
- }
+ /*
+ * NOTE(review): the comment above still mentions btrfs_map_bio, but
+ * only the checksum is computed here; the btrfs_map_bio call now
+ * lives in __btree_submit_bio_done.
+ */
+ btree_csum_one_bio(bio);
+ return 0;
+}
+/*
+ * Done hook handed to btrfs_wq_submit_bio for btree bios: maps the bio
+ * with btrfs_map_bio and returns its result.  bio_flags is not used here.
+ */
+static int __btree_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
+ int mirror_num, unsigned long bio_flags)
+{
/*
- * called for a read, do the setup so that checksum validation
- * can happen in the async kernel threads
+ * when we're called for a write, we're already in the async
+ * submission context. Just jump into btrfs_map_bio
*/
- ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
- BUG_ON(ret);
-
return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
}
* can happen in parallel across all CPUs
*/
if (!(rw & (1 << BIO_RW))) {
- return __btree_submit_bio_hook(inode, rw, bio, mirror_num, 0);
+ int ret;
+ /*
+ * called for a read, do the setup so that checksum validation
+ * can happen in the async kernel threads
+ */
+ ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info,
+ bio, 1);
+ BUG_ON(ret);
+
+ return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+ mirror_num, 1);
}
return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
inode, rw, bio, mirror_num, 0,
- __btree_submit_bio_hook);
+ __btree_submit_bio_start,
+ __btree_submit_bio_done);
}
static int btree_writepage(struct page *page, struct writeback_control *wbc)
* were sent by the writeback daemons, improving overall locality
* of the IO going down the pipe.
*/
- fs_info->workers.idle_thresh = 128;
+ fs_info->workers.idle_thresh = 8;
+ fs_info->workers.ordered = 1;
btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1);
btrfs_init_workers(&fs_info->endio_workers, "endio",
int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
int rw, struct bio *bio, int mirror_num,
unsigned long bio_flags,
- extent_submit_bio_hook_t *submit_bio_hook);
+ extent_submit_bio_hook_t *submit_bio_start,
+ extent_submit_bio_hook_t *submit_bio_done);
+
int btrfs_congested_async(struct btrfs_fs_info *info, int iodone);
unsigned long btrfs_async_submit_limit(struct btrfs_fs_info *info);
int btrfs_write_tree_block(struct extent_buffer *buf);
* At IO completion time the cums attached on the ordered extent record
* are inserted into the btree
*/
-int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
+int __btrfs_submit_bio_start(struct inode *inode, int rw, struct bio *bio,
int mirror_num, unsigned long bio_flags)
{
struct btrfs_root *root = BTRFS_I(inode)->root;
ret = btrfs_csum_one_bio(root, inode, bio);
BUG_ON(ret);
+ return 0;
+}
+/*
+ * second half of an async write submission, passed to
+ * btrfs_wq_submit_bio together with __btrfs_submit_bio_start.
+ * The start hook has already checksummed the pages in the bio, so all
+ * that is left here is to map the bio with btrfs_map_bio and return
+ * its result.  bio_flags is not used here.
+ *
+ * NOTE(review): attaching the csums to the ordered extent record and
+ * inserting them into the btree at IO completion time is described on
+ * __btrfs_submit_bio_start; none of that happens in this function.
+ */
+int __btrfs_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
+ int mirror_num, unsigned long bio_flags)
+{
+ struct btrfs_root *root = BTRFS_I(inode)->root;
return btrfs_map_bio(root, rw, bio, mirror_num, 1);
}
/* we're doing a write, do the async checksumming */
return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
inode, rw, bio, mirror_num,
- bio_flags, __btrfs_submit_bio_hook);
+ bio_flags, __btrfs_submit_bio_start,
+ __btrfs_submit_bio_done);
}
mapit: