C++线程并发记录

提高性能两种方法:

  • 将单个任务拆分成几个部分,各自并行运行,从而降低运行时间,但任务并行 各部分之间可能存在依赖(逻辑或数据)
  • 多种数据执行相同任务,提高数据处理能力,但带来的是数据吞吐量提升

线程是有限的资源,若线程太多,每个线程所需要的独立的堆栈空间反而加重计算机的负担,通常有线程池来限制线程数量

1.线程传入函数

1
2
3
4
5
6
7
8
9
10
#include <iostream>
#include <thread>
void hello(){
std::cout<<"i am a thread!"<<std::endl;
}
int main(){
std::thread t(hello);
t.join();
return 0;
}

C++使用线程库很简单,第一步是使用thread库,线程初始化对象是使用thread t(function)function代表需要运行的线程的回调函数,因此实际上也可以用lambada表达式来代替,std::thread([ ]( ){std::cout<<"i am a thread!"<<std::endl; })效果是一样的,最后调用对象函数join使得线程运行起来。

2.线程传入对象

1
2
3
4
5
6
7
8
class background_task{
public:
void operator( )( ) const{
do_something();
}
};
background_task f;
std::thread t( f );

线程传入对象通常是在对象中重载()函数,使得在调用std::thread t()时调用重载后的()函数,从而使得绑定,代码中,提供的函数对象会复制到新的线程存储空间中,函数对象的执行和调用都在线程的内存空间中。
另外一点,线程传入对象与传入结构体并无太大区别,C++中对象更像是结构体的功能延伸(增加函数、权限等概念),因此结构体实际上也可以有构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct func{
int &i;
func(int & _i):i(_i){ }
void operator( )( ){
do_something();
}
};
int main(){
int local_state=0;
func my_func(local_state);
std::thread t( my_func);
t.detach();
return 0;
}

与上面不同的是:

  • 结构体代替类对象实现线程数据传入
  • 结构体传入main函数一个引用变量
  • 使用detach使线程分离,因此在main函数结束后依然以一个独立线程运行
    因此危险在于分离线程使用主线程引用变量,在主函数失效后就会引用一个错误值,因此通常将数据复制到新的线程中,或者使用join使得线程仍在主线程的控制中,但值得注意的使用join后主线程会等待自线程完成任务,在此期间主线程其实没做任何事,因此收益甚微,同时在该线程对象只能使用一次join,之后就不能再次使用,使用joinable会返回false。多数join使用情况是在应用抛出异常时,通过新线程做决定从而避免生命周期问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main(){
int some_local=0;
func my_func(sime_local);
std::thread t(my_func);
try{
do_somethin();
}
catch(...){
t.join();
throw;
}
t.join();
return 0;
}

因此对比使用detach后,detach危险在于脱离了主线程控制,因此使用detach是使用作为守护线程或者是在分离线程函数中添加能终止该线程的逻辑函数。

3.线程传入变量参数

向线程中传入参数很简单,主要注意的是参数传入方式是值拷贝传递、值引用、指针传递方式不同。

1
2
3
4
5
6
7
8
void f(int i,std::string const &s);
//std::thread t(f,3,"hello");
int main(){
char buff[1024];
std::thread t(f,3,std::string(buff));//避免指针悬挂
t.detach();
return 0;
}

对于结构体数据引用注意引用理解错误

1
2
3
4
5
6
7
8
9
void update(weight_id w,weight_data &data);
int main(){
weight_data data;
std::thread t(update,w,std::ref(data));//转换为为引用
display();
t.join();
process(data);
return 0;
}

对于类函数传入参数,第三个才是函数中第一个参数(和bind一样)

1
2
3
4
5
6
7
class X{
public:
void do_something(int m);
}
X my_x;
int m(0);
std::thread t(&my_x::do_something,&my_x,m);

还有第一点关于线程变量所有权的转移,比如:stream,unique_ptr都是可移动但不可拷贝,不同线程间需要将该资源进行转移,此时使用std::move转移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class scoped_thread{
std::thread t;
public:
explicit scoped_thread(std::thread _t):t(std::move(_t)){//转移线程权
if(!t.joinable())
throw std::logic_error(" No thread!");
}
~scoped_thread(){
t.join();
}
scoped_thread(scoped_thread const &)=delete;
scoped_thread & operator=(scoped_thread const &)=delete;
};
struct func();
int main(){
int local_state;
scpoed_thread t(std::thread(func(local_state)));
do_something;
return 0;
}

