본문으로 바로가기

C++11) std::thread 와 관련된 함수, 라이브러리 2

category C++/Modern 2019. 2. 8. 03:32

<std::atomic> 


atomic 클래스는 정수형 또는 포인터 타입에 아토믹 하게 (thread-safe) 하게 이용할 수 있게 하는 클래스 입니다.

memory_order 는 기본적으로 memory_order_seq_cst 입니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <thread>
#include <atomic>
using namespace std;
 
void func1(std::atomic<int>& i) {
 
    for (int j = 1; j <= 1000000++j)
        i++;
}
int main() {
    std::atomic<int> i = 0;
    thread t1(func1, std::ref(i));
    thread t2(func1, std::ref(i));
    t1.join();
    t2.join();
    cout << i << endl;
}
cs



t1,t2의 결과로 i 는 2000000을 출력합니다.

atomic 클래스는 값 복사와 대입 연산을 금지하고 있습니다.

1
2
3
4
5
6
7
8
9
10
std::atomic<int> i = 0;
thread t1(func1, i);    //error, atomic은 복사 연산이 금지 되어 있습니다.
std::atomic<int> ii = i;    // 위와 동일하게 복사 연산이 금지됩니다.
std::atomic<int> iii;
iii = i;    // 대입 또한 불가능 합니다.
thread t2(func1, std::ref(i));    // ok, atomic을 참조만 넘깁니다.
 
std::atomic<int> iiii;
std::atomic<short> s;
iiii = s;    //ok 이 둘은 다른 클래스라 대입이 가능합니다.
cs

atomic 과, atomic 그리고 atomic과 정수형에 대한 비교연산도 지원합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main() {
    
    atomic<int> a = 5;
    int b = 10;
    int c = 20;
    //비교 함수, a 와 b 를 비교해 a와 b가 같다면, a를 c로 바꿈
    //같지 않다면 b를 a로 바꿈
    a.compare_exchange_weak(b, c); // a = 5, b = 5, c = 20
    a.compare_exchange_weak(b, c); // a = 20, b = 5 , c = 20
 
    //compare_exchange_weak 와 compare_exchange_strong 의 차이
    //두 값을 비교했을때 같다면, strong은 항상 교환을 하고 true를 리턴해야함
    // 하지만 weak는 비교 도중 바로 false를 리턴하여 작업을 중단 시킬 수 있음
}
cs


자세한 정보는 이곳을 확인..

<memory fence 와 memory order>

프로그램이 단일 CPU에서 실행될 땐 하드웨어는 프로그래머가 지정한 순서 대로 (프로그램 순서) 작업하기 때문에
memory fence가 필요하지 않습니다. 하지만 다중 스레드에서 작업이 이루어 진다면 memory fence가 필요합니다 ( race condition )

본문 : https://en.wikipedia.org/wiki/Memory_barrier
https://www.kernel.org/doc/Documentation/memory-barriers.txt

memory_order 는 6가지 종류가 있으며, atomic 클래스의 함수들은 기본 memory_order 종류로 memory_order_seq_cst를 받습니다.

1
2
3
4
5
6
7
8
typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
    } memory_order;
cs

memory_order_relaxed : 다른 읽기 또는 쓰기에 대해 동기화 또는 순서 제한이 적용되지 않으며, 연산의 원자성만 보장됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
 
std::atomic<int> cnt = { 0 };
void f()
{
    for (int n = 0; n < 1000++n)
        cnt.fetch_add(1std::memory_order_relaxed);
}
int main()
{
    std::vector<std::thread> v;
    for (int n = 0; n < 10++n)
        v.emplace_back(f);
    for (auto& t : v)
        t.join();
        std::cout << cnt;    // 출력값은 10000
}
cs

위는 순서에 상관없이 단순히 더해야 할때 사용됩니다. (shareptr 같은 참조만 증가시키는 연산을 하기에 적합)
memory_order_release는 지금까지 작성한 모든 것들을 방출합니다.
memory_order_acquire는 release까지 기다렸다가 release떄 작성한 메모리 값들을 보게해줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
 
std::atomic<bool> flag = { false };
int value = 0;
 
void thread1() {
    value = 100;
    flag.store(truestd::memory_order_release);
}
void thread2() {
    flag.load(std::memory_order_acquire);
    if ( value == 100 ) value = 200;
}
int main()
{
    std::thread a(thread1);
    std::thread b(thread2);
 
    a.join(); b.join();
    std::cout << value << std::endl;    // 200 을 출력합니다.
}
cs

위 memory_order는 producer - customer 관계라고도 하며,
쓰고 나서 배포하기 읽기 전에 획득하기를 말합니다. 즉
release 하기전에 Write 한 것들이 acquire 한 후 read 해야 한단 겁니다. 
대신 위 사항은 customer 가 producer 보다 먼저 호출되면 안됩니다 즉,
b가 a 보다 먼저 호출되면 안됩니다.

