C++11多线程编程(1)

前段时间,我在Google std::thread时,读到了网站thispointer.com的多线程入门教程,该教程通俗易懂,但又不缺乏深度,其示例代码结合了许多C++11的新的特性,比如匿名函数等(第4部分进行拓展说明)。
下面,我就按照这个教程,介绍C++11 Multithreading。

目录

  1. 多线程创建和参数传递
  2. 多线程条件竞争及其解决方法
  3. 多线程事件处理、条件变量
  4. 多线程返回值
  5. 线程池

C++11已经支持多线程,但编译时需要添加-std=c++11-pthread选项:

g++ -std=c++11 sample.cpp -pthread

如果使用CMake编译:

//CMakeLists.txt 中添加
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-std=c++11 -pthread")

头文件:<thread>

1. 多线程创建及参数传递

1.1 子线程的创建

在C++11中,我们使用std::thread类创建子线程,一个std::thread对象管理一个新的进程。

std::thread thObj(<CALLBACK>);

新建thObj对象后,子线程将执行对象绑定的回调函数CALLBACK,父线程仍然继续执行。
回调函数可以写成以下3种形式:
(1)Function Pointer 函数指针
(2)Function Objects 函数对象
(3)Lambda functions 匿名函数

(1)通过函数指针创建线程

#include <iostream>
#include <thread>
 
void thread_function()
{
    for(int i = 0; i < 10000; i++);
        std::cout<<"thread function Executing"<<std::endl;
}
 
int main()  
{
    std::thread threadObj(thread_function);
    for(int i = 0; i < 10000; i++);
        std::cout<<"Display From MainThread"<<std::endl;
    threadObj.join();    
    std::cout<<"Exit of Main function"<<std::endl;
    return 0;
}

(2)通过函数对象创建线程

#include <iostream>
#include <thread>
class DisplayThread
{
public:
    void operator()()     
    {
        for(int i = 0; i < 10000; i++)
            std::cout<<"Display Thread Executing"<<std::endl;
    }
};
 
int main()  
{
    std::thread threadObj( (DisplayThread()) );
    for(int i = 0; i < 10000; i++)
        std::cout<<"Display From Main Thread "<<std::endl;
    std::cout<<"Waiting For Thread to complete"<<std::endl;
    threadObj.join();
    std::cout<<"Exiting from Main Thread"<<std::endl;
    return 0;
}

(3)通过匿名函数创建线程

#include <iostream>
#include <thread>
int main()  
{
    int x = 9;
    std::thread threadObj([]{
            for(int i = 0; i < 10000; i++)
                std::cout<<"Display Thread Executing"<<std::endl;
            });
            
    for(int i = 0; i < 10000; i++)
        std::cout<<"Display From Main Thread"<<std::endl;
        
    threadObj.join();
    std::cout<<"Exiting from Main Thread"<<std::endl;
    return 0;
}

1.2 get_id()获取线程标识

std::thread::get_id()用于获取线程对象的标识,若线程对象没有绑定回调函数,返回a non-executing threadstd::this_thread::get_id()用于获取当前线程标识。

#include <iostream>
#include <thread>
void thread_function()
{
    std::cout<<"Inside Thread :: ID  = "<<std::this_thread::get_id()<<std::endl;    
}
int main()  
{
    std::thread threadObj1(thread_function);
    std::thread threadObj2(thread_function);
 
    if(threadObj1.get_id() != threadObj2.get_id())
        std::cout<<"Both Threads have different IDs"<<std::endl;
 
        std::cout<<"From Main Thread :: ID of Thread 1 = "<<threadObj1.get_id()<<std::endl;    
    std::cout<<"From Main Thread :: ID of Thread 2 = "<<threadObj2.get_id()<<std::endl;    
 
    threadObj1.join();    
    threadObj2.join();    
    return 0;
}

Output:

Inside Thread :: ID  = 3083496256
Both Threads have different IDs
From Main Thread :: ID of Thread 1 = 3083496256
From Main Thread :: ID of Thread 2 = 3075103552
Inside Thread :: ID  = 3075103552

1.3 join()线程结合

join()顾名思义指把threadObj线程加入到当前线程,这时原来两个交替执行的线程合并为顺序执行了。比如父线程A调用了子线程B的join()方法后,那么子线程B执行完毕后再执行父线程A。

