博客專欄

        EEPW首頁(yè) > 博客 > C/C++手撕線程池(線程池的封裝和實(shí)現(xiàn))

        C/C++手撕線程池(線程池的封裝和實(shí)現(xiàn))

        發(fā)布人:電子禪石 時(shí)間:2024-04-09 來(lái)源:工程師 發(fā)布文章

        本文使用的源碼地址:

        https://github.com/SCljh/thread_pool


        線程池描述

        ?


        池式結(jié)構(gòu)

        ??在計(jì)算機(jī)體系結(jié)構(gòu)中有許多池式結(jié)構(gòu):內(nèi)存池、數(shù)據(jù)庫(kù)連接池、請(qǐng)求池、消息隊(duì)列、

        對(duì)象池等等。


        ??池式結(jié)構(gòu)解決的主要問(wèn)題為緩沖問(wèn)題,起到的是緩沖區(qū)的作用。

        ?


        線程池

        ?通過(guò)使用線程池,我們可以有效降低多線程操作中任務(wù)申請(qǐng)和釋放產(chǎn)生的性能消耗。

            特別是當(dāng)我們每個(gè)線程的任務(wù)處理比較快時(shí),系統(tǒng)大部分性能消耗都花在了

            pthread_create以及釋放線程的過(guò)程中。

        那既然是這樣的話,何不在程序開(kāi)始運(yùn)行階段提前創(chuàng)建好一堆線程,

        等我們需要用的時(shí)候只要去這一堆線程中領(lǐng)一個(gè)線程,用完了再放回去,

        等程序運(yùn)行結(jié)束時(shí)統(tǒng)一釋放這一堆線程呢?按照這個(gè)想法,線程池出現(xiàn)了。

        ?


        線程池解決的問(wèn)題

        解決任務(wù)處理

        阻塞IO

        解決線程創(chuàng)建于銷毀的成本問(wèn)題

        管理線程

        ??線程池應(yīng)用之一:日志存儲(chǔ)


        ??在服務(wù)器保存日志至磁盤(pán)上時(shí),性能往往壓在磁盤(pán)讀寫(xiě)上,

             而引入線程池利用異步解耦可以解決磁盤(pán)讀寫(xiě)性能問(wèn)題。


        ??線程池的主要作用:異步解耦

        ?

        說(shuō)了那么多線程池的優(yōu)點(diǎn),那接下來(lái)要做的就是手撕這誘人的玩意了。

        ?


        樸實(shí)無(wú)華但不枯燥的代碼(以c++為例)

        本文主要講解的是c++線程池的實(shí)現(xiàn),C語(yǔ)言實(shí)現(xiàn)其實(shí)思想和c++是一致的,

        具體的代碼可見(jiàn)文章開(kāi)頭的鏈接。


        線程池中比較關(guān)鍵的東西

        ??若想自己編寫(xiě)一個(gè)線程池框架,那么可以先關(guān)注線程池中比較關(guān)鍵的東西:


        工作隊(duì)列

        任務(wù)隊(duì)列

        線程池的池

        pthread_create中的回調(diào)函數(shù)

        為什么說(shuō)這些東西比較關(guān)鍵?因?yàn)檫@“大四元”基本上支撐起了整個(gè)線程池的框架。

        而線程池的框架如下所示:

        ————————————————

        線程池

        ?如圖所示,我們將整個(gè)框架以及任務(wù)添加接口定義為線程池的“”,

        那么在這個(gè)池子中重要的就是工作隊(duì)列任務(wù)隊(duì)列

        以及決定工作隊(duì)列中的thread到底應(yīng)該工作還是休息的回調(diào)函數(shù)

        ??那么手撕線程池,我們就從這幾個(gè)關(guān)鍵點(diǎn)入手。

        工作隊(duì)列

        ??worker隊(duì)列,首先要有worker才有隊(duì)列,我們首先定義worker結(jié)構(gòu)體:

        ??可想而知,worker中要有create_pthread函數(shù)的id參數(shù),

        還需要有控制每一個(gè)worker live or die的標(biāo)志terminate,

        我們最好再設(shè)置一個(gè)標(biāo)志表示這個(gè)worker是否在工作。

        最后,我們要知道這個(gè)worker隸屬于那個(gè)線程池(至于為什么下文將介紹)

        struct NWORKER{
                pthread_t threadid;		//線程id
                bool terminate;			//是否需要結(jié)束該worker的標(biāo)志
                int isWorking;			//該worker是否在工作
                ThreadPool *pool;		//隸屬于的線程池
            }

        任務(wù)隊(duì)列

        ??任務(wù)隊(duì)列就簡(jiǎn)單得多了,想想編程語(yǔ)言中的任務(wù)應(yīng)該是什么?不就是函數(shù)嘛。

        所以我們只需要定義一個(gè)函數(shù)該有的東西就行了。

        struct NJOB{
                void (*func)(void *arg);     //任務(wù)函數(shù)
                void *user_data;			 //函數(shù)參數(shù)
            };

        線程池本池

        ??對(duì)于一個(gè)線程池,任務(wù)隊(duì)列和工作隊(duì)列已經(jīng)是必不可少的東西了,

        那線程池還有需要哪些東西輔助它以達(dá)到它該有的功能呢?

        ??一說(shuō)到線程,那處理好線程同步就是一個(gè)繞不開(kāi)的話題,

        那在線程池中我們需要處理的臨界資源有哪些呢?

        想想我們工作隊(duì)列中的每個(gè)worker都在等待一個(gè)任務(wù)隊(duì)列看其是否有任務(wù)到來(lái),

        所以很容易得出結(jié)論我們必須要在線程池中實(shí)現(xiàn)兩把鎖:

        一把是用來(lái)控制對(duì)任務(wù)隊(duì)列操作的互斥鎖,

        另一把是當(dāng)任務(wù)隊(duì)列有新任務(wù)時(shí)喚醒worker的條件鎖。

        ??有了這兩把鎖,線程池中再加點(diǎn)必要的一些數(shù)字以及對(duì)線程池操作的函數(shù),

        那么這個(gè)類就寫(xiě)完了。實(shí)現(xiàn)代碼如下:

        class ThreadPool{
        private:
            struct NWORKER{
                pthread_t threadid;
                bool terminate;
                int isWorking;
                ThreadPool *pool;
            } *m_workers;
        
            struct NJOB{
                void (*func)(void *arg);     //任務(wù)函數(shù)
                void *user_data;
            };
        public:
            //線程池初始化
            //numWorkers:線程數(shù)量
            ThreadPool(int numWorkers, int max_jobs);
            //銷毀線程池
            ~ThreadPool();
            //面向用戶的添加任務(wù)
            int pushJob(void (*func)(void *data), void *arg, int len);
        
        private:
            //向線程池中添加任務(wù)
            bool _addJob(NJOB* job);
            //回調(diào)函數(shù)
            static void* _run(void *arg);
            void _threadLoop(void *arg);
        
        private:
            std::list<NJOB*> m_jobs_list;
            int m_max_jobs;		//任務(wù)隊(duì)列中的最大任務(wù)數(shù)
            int m_sum_thread;	//worker總數(shù)
            int m_free_thread;		//空閑worker數(shù)
            pthread_cond_t m_jobs_cond;      //線程條件等待
            pthread_mutex_t m_jobs_mutex;  //為任務(wù)加鎖防止一個(gè)任務(wù)
            被兩個(gè)線程執(zhí)行等其他情況
        };

        可以看到我們做了一些必要的封裝,只給用戶提供了構(gòu)造函數(shù)、

        析構(gòu)函數(shù)以及添加任務(wù)的函數(shù)。這也是一個(gè)基本的線程池框架必要的接口。

         

        回調(diào)函數(shù)

        static?

        ??根據(jù)上方代碼可以看見(jiàn),回調(diào)函數(shù)為static函數(shù)。

        我可不想在我使用使用回調(diào)函數(shù)的時(shí)候自動(dòng)給我加上*this參數(shù)。

        ??首先回調(diào)函數(shù)是每個(gè)線程創(chuàng)建之后就開(kāi)始執(zhí)行的函數(shù),

        該函數(shù)作為pthread_create的第三個(gè)參數(shù)傳入。我們來(lái)看看pthread_create的函數(shù)原型:

        int pthread_create(pthread_t *tidp,const pthread_attr_t *attr,
        void *(*start_rtn)(void*),void *arg);

        注意到,此處的我們傳入的回調(diào)函數(shù)必須是接受一個(gè)void*參數(shù),

        且返回類型為void*的函數(shù)。如果我們將回調(diào)函數(shù)寫(xiě)成線程池的普通成員函數(shù),

        那么c++會(huì)在這個(gè)函數(shù)參數(shù)前默認(rèn)加上一個(gè)*this參數(shù),

        **這也是為什么我們能在成員函數(shù)中使用當(dāng)前對(duì)象中的一些屬性。

        **然而就是這個(gè)原因,若我們傳入的回調(diào)函數(shù)指針為類的成員函數(shù),

        那c++編譯器會(huì)破壞我們的函數(shù)結(jié)構(gòu)(因?yàn)榻o我們加了一個(gè)形參),

        導(dǎo)致pthread_create的第三個(gè)參數(shù)不符合要求而報(bào)錯(cuò):

        NON-STATIC

        看吧,編譯器不讓我們用non-static的成員函數(shù)作為回調(diào)函數(shù)傳入pthread_create中。

        其實(shí)在c++中,大多數(shù)回調(diào)函數(shù)都有這個(gè)要求。


        ??那為什么static就可以呢?


        這是因?yàn)閟tatic函數(shù)為類的靜態(tài)函數(shù),當(dāng)類的成員函數(shù)被static修飾后,

        調(diào)用該函數(shù)將不會(huì)默認(rèn)傳遞*this指針,

        這也是為什么static成員函數(shù)中不能使用對(duì)象的非static屬性:

        你*this指針都沒(méi)傳我上哪去找你的對(duì)象?

        函數(shù)本身

        ??在運(yùn)行回調(diào)函數(shù)的時(shí)候,我們又想用對(duì)象里的東西(比如鎖),

        編譯器又不讓我們用,那怎么辦?別忘了雖然static函數(shù)沒(méi)有*this指針,

        但它卻可以有一個(gè)*void的參數(shù)啊。有了這個(gè)*void,我們還怕少一個(gè)*this指針?

        我們可以先寫(xiě)一個(gè)static函數(shù),將需要的對(duì)象指針通過(guò)形參傳到這個(gè)函數(shù)里,

        再在這個(gè)函數(shù)中通過(guò)這個(gè)對(duì)象調(diào)用成員函數(shù)的方法,就能使用這個(gè)對(duì)象的成員屬性了。


        ??就像這樣:

        //run為static函數(shù)
        void* ThreadPool::_run(void *arg) {
            NWORKER *worker = (NWORKER *)arg;
            worker->pool->_threadLoop(arg);
        }
        //threadLoop為普通成員函數(shù)
        void ThreadPool::_threadLoop(void *arg) {
            //在這里就能直接用當(dāng)前ThreadPool對(duì)象的東西了
        }

         

        至于threadLoop的實(shí)現(xiàn),由于線程是要一直存在的,

        一個(gè)while(true)的循環(huán)肯定少不了了。這個(gè)循環(huán)中具體做什么呢:

        不斷檢查任務(wù)隊(duì)列中是否有job:


        如果有,則取出這個(gè)job,并將該job從任務(wù)隊(duì)列中刪除,且執(zhí)行job中的func函數(shù)。

        如果沒(méi)有,調(diào)用pthread_cond_wait函數(shù)等待job到來(lái)時(shí)被喚醒。

        若當(dāng)前worker的terminate為真,則退出循環(huán)結(jié)束線程。

        注意在對(duì)job操作前別忘了加鎖,函數(shù)實(shí)現(xiàn)如下:

        void ThreadPool::_threadLoop(void *arg) {
            NWORKER *worker = (NWORKER*)arg;
            while (1){
                //線程只有兩個(gè)狀態(tài):執(zhí)行\(zhòng)等待
                //查看任務(wù)隊(duì)列前先獲取鎖
                pthread_mutex_lock(&m_jobs_mutex);
                //當(dāng)前沒(méi)有任務(wù)
                while (m_jobs_list.size() == 0) {
                	//檢查worker是否需要結(jié)束生命
                    if (worker->terminate) break;
                    //條件等待直到被喚醒
                    pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex);
                }
                //檢查worker是否需要結(jié)束生命
                if (worker->terminate){
                    pthread_mutex_unlock(&m_jobs_mutex);
                    break;
                }
                //獲取到j(luò)ob后將該job從任務(wù)隊(duì)列移出,免得其他worker過(guò)來(lái)重復(fù)做這個(gè)任務(wù)
                struct NJOB *job = m_jobs_list.front();
                m_jobs_list.pop_front();
        		//對(duì)任務(wù)隊(duì)列的操作結(jié)束,釋放鎖
                pthread_mutex_unlock(&m_jobs_mutex);
        
                m_free_thread--;
                worker->isWorking = true;
                //執(zhí)行job中的func
                job->func(job->user_data);
                worker->isWorking = false;
        
                free(job->user_data);
                free(job);
            }
        
            free(worker);
            pthread_exit(NULL);
        }


                                

        原文鏈接:https://blog.csdn.net/ACMer_L/article/details/107578636

                             

                          

                                

        ————————————————


                                

                       

                                



                                

        原文鏈接:https://blog.csdn.net/ACMer_L/article/details/107578636


        *博客內(nèi)容為網(wǎng)友個(gè)人發(fā)布,僅代表博主個(gè)人觀點(diǎn),如有侵權(quán)請(qǐng)聯(lián)系工作人員刪除。



        關(guān)鍵詞: threadpool

        技術(shù)專區(qū)

        關(guān)閉
        主站蜘蛛池模板: 茌平县| 平和县| 肇源县| 永康市| 二连浩特市| 富蕴县| 云阳县| 宁都县| 镇坪县| 阜南县| 房产| 利川市| 永宁县| 保德县| 肇州县| 左权县| 长葛市| 乡城县| 浦北县| 泌阳县| 富锦市| 鹤岗市| 台中县| 玛曲县| 清镇市| 五河县| 博乐市| 江门市| 武强县| 平南县| 辛集市| 马鞍山市| 千阳县| 金寨县| 岑巩县| 循化| 江华| 杭锦后旗| 衡东县| 麻江县| 波密县|