memory_order_release 와 memory_order_acquire 는 spinlock 구조로 자주 사용됩니다.

spinlock : 만약 다른 스레드가 lock을 소유하고 있다면, 그 lock 이 반환될때까지 계속 확인하며 기다리는 것이다.
크리티컬 섹션(임계 구역)에 진입이 불가능할때 컨텍스트 스위칭을 하지 않고 잠시 루프를 돌면서 재시도 하는 것을 말합니다.
Lock-unlock 과정이 매우 짧아서 락하는 경우가 드문 경우( 즉: 적절하게 크리티컬 섹션을 사용한 경우 ) 유용합니다.
spinlock은 다음과 같은 특성을 가집니다.

1. lock 을 얻을 수 없다면, 계속 해서 lock 을 얻을때까지 기다린다. 이른 바 바쁘게 기다리는 busy wating 이다.


2. 바쁘게 기다린다는 것은 무한 루프를 돌며 최대한 다른 스레드에게 cpu를 양보하지 않는 것이다.

3. lock이 곧 사용가능해질 경우, 컨텍스트 스위치를 줄여 cpu의 부담을 줄여준다. 만약 어떤 스레드가 lock을 오랫동안 유지한다면 오히려 
cpu 시간을 많이 소모할 가능성이 크다.

4. 하나의 cpu나 하나의 코어만 있는 경우에는 유용하지 않다.그 이유는 만약 스레드가 lock 을 가지고 있고 그 스레드가 lock 을 풀어주려면
싱글 cpu에서는 어짜피 컨텍스트 스위칭이 일어나기 때문이다. 주의할 점은 spinlock 을 잘못 사용하면 cpu 사용률이 100% 되는 상황이 발생한다.
스핀락은 기본적으로 무한루프를 돌면서 lock을 기다리므로 하나의 쓰레드가 lock 을 오랫동안 가지고 있다면
다른 blocking 된 쓰레드는 busy wating을 하므로 cpu를 쓸데없이 낭비하게 된다

장점은 spinlock을 적절히 잘 활용하면 컨텍스트 스위칭을 줄여 효율을 높일 수 있다.

무한 루프를 돌기 보다는 일정시간 lock을 얻을 수 없다면 잠시 sleep 을하는 back off 알고리즘을 사용하는게 훨씬 좋다.


spinlock 과 mutex의 실행속도 차이 :
https://smallake.kr/?p=6356



memory_order_seq_cst : 호출전 작성했던 모든 메모리 변환들이 호출 후엔 보이게 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <thread>
#include <atomic>
#include <cassert>
 
std::atomic<bool> x = { false };
std::atomic<bool> y = { false };
std::atomic<int> z = { 0 };
 
void write_x(){
    x.store(truestd::memory_order_seq_cst);
}
void write_y(){
    y.store(truestd::memory_order_seq_cst);
}
void read_x_then_y()
{
    while (!x.load(std::memory_order_seq_cst));
    if (y.load(std::memory_order_seq_cst)) 
        ++z;
}
void read_y_then_x(){
    while (!y.load(std::memory_order_seq_cst));
    if (x.load(std::memory_order_seq_cst)) 
        ++z;
}
int main(){
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join(); b.join(); c.join(); d.join();
    assert(z.load() != 0);  // 일어날 일 없음, z == 2
}
cs


사실 더 작성하고싶으나 너무 방대한 양이라, 링크를 걸도록 하겠음..

https://www.slideshare.net/seao/c-atomic
http://egloos.zum.com/sweeper/v/3059861
https://en.cppreference.com/w/cpp/atomic/memory_order


<lock_guard>

이번엔 mutex의 lock에 대한 함수들을 작성하겠습니다.
기본적인 lock_guard는 프로그래머의 deadlock을 방지하기 위해 자동으로 unlock을 해주며, RAII 기반으로 되어있습니다.
사용자가 원할때 unlock을 할 수 없다는 단점이 있으며, 스코프에서 사라질때 자동으로 unlock 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::mutex g_mtx;
int g_sum = 0;
void func1() {
    std::lock_guard<std::mutex> lgmtx(g_mtx);    //따로 unlock이 필요없음
    std::lock_guard<std::mutex> lgmtx2 = lgmtx;    // 삭제된 연산자임 사용 불가
    for (int i = 0; i < 1000000++i)
        ++g_sum;
 
    g_mtx.unlock();    // 미정의 동작! 
}
int main()
{
    std::thread a(func1);
    std::thread b(func1);
    a.join(); b.join();
}
cs


std::lock_Guard와 다른 함수로 std::unique_lock이 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::mutex g_mtx;
int g_sum = 0;
 
 
std::unique_lock<std::mutex> a(g_mtx, std::adopt_lock);    // lock을 만들고, 바로 lock을 함
std::unique_lock<std::mutex> b(g_mtx, std::defer_lock); // lock을 만들지만, lock을 하진 않음
if (b.try_lock()) b.lock();
std::unique_lock<std::mutex> c(g_mtx, std::chrono::seconds(3));    // lock을 만들고, 3초 뒤에 lock 을 함
if (c) {
    // 작업내용
}
 
auto d = std::unique_lock<std::mutex>(g_mtx, std::adopt_lock);
= std::move(d); // 같은 unique_lock 끼리 move semantics가 가능, 결과로 d는 사용할 수 없음
a.try_lock_for(std::chrono::seconds(3));    // 3초동안 lock을 시도함
a.try_lock_until(...);    // ...초 까지 lock을 시도함
a.unlock();    // 락 해제    ( unlock을 해주지 않아도 자동으로 unlock 처리함 )
cs

<mutex와 semaphore의 차이>

뮤텍스는 동기화 함에 있어 하나의 쓰레드만 실행되게 하지만, 세마포어는 지정된 수만큼의 쓰레드가 동시에 실행되도록 동기화가 가능하다.
지정된 수보다 작거나, 같을때까진 스레드의 실행을 허용하고, 지정된 수를 넘으면 실행을 막음

세마포어는 뮤텍스가 될 수 있지만, 뮤텍스는 세마포어가 될 수 없음.
세마포어는 소유할 수 없지만, 뮤텍스는 소유할 수 있고 소유자가 책임을 짐
뮤텍스는 1개만 동기화가 되지만, 세마포어는 하나 이상을 동기화 할 수 있음

스핀락과도 마찬가지지만, 대부분 많은 상황에선 세마포어를 사용하지만 매우 짧은 시간동안만 lock을 할거라면 스핀 락을 사용하는게 낫다

<semaphore>

세마포어의 정보를 작성하기전, C++11 에서는 조건변수 (condition_variable) 을 제공한다. 이에 대해 알아둬야 하는게 나을것 같다.
조건 변수는 헤더 파일을 include 해야 사용가능 하며 특정 조건이 만족할 때까지 현재 Thread 를 blocking 할 수 있다.
( 이때 특정조건은 notify 할 수도있고 time out 등이 있다. )
조건 변수는 각 Thread 를 blocking 함으로 호출 순서를 조절해서 결과적으로는 Thread 간의 통신을 가능하게 해주는 효과를 가짐

조건변수는 크게 wait 와 notify_one() 이나 notify_all() 함수를 세트로 가지게 됨

notify_one() 함수는 해당 조건 변수를 기다리고 있는 thread들 중 한개의 Thread를 깨움 
notify_all() 함수는 해당 조건 변수를 기다리고 있는 모든 Thread 들을 깨움 
wait() 함수는 호출하는 Thread가 락 객체를 점유하고 있는 상태여야함. wait()를 호출하면 해당 락 객체가 unlock()을 호출하고
Thread가 blocking 됨

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::mutex g_mtx;
std::condition_variable cv;
void func1(){
    std::unique_lock<std::mutex> lck(g_mtx);
    cv.wait(lck);    // cv.notify_all 이 호출되기 전까진 계속 기다림
}
void func2() {
    std::unique_lock<std::mutex> lck(g_mtx);
    cv.notify_all();
}
int main()
{
    std::thread a(func1);
    std::thread b(func1);
    func2();
 
    a.join(); b.join();
}
cs




만약 cv.notify_all() 이 아닌, cv.notify_one() 이 호출된다면, a b 둘중 하나는 쓰레드에 들어갈 것이고, 
다른 한 쓰레드는 notify_one()이 호출될때까지 계속 대기하게 된다.
func2()가 호출되지 않으면, a b 스레드 둘다 대기하게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class semaphore {
public:
    semaphore(int count = 0) : cnt(count) {}
    void notify() {
        std::unique_lock<std::mutex> lck(m_mtx);
        ++cnt;
        m_cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lck(m_mtx);
        while (!cnt)    // cnt 가 0이면 계속 대기함
            m_cv.wait(lck);
        --cnt;    //cnt 가 감소했다는건 thread가 작업에 들어갔다는 것
    }
    unsigned long getCount() { return cnt; }
private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned long cnt;
};
 
semaphore sm;
int num = 0;
void func1(int _id) {
    std::cout << _id << "번째 thread 작업 시작" << std::endl;
    for (int i = 0; i < 1000000++i, ++num);
    sm.notify();
}
int main()
{
    std::thread a[10];
    for (int i = 0; i < 10++i) {
        a[i] = std::thread(func1, i + 1);
        a[i].detach();    //쓰레드를 따로 분리시켜줬음
        sm.wait();    // thread가 안에 들어가며 notify를 호출하고 wait 된 스레드중 하나가 작업에 
    }
    std::cout << num << std::endl;    // 10000000이 출력 됨!
}
cs

사실 세마포어 소스는 내가 직접 짰는데.. 보고도 뭔 개소리인지 모르겠다.. 왜 잘 돌아가는거지 대체 왜....
나중에 더 깊게 공부하게 되면 다시 수정하겠음..