每个线程都有自己的std::thread::id可通过get_id获取,通常用于特殊线程分配任务

4.线程数据共享

线程间共享数据多是线程出问题的地方,比如在使用双向链表时候,其中一个线程准备删除一个节点,另外一个线程准备查询一个节点,中间竞争会导致数据错误,因此在C++中添加了互斥量用于保护共享数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;
std::mutex some_mutex;

void add_list(int new_value){
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_find){
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(),some_list.end(),value_find)!some_list.end();
}

同样注意的是引用以及指针在使用互斥量时其实并没有完全将共享数据分离开来,避免用户函数调用,因此再重写一个线程安全的数据结构时,需要消减大量关于该结构体的算法,比如复制、赋值、引用等,以减少可能的调用。

死锁是在有两个以上互斥变量会发生的情况,彼此线程都需要对方的共享资源,但在这之前都各自为共享的数据上了锁,此时使用lock函数一次性锁住多个互斥量,lock锁住两个互斥量后,互斥量调用lock_guard时只用使用adopt_lock表示锁可捕捉,这样就不会因为上锁顺序不同导致锁死。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class some_big_object;
void swap(some_big_object & lhs,some_big_object & rhs);
class X{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const & sd):some_datail(sd){}

friend void swap(X & lhs,X & rhs){
if(&lhs==&rhs)return ;
std::lock(lhs.m,rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
swap(lhs.some_datail,rhs.some_detail);
}
};

5.线程同步

线程同步最好的方式是通过唤醒的方式去提醒另外一个线程,也被称为“条件变量”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <condition_variable> 
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
void data_preparation_thread(){
while(more_data_to_prepare()){
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2
data_cond.notify_one(); // 3
}
}
void data_processing_thread(){
while(true){
std::unique_lock<std::mutex> lk(mut); // 4
data_cond.wait(lk,[]{return !data_queue.empty();}); // 不为空则开始
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}

6.线程异步

异步多用在线程计算一个返回值时,使用async对象,通过调用get函数获取返回值,向异步中传递参数与创建线程相同

1
2
3
4
5
6
7
8
9
10
11
#include <future>
#include <iostream>

int find_answer();
void do_other();
int main(){
std::future<int> the_answer=std::async(find_answer);
do_other();
std::cout<<"answer is"<<the_answer.get()<<std::endl;
return 0;
}

异步线程发展为任务与期望概念,std::packaged_task<>对其函数绑定,当对象调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() // 1
{
while(!gui_shutdown_message_received()) // 2
{
get_and_process_gui_message(); // 3
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty()) // 4
continue;
task=std::move(tasks.front()); // 5
tasks.pop_front();
}
task(); // 6
}
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f); // 7
std::future<void> res=task.get_future(); // 8
std::lock_guard<std::mutex> lk(m); // 9
tasks.push_back(std::move(task)); // 10
return res;
}

7.原子操作

原子类型特化的结构体或者类型可以使得在操作时避免数据冲突,通过使用头文件atomic,通过调用std::atomic<>特化该类或结构体,并且特化后的原子类支持+=、-=、*=、|=等操作,通过调用storeload读写,exchage读改写。

1
2
3
4
std::atomic<bool> b;
bool x=b.load(std::memory_order_acqiure);
b.store(true);
x=b.exchage(false,std::memory_order_acq_rel);

另外atomic提供两种exchange交换值的方式:

  • compare_exchange_weak()原始值与预期值一致时存储可能也会不成功,通常是缺少单条CAS的机器上
  • compare_exchange_strong() 两值相等存储提供值,两值不等期望值被更新为原子变量中的值

atomic特殊化的指针通过fetch_add()fetch_sub()实现指针偏移,以及fetch_and()fetch_or()fetch_xor()

1
2
3
4
5
6
7
8
9
std::shared_ptr<my_data> p;
void precess_global_data(){
std::share_ptr<my_data> local=std::atomix_load(&p);
precess_data(local);
}
void update_global_data(){
std::share_ptr<my_data> local(new my_data);
std::atomic_store(&p,local);
}