#include <iostream>
#include <thread>
#include <algorithm>//std::for_each
class WorkerThread
{
public:
    void operator()()
    {
        std::cout<<"Worker Thread "<<std::this_thread::get_id()<<" is Executing"<<std::endl;
    }
};
int main()
{
    std::vector<std::thread> threadList;
    for(int i = 0; i < 10; i++)
    {
        threadList.push_back( std::thread( WorkerThread() ) );
    }
    // Now wait for all the worker thread to finish i.e.
    // Call join() function on each of the std::thread object
    std::cout<<"wait for all the worker thread to finish"<<std::endl;
    std::for_each(threadList.begin(),threadList.end(), std::mem_fn(&std::thread::join));
    std::cout<<"Exiting from Main Thread"<<std::endl;
    return 0;
}

1.4 detach()线程分离

detach()方法使得父线程不再管理子线程。detached 线程也称为守护线程或后台线程。

std::thread th(funcPtr);
th.detach();

需要仔细处理线程的detach()join()方法,使用joinable()方法检查线程对象是否可以join/detach.
准则1:Never call join() or detach() on std::thread object with no associated executing thread.

std::thread threadObj( (WorkerThread()) );
threadObj.join();
threadObj.join(); // It will cause Program to Terminate

std::thread threadObj( (WorkerThread()) );
threadObj.detach();
threadObj.detach(); // It will cause Program to Terminate

准则2: Never forget to call either join or detach on a std::thread object with associated executing thread.
RESOURCE ACQUISITION IS INITIALIZATION (RAII)可以有效帮助我们解决忘记join/detach的情况。

#include <iostream>
#include <thread>
class ThreadRAII
{
    std::thread & m_thread;
    public:
        ThreadRAII(std::thread  & threadObj) : m_thread(threadObj)
        {
            
        }
        ~ThreadRAII()
        {
            // Check if thread is joinable then detach the thread
            if(m_thread.joinable())
            {
                m_thread.detach();
            }
        }
};
void thread_function()
{
    for(int i = 0; i < 10000; i++);
        std::cout<<"thread_function Executing"<<std::endl;
}
 
int main()  
{
    std::thread threadObj(thread_function);
    
    // If we comment this Line, then program will crash
    ThreadRAII wrapperObj(threadObj);
    return 0;
}

1.5 子线程参数的传递

参数在std::thread的构造方法时以拷贝的方式进行传递。

1.5.1 引用作为参数传递

#include <iostream>
#include <thread>
void threadCallback(int const & x)
{
    int & y = const_cast<int &>(x);
    y++;
    std::cout<<"Inside Thread x = "<<x<<std::endl;
}
int main()
{
    int x = 9;
    std::cout<<"In Main Thread : Before Thread Start x = "<<x<<std::endl;
    std::thread threadObj(threadCallback, x);
    threadObj.join();
    std::cout<<"In Main Thread : After Thread Joins x = "<<x<<std::endl;
    return 0;
}

Output:

In Main Thread : Before Thread Start x = 9 
Inside Thread x = 10 
In Main Thread : After Thread Joins x = 9 

什么原因?Its because x in the thread function threadCallback is reference to the temporary value copied* at the new thread’s stack. 可以用std::ref()来解决这一问题。

#include <iostream>
#include <thread>
void threadCallback(int const & x)
{
    int & y = const_cast<int &>(x);
    y++;
    std::cout<<"Inside Thread x = "<<x<<std::endl;
}
int main()
{
    int x = 9;
    std::cout<<"In Main Thread : Before Thread Start x = "<<x<<std::endl;
    std::thread threadObj(threadCallback,std::ref(x));
    threadObj.join();
    std::cout<<"In Main Thread : After Thread Joins x = "<<x<<std::endl;
    return 0;
}

1.5.2 成员函数指针作为回调函数时的参数传递

#include <iostream>
#include <thread>
class DummyClass {
public:
    DummyClass()
    {}
    DummyClass(const DummyClass & obj)
    {}
    void sampleMemberFunction(int x)
    {
        std::cout<<"Inside sampleMemberFunction "<<x<<std::endl;
    }
};
int main() {
 
    DummyClass dummyObj;
    int x = 10;
    std::thread threadObj(&DummyClass::sampleMemberFunction,&dummyObj, x);
    threadObj.join();
    return 0;
}

