新聞中心

        EEPW首頁 > 嵌入式系統 > 設計應用 > linux kernel工作隊列及源碼詳細講解

        linux kernel工作隊列及源碼詳細講解

        作者: 時間:2011-12-21 來源:網絡 收藏

        1. 前言

        工作隊列(workqueue)是Linux內核中定義的一種機制, 用回調的方式來處理不是很緊急的事件.

        以下代碼的版本為2.6.19.2, 源代碼文件主要為kernel/workqueue.c.

        2. 數據結構

        /* include/linux/workqueue.h */
        // One unit of deferred work that can be placed on a workqueue
        struct work_struct {
        // Flag word: bit 0 is set when the work is queued and cleared just before its callback runs
        unsigned long pending;
        // List node used to link the work into a per-CPU worklist
        struct list_head entry;
        // Callback function that performs the work
        void (*func)(void *);
        // Argument passed to func
        void *data;
        // Set to the struct cpu_workqueue_struct when queued; the delayed-queue paths
        // temporarily store the struct workqueue_struct here for the timer function
        void *wq_data;
        // Timer used to implement delayed queueing
        struct timer_list timer;
        };

        // Wrapper structure that contains a single work_struct
        struct execute_work {
        struct work_struct work;
        };

        /* kernel/workqueue.c */
        /*
        * The per-CPU workqueue (if single thread, we always use the first
        * possible cpu).
        *
        * The sequence counters are for flush_scheduled_work(). It wants to wait
        * until all currently-scheduled works are completed, but it doesn't
        * want to be livelocked by new, incoming ones. So it waits until
        * remove_sequence is >= the insert_sequence which pertained when
        * flush_scheduled_work() was called.
        */
        // Per-CPU part of a workqueue; one instance exists for each CPU
        struct cpu_workqueue_struct {
        // Spinlock protecting this structure
        spinlock_t lock;
        // Sequence number of the next work item to run (incremented after each item completes)
        long remove_sequence; /* Least-recently added (next to run) */
        // Sequence number of the next work item to be inserted (incremented on each insert)
        long insert_sequence; /* Next to add */
        // Head of the list of queued work_struct items
        struct list_head worklist;
        // Wait queue the worker thread sleeps on; woken when new work is queued
        wait_queue_head_t more_work;
        // Wait queue for flushers; woken each time a work item has finished
        wait_queue_head_t work_done;
        // Back pointer to the owning workqueue
        struct workqueue_struct *wq;
        // Worker thread servicing this per-CPU queue
        struct task_struct *thread;
        int run_depth; /* Detect run_workqueue() recursion depth */
        } ____cacheline_aligned;
        /*
        * The externally visible workqueue abstraction is an array of
        * per-CPU workqueues:
        */
        // Workqueue descriptor
        struct workqueue_struct {
        // Per-CPU queue data, allocated with alloc_percpu()
        struct cpu_workqueue_struct *cpu_wq;
        // Name of the workqueue; also used to name its worker threads
        const char *name;
        // Node in the global workqueues list
        struct list_head list; /* Empty if single thread */
        };

        kernel/workqueue.c中定義了一個工作隊列鏈表, 所有工作隊列可以掛接到這個鏈表中:
        // Global list head; multi-threaded workqueues are linked onto it through their list member
        static LIST_HEAD(workqueues);

        3. 一些宏定義

        /* include/linux/workqueue.h */
        /*
         * Static initializer for a work_struct variable.
         *   n - the variable being initialized; its list node is made to point
         *       at itself, i.e. an empty list
         *   f - callback function
         *   d - argument passed to the callback
         * The embedded timer is initialized with no function, expiry or data.
         */
        #define __WORK_INITIALIZER(n, f, d) { \
            .entry = { &(n).entry, &(n).entry }, \
            .func = (f), \
            .data = (d), \
            .timer = TIMER_INITIALIZER(NULL, 0, 0), \
        }

        // Declare a work_struct variable named n and statically initialize it
        // with callback f and callback argument d
        #define DECLARE_WORK(n, f, d) \
            struct work_struct n = __WORK_INITIALIZER(n, f, d)
        /*
         * initialize a work-struct's func and data pointers:
         *
         * Only the callback and its argument are (re)assigned; the list node,
         * pending flag and timer of the work_struct are left untouched.
         */
        #define PREPARE_WORK(_work, _func, _data) \
            do { \
                (_work)->func = _func; \
                (_work)->data = _data; \
            } while (0)
        /*
         * initialize all of a work-struct:
         *
         * Run-time counterpart of __WORK_INITIALIZER: the initializer is used
         * in a variable definition, whereas this macro assigns to an existing
         * work_struct from within code. It empties the list node, clears the
         * pending word, sets the callback and its argument, and initializes
         * the timer.
         */
        #define INIT_WORK(_work, _func, _data) \
            do { \
                INIT_LIST_HEAD(&(_work)->entry); \
                (_work)->pending = 0; \
                PREPARE_WORK((_work), (_func), (_data)); \
                init_timer(&(_work)->timer); \
            } while (0)

        4. 操作函數

        4.1 創建工作隊列

        一般的創建函數是create_workqueue, 但這其實只是一個宏:
        /* include/linux/workqueue.h */
        // Convenience wrapper: calls __create_workqueue() with singlethread = 0
        #define create_workqueue(name) __create_workqueue((name), 0)
        在workqueue的初始化函數中, 定義了一個針對內核中所有線程可用的事件工作隊列, 其他內核線程建立的事件工作結構就都掛接到該隊列:
        // Workqueue initialization (excerpt): creates the shared "events"
        // workqueue and stores it in keventd_wq
        void init_workqueues(void)
        {
        ...
            keventd_wq = create_workqueue("events");
        ...
        }

        核心創建函數是__create_workqueue:

        /*
         * Create a workqueue and start its worker thread(s).
         *
         * @name:         name of the queue, also used to name the worker threads
         * @singlethread: non-zero to run a single worker thread; zero to run
         *                one worker thread per online CPU
         *
         * Returns the new workqueue, or NULL if an allocation or the creation
         * of any worker thread failed.
         */
        struct workqueue_struct *__create_workqueue(const char *name,
                                                    int singlethread)
        {
            int cpu, destroy = 0;
            struct workqueue_struct *wq;
            struct task_struct *p;

            // Allocate the zero-filled workqueue descriptor
            wq = kzalloc(sizeof(*wq), GFP_KERNEL);
            if (!wq)
                return NULL;

            // Allocate a separate cpu_workqueue_struct for every CPU
            wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
            if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
            }

            wq->name = name;
            mutex_lock(&workqueue_mutex);
            if (singlethread) {
                // Single-thread mode (never taken via the create_workqueue
                // macro, which passes 0): the list node stays empty and one
                // worker thread is created
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, singlethread_cpu);
                if (!p)
                    destroy = 1;
                else
                    // Start the worker thread
                    wake_up_process(p);
            } else {
                // Multi-thread mode: link the queue onto the global list
                list_add(&wq->list, &workqueues);
                // Create one worker thread per online CPU
                for_each_online_cpu(cpu) {
                    p = create_workqueue_thread(wq, cpu);
                    if (p) {
                        // Bind the thread to its CPU, then start it
                        kthread_bind(p, cpu);
                        wake_up_process(p);
                    } else
                        destroy = 1;
                }
            }
            mutex_unlock(&workqueue_mutex);

            /*
             * Was there any error during startup? If yes then clean up:
             */
            if (destroy) {
                // A thread could not be created: tear the workqueue down
                destroy_workqueue(wq);
                wq = NULL;
            }
            return wq;
        }
        EXPORT_SYMBOL_GPL(__create_workqueue);

        /*
         * Initialize the per-CPU queue of wq for the given cpu and create
         * (but do not start) its worker thread.
         *
         * Returns the new task, or NULL if kthread_create() failed.
         */
        static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                           int cpu)
        {
            // Per-CPU queue data for this cpu
            struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
            struct task_struct *p;

            spin_lock_init(&cwq->lock);
            // Reset the queue state
            cwq->wq = wq;
            cwq->thread = NULL;
            cwq->insert_sequence = 0;
            cwq->remove_sequence = 0;
            INIT_LIST_HEAD(&cwq->worklist);
            // more_work: the worker waits here for new work to arrive
            init_waitqueue_head(&cwq->more_work);
            // work_done: flushers wait here for work to complete
            init_waitqueue_head(&cwq->work_done);

            // Create the kernel thread running worker_thread(); single-threaded
            // queues are named "<name>", per-CPU threads "<name>/<cpu>"
            if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
            else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
            if (IS_ERR(p))
                return NULL;

            // Remember the worker thread
            cwq->thread = p;
            return p;
        }
        /*
         * Main function of a workqueue worker thread.
         *
         * @__cwq: the struct cpu_workqueue_struct this thread services
         *
         * Loops until kthread_should_stop() is true, sleeping while the
         * worklist is empty and calling run_workqueue() when it is not.
         * Returns 0.
         */
        static int worker_thread(void *__cwq)
        {
            struct cpu_workqueue_struct *cwq = __cwq;
            // Wait-queue entry for the current task
            DECLARE_WAITQUEUE(wait, current);
            // Signal handling state
            struct k_sigaction sa;
            sigset_t blocked;

            current->flags |= PF_NOFREEZE;
            // Set the nice value to -5; a negative nice value gives the worker
            // a higher-than-default scheduling priority
            set_user_nice(current, -5);

            /* Block and flush all signals */
            sigfillset(&blocked);
            sigprocmask(SIG_BLOCK, &blocked, NULL);
            flush_signals(current);

            /*
             * We inherited MPOL_INTERLEAVE from the booting kernel.
             * Set MPOL_DEFAULT to insure node local allocations.
             */
            numa_default_policy();

            /* SIG_IGN makes children autoreap: see do_notify_parent(). */
            sa.sa.sa_handler = SIG_IGN;
            sa.sa.sa_flags = 0;
            siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
            do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

            // Mark the task interruptible-sleeping before checking for work
            set_current_state(TASK_INTERRUPTIBLE);
            // Keep running until the thread is explicitly asked to stop
            while (!kthread_should_stop()) {
                // Register on more_work, which is woken when work is queued
                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                    // Nothing to do: sleep
                    schedule();
                else
                    // Work is available: back to the running state
                    __set_current_state(TASK_RUNNING);
                // Leave the wait queue
                remove_wait_queue(&cwq->more_work, &wait);

                // Execute the queued work items
                if (!list_empty(&cwq->worklist))
                    run_workqueue(cwq);
                // Prepare to sleep again on the next iteration
                set_current_state(TASK_INTERRUPTIBLE);
            }
            __set_current_state(TASK_RUNNING);
            return 0;
        }

        /*
         * Execute every work item currently on cwq's worklist.
         *
         * The queue lock is dropped around each callback, so a callback may
         * cause run_workqueue() to be entered again; run_depth tracks that
         * nesting and a warning with a stack dump is printed beyond depth 3.
         */
        static void run_workqueue(struct cpu_workqueue_struct *cwq)
        {
            unsigned long flags;

            /*
             * Keep taking off work from the queue until
             * done.
             */
            spin_lock_irqsave(&cwq->lock, flags);
            // Track how deeply this function is nested
            cwq->run_depth++;
            if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                       __FUNCTION__, cwq->run_depth);
                dump_stack();
            }
            // Drain the worklist
            while (!list_empty(&cwq->worklist)) {
                // Take the first (oldest) entry
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                      struct work_struct, entry);
                void (*f) (void *) = work->func;
                void *data = work->data;

                // Unlink the entry and reinitialize its list node
                list_del_init(cwq->worklist.next);
                // Drop the lock while the callback runs
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(work->wq_data != cwq);
                // The work is off the list: clear the pending bit so it can
                // be queued again
                clear_bit(0, &work->pending);
                // Run the callback
                f(data);

                spin_lock_irqsave(&cwq->lock, flags);
                // One more item completed
                cwq->remove_sequence++;
                // Wake anyone waiting in flush_cpu_workqueue()
                wake_up(&cwq->work_done);
            }
            cwq->run_depth--;
            spin_unlock_irqrestore(&cwq->lock, flags);
        }

        4.2 釋放工作隊列
        /**
        * destroy_workqueue - safely terminate a workqueue
        * @wq: target workqueue
        *
        * Safely destroy a workqueue. All work currently pending will be done first.
        */
        void destroy_workqueue(struct workqueue_struct *wq)
        {
            int cpu;

            // Wait for all currently queued work to finish
            flush_workqueue(wq);

            /* We don't need the distraction of CPUs appearing and vanishing. */
            mutex_lock(&workqueue_mutex);
            // Stop the worker thread(s) of this workqueue
            if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, singlethread_cpu);
            else {
                for_each_online_cpu(cpu)
                    cleanup_workqueue_thread(wq, cpu);
                // Unlink from the global workqueues list
                list_del(&wq->list);
            }
            mutex_unlock(&workqueue_mutex);

            // Release the per-CPU data and the descriptor itself
            free_percpu(wq->cpu_wq);
            kfree(wq);
        }
        EXPORT_SYMBOL_GPL(destroy_workqueue);

        /**
        * flush_workqueue - ensure that any scheduled work has run to completion.
        * @wq: workqueue to flush
        *
        * Forces execution of the workqueue and blocks until its completion.
        * This is typically used in driver shutdown handlers.
        *
        * This function will sample each workqueue's current insert_sequence number and
        * will sleep until the head sequence is greater than or equal to that. This
        * means that we sleep until all works which were queued on entry have been
        * handled, but we are not livelocked by new incoming ones.
        *
        * This function used to run the workqueues itself. Now we just wait for the
        * helper threads to do it.
        */
        void fastcall flush_workqueue(struct workqueue_struct *wq)
        {
            // This function may sleep
            might_sleep();

            if (is_single_threaded(wq)) {
                /* Always use first cpu's area. */
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
            } else {
                int cpu;

                // Flush the per-CPU queue of every online CPU
                mutex_lock(&workqueue_mutex);
                for_each_online_cpu(cpu)
                    flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
                mutex_unlock(&workqueue_mutex);
            }
        }
        EXPORT_SYMBOL_GPL(flush_workqueue);

        flush_workqueue的核心處理函數為flush_cpu_workqueue:
        /*
         * Wait until every work item that was queued on cwq at the time of
         * the call has been executed. If the caller is the queue's own worker
         * thread, the queue is run directly instead of waiting.
         */
        static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
        {
            if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
            } else {
                // Wait-queue entry for the current task
                DEFINE_WAIT(wait);
                long sequence_needed;

                spin_lock_irq(&cwq->lock);
                // Sequence number that must be reached before we are done
                sequence_needed = cwq->insert_sequence;

                // Loop while work queued before the snapshot is still outstanding
                while (sequence_needed - cwq->remove_sequence > 0) {
                    // Wait on work_done without being interruptible
                    prepare_to_wait(&cwq->work_done, &wait,
                                    TASK_UNINTERRUPTIBLE);
                    spin_unlock_irq(&cwq->lock);
                    // Sleep until run_workqueue() does wake_up(&cwq->work_done)
                    schedule();
                    spin_lock_irq(&cwq->lock);
                }
                // Done waiting: remove the wait-queue entry
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
            }
        }

        4.3 調度工作

        在大多數情況下, 并不需要自己建立工作隊列,而是只定義工作, 將工作結構掛接到內核預定義的事件工作隊列中調度, 在kernel/workqueue.c中定義了一個靜態全局量的工作隊列keventd_wq:
        // Shared workqueue used by the schedule_work() and schedule_delayed_work*() helpers
        static struct workqueue_struct *keventd_wq;

        4.3.1 立即調度
        // Callers use this helper to hand a work item to the shared queue
        /**
        * schedule_work - put work task in global workqueue
        * @work: job to be done
        *
        * This puts a job in the kernel-global workqueue.
        */
        // Queues the work item on the event workqueue keventd_wq and
        // passes through the result of queue_work()
        int fastcall schedule_work(struct work_struct *work)
        {
            struct workqueue_struct *global_wq = keventd_wq;

            return queue_work(global_wq, work);
        }
        EXPORT_SYMBOL(schedule_work);

        /**
        * queue_work - queue work on a workqueue
        * @wq: workqueue to use
        * @work: work to queue
        *
        * Returns 0 if @work was already on a queue, non-zero otherwise.
        *
        * We queue the work to the CPU it was submitted, but there is no
        * guarantee that it will be processed by that CPU.
        */
        int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
        {
            // get_cpu() .. put_cpu() brackets the section that uses the
            // current CPU number
            int ret = 0, cpu = get_cpu();

            if (!test_and_set_bit(0, &work->pending)) {
                // The pending bit was clear, so the work was not queued yet;
                // the bit is now set to mark it as queued
                if (unlikely(is_single_threaded(wq)))
                    cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
                // Do the actual insertion
                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
            }
            put_cpu();
            return ret;
        }
        EXPORT_SYMBOL_GPL(queue_work);
        /* Preempt must be disabled. */
        // Append work to the tail of cwq's worklist and wake the worker thread
        static void __queue_work(struct cpu_workqueue_struct *cwq,
                                 struct work_struct *work)
        {
            unsigned long flags;

            spin_lock_irqsave(&cwq->lock, flags);
            // Record which per-CPU queue the work is on
            work->wq_data = cwq;
            // Link the work at the tail of the list
            list_add_tail(&work->entry, &cwq->worklist);
            // One more item inserted
            cwq->insert_sequence++;
            // Wake the worker waiting on more_work
            wake_up(&cwq->more_work);
            spin_unlock_irqrestore(&cwq->lock, flags);
        }

        4.3.2 延遲調度

        4.3.2.1 schedule_delayed_work
        /**
        * schedule_delayed_work - put work task in global workqueue after delay
        * @work: job to be done
        * @delay: number of jiffies to wait
        *
        * After waiting for a given time this puts a job in the kernel-global
        * workqueue.
        */
        // Delayed variant of schedule_work(): forwards to queue_delayed_work()
        // with the shared keventd_wq queue and returns its result
        int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
        {
            struct workqueue_struct *global_wq = keventd_wq;

            return queue_delayed_work(global_wq, work, delay);
        }
        EXPORT_SYMBOL(schedule_delayed_work);

        /**
        * queue_delayed_work - queue work on a workqueue after delay
        * @wq: workqueue to use
        * @work: work to queue
        * @delay: number of jiffies to wait before queueing
        *
        * Returns 0 if @work was already on a queue, non-zero otherwise.
        */
        int fastcall queue_delayed_work(struct workqueue_struct *wq,
                                        struct work_struct *work, unsigned long delay)
        {
            int ret = 0;
            // Timer embedded in the work item; it must not be armed yet and
            // is what implements the delay
            struct timer_list *timer = &work->timer;

            if (!test_and_set_bit(0, &work->pending)) {
                // The pending bit was clear, so the work was not queued yet;
                // the bit is now set to mark it as queued.
                // An already armed timer is a bug
                BUG_ON(timer_pending(timer));
                // So is a work item that is already linked on a list
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                work->wq_data = wq;
                // Set up the timer: expiry, argument and handler
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                // Arm the timer; the handler queues the work when it expires
                add_timer(timer);
                ret = 1;
            }
            return ret;
        }
        EXPORT_SYMBOL_GPL(queue_delayed_work);


        /*
         * Timer handler for delayed work.
         *
         * @__data: the work_struct pointer stored in timer->data, as an integer
         *
         * Reads the target workqueue back from work->wq_data and queues the
         * work on the current CPU's queue, or on singlethread_cpu's queue for
         * a single-threaded workqueue. Note that this runs from the timer.
         */
        static void delayed_work_timer_fn(unsigned long __data)
        {
            struct work_struct *expired = (struct work_struct *)__data;
            struct workqueue_struct *target = expired->wq_data;
            const int this_cpu = smp_processor_id();
            const int cpu = unlikely(is_single_threaded(target))
                                ? singlethread_cpu
                                : this_cpu;

            __queue_work(per_cpu_ptr(target->cpu_wq, cpu), expired);
        }

        4.3.2.2 schedule_delayed_work_on

        指定CPU的延遲調度工作結構, 和schedule_delayed_work相比增加了一個CPU參數, 其他都相同
        /**
        * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
        * @cpu: cpu to use
        * @work: job to be done
        * @delay: number of jiffies to wait
        *
        * After waiting for a given time this puts a job in the kernel-global
        * workqueue on the specified CPU.
        */
        // Forwards to queue_delayed_work_on() with the shared keventd_wq queue
        // and returns its result
        int schedule_delayed_work_on(int cpu,
                                     struct work_struct *work, unsigned long delay)
        {
            struct workqueue_struct *global_wq = keventd_wq;

            return queue_delayed_work_on(cpu, global_wq, work, delay);
        }

        /**
        * queue_delayed_work_on - queue work on specific CPU after delay
        * @cpu: CPU number to execute work on
        * @wq: workqueue to use
        * @work: work to queue
        * @delay: number of jiffies to wait before queueing
        *
        * Returns 0 if @work was already on a queue, non-zero otherwise.
        */
        int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                                  struct work_struct *work, unsigned long delay)
        {
            int ret = 0;
            // Timer embedded in the work item; it implements the delay
            struct timer_list *timer = &work->timer;

            if (!test_and_set_bit(0, &work->pending)) {
                // The work was not queued yet: neither an armed timer nor a
                // linked list node is acceptable here
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                work->wq_data = wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                // Arm the timer on the requested CPU
                add_timer_on(timer, cpu);
                ret = 1;
            }
            return ret;
        }
        EXPORT_SYMBOL_GPL(queue_delayed_work_on);

        5. 結論

        工作隊列和定時器函數處理有點類似, 都是執行一定的回調函數, 但和定時器處理函數不同的是定時器回調函數只執行一次, 而且執行定時器回調函數的時候是在時鐘中斷中, 限制比較多, 因此回調程序不能太復雜; 而工作隊列是通過內核線程實現, 一直有效, 可重復執行, 由于是在內核線程(進程上下文)中執行, 執行時可以休眠 (注意: set_user_nice(current, -5)設置的nice值為負, 實際上是提高而不是降低了工作線程的優先級), 因此工作隊列處理的應該是那些不是很緊急的任務, 如垃圾回收處理等, 通常在系統空閑時執行。在xfrm庫中就廣泛使用了workqueue, 使用時, 只需要定義work結構, 然后調用schedule_(delayed_)work即可。

        本文引用地址:http://www.104case.com/article/257787.htm

        linux相關文章:linux教程




        評論


        相關推薦

        技術專區

        關閉
        主站蜘蛛池模板: 札达县| 万盛区| 丹棱县| 尚义县| 临湘市| 抚远县| 方山县| 天全县| 石河子市| 杭锦旗| 江安县| 隆昌县| 乌恰县| 开封县| 汾阳市| 虹口区| 高邑县| 阿克苏市| 尖扎县| 大理市| 安岳县| 当阳市| 曲沃县| 南部县| 朔州市| 奇台县| 东阿县| 班玛县| 石门县| 成安县| 民县| 陇西县| 汪清县| 遂溪县| 甘洛县| 运城市| 保德县| 慈溪市| 东港市| 龙南县| 淮北市|