从第1部分的参数传递可以看出,线程间的数据共享方式很简单,但这种简单的方式容易导致很多问题,其中常见的一个就是:竞争条件。
第2部分将介绍竞争条件产生及如何解决竞争条件导致的异常。

2. 多线程条件竞争及其解决方法

2.1 竞争条件异常的产生

下面将以钱包为例说明竞争条件导致异常的产生,假设testMultithreadedWallet产生5个线程同时调用addMoney()方法。

#include <iostream>
#include <thread>
#include <vector>
class Wallet
{
    int mMoney;
public:
    Wallet() :mMoney(0){}
    int getMoney() { return mMoney; }
    void addMoney(int money)
    {
       for(int i = 0; i < money; ++i)
       {
          mMoney++;
       }
    }
};
int testMultithreadedWallet()
{
   Wallet walletObject;
   std::vector<std::thread> threads;
   for(int i = 0; i < 5; ++i){
        threads.push_back(std::thread(&Wallet::addMoney, &walletObject, 1000));
   }

   for(int i = 0; i < threads.size() ; i++)
   {
       threads.at(i).join();
   }
   return walletObject.getMoney();
}

int main()
{

  int val = 0;
  for(int k = 0; k < 1000; k++)
  {
     if((val = testMultithreadedWallet()) != 5000)
     {
       std::cout << "Error at count = "<<k<<" Money in Wallet = "<<val << std::endl;
     }
  }
  return 0;
}

Outputs:

...
Error at count = 971  Money in Wallet = 4568
Error at count = 972  Money in Wallet = 4260
Error at count = 972  Money in Wallet = 4260
Error at count = 973  Money in Wallet = 4976
Error at count = 973  Money in Wallet = 4976
...

这就是竞争条件产生异常的示例,当多个线程同时改变同一地址指向的变量时,异常将会出现。

异常产生的原因分析

每一个线程同时对mMoney变量递增,虽然mMoney++看起来是一条命令,但底层可以细分为3个命令:
(1)加载mMoney变量至寄存器;
(2)寄存器递增+1;
(3)更新寄存器值到mMenoy变量。

假设线程1和线程2”mMoney++“操作的底层命令执行顺序如下:

    Thread 1             Thread 2
加载mMoney变量至寄存器               
                        加载mMoney变量至寄存器
寄存器递增+1          
                        寄存器递增+1
更新寄存器值到mMenoy变量 
                        更新寄存器值到mMenoy变量

如果这两个线程执行之前mMoney=46,那么经过线程1和线程2的递增,mMoney应该为48。但是由于出现上面的执行顺序,mMoney=46被加载至不同的寄存器后递增为mMoney=47,这是因为寄存器不同,最后更新mMenoy值时有一次递增被忽略了。

2.2 互斥锁解决竞争条件异常

互斥锁可以有效解决竞争条件导致的异常,仍以上面的为例子。互斥锁的使用很简单,在需要加锁的代码之前调用lock()加锁,代码之后再调用unlock()解锁。
C++11的互斥锁std::mutex在头文件<mutex>文件中。

#include<iostream>
#include<thread>
#include<vector>
#include<mutex>

class Wallet
{
    int mMoney;
    std::mutex mutex;
public:
    Wallet() :mMoney(0){}
    int getMoney()   {  return mMoney; }
    void addMoney(int money)
    {
        mutex.lock();
        for(int i = 0; i < money; ++i)
        {
            mMoney++;
        }
        mutex.unlock();
    }
};

为了避免忘记调用unlock()方法导致的阻塞,可以使用std::lock_guard互斥锁。

class Wallet
{
    int mMoney;
    std::mutex mutex;
public:
    Wallet() :mMoney(0){}
    int getMoney()   {  return mMoney; }
    void addMoney(int money)
    {
        std::lock_guard<std::mutex> lockGuard(mutex);
        // In constructor it locks the mutex

        for(int i = 0; i < money; ++i)
        {
            // If some exception occurs at this
            // poin then destructor of lockGuard
            // will be called due to stack unwinding.
            //
            mMoney++;
        }
        // Once function exits, then destructor
        // of lockGuard Object will be called.
        // In destructor it unlocks the mutex.
    }
 };

参考资料

1.thisPointer多线程