0%

ThreadPool

GitHub SourceCode : 連結

自從加入精神時光屋團隊後,對於一些平行處理、多執行序等程式掌控力就越來越要求。為了不要當拖油瓶,這次練習的是 ThreadPoll,從這個練習可以更精準掌握 Thread 控制技巧。


目標

建立 Thread 其實是需要成本的,所以頻繁的建立 Thread 砍掉 Thread 相當耗效能,另一方面,如果等到有 Task 才開始建立 Thread 可能會導致第一個 Task 回應速度過慢。例如 : IIS 底層就有 ThreadPool ,為了因應 Request 進來時能更快速的回應,但又不能為了一昧追求速度不管資源的有效運用,所以ThreadPool 應該有增長跟最低維持幾條的上下限。

依據上面描述定義以下目標

  1. 自製 ThreadPool
  2. 依據 Task 量決定 Thread 的數量,Task 過多會加開 Thread 處理,反之會減少
  3. ThreadPool 可以設定 Thread 數量上下限

實作

希望呼叫的方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Program
{
static void Main(string[] args)
{
// 建立自己的 ThreadPool ,設定 Thread 至少保持 3 條,上限不超過 10 條
MyThreadPool myTreadPool = new MyThreadPool(3,10);

// 一值塞任務進去,不管任務執行完了沒
// 所以 ThreadPool 應該要能接住 Task 讓它們排隊消耗
for (int i = 0; i < 200; i++)
{
myTreadPool.Enqueue(i);
}

Console.ReadKey();
// 等待執行結束
myTreadPool.WaitFinished();
}
}

MyThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyThreadPool
{
private BlockingCollection<int> _jobQueue;

private int _minThread;

private int _maxThread;

public MyThreadPool(int minThread,int maxThread)
{
this._minThread = minThread;
this._maxThread = maxThread;
// 最多只能放 100 個 Task
this._jobQueue = new BlockingCollection<int>(100);
}

public void WaitFinished()
{
throw new NotImplementedException();
}

public void Enqueue(int i)
{
_jobQueue.TryAdd(i);
}
}

實作 ThreadPool 保有最少的 Thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyThreadPool
{
private BlockingCollection<int> _jobQueue;

private int _minThread;

private int _maxThread;

private int _currentThreadCount;

private List<Thread> _threads;

public MyThreadPool(int minThread,int maxThread)
{
this._minThread = minThread;
this._maxThread = maxThread;
this._jobQueue = new BlockingCollection<int>(100);
this._currentThreadCount = 0;
this._threads = new List<Thread>();

for (int i = 0; i < minThread; i++)
{
this.CreateThread();
}
}

private void CreateThread()
{
int id = Interlocked.Increment(ref _currentThreadCount);
if (id > _maxThread)
{
// 可開的 Thread 到達極限, 無法加開
Interlocked.Decrement(ref _currentThreadCount);
return;
}

Thread thread = new Thread(ThreadBody);
thread.Name = $"Thread-{id}";
thread.Start();

this._threads.Add(thread);

Console.WriteLine($"Thread count : {this._currentThreadCount}");
}

private void ThreadBody()
{

}
}

實作 TheadBody

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void ThreadBody()
{
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts");

// 如果有 Task 還沒有塞完就一直搶來處理
while (!this._jobQueue.IsCompleted)
{
int task = 0;
while(_jobQueue.TryTake(out task, 100))
{
// 模擬 Task 要執行的時間, 0.1 ~ 0.5秒不等
Random rnd = new Random();
int excuteTime = 0;
excuteTime = rnd.Next(100, 500);
Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms");

Thread.Sleep(excuteTime);
}
}

Console.WriteLine(name + " are closed");
}

這邊會有一個問題, 如果塞 Task 的速度不定, 可能時快時慢, 如果中間間隔過長會導致 Thread 一直拿不到 Task 來工作卻又停不下來, 持續空轉 , 所以必須加一個機制讓它等待, 並在必要時刻喚醒


ManualResetEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private ManualResetEvent _mre;
public MyThreadPool(int minThread,int maxThread)
{
...
this._mre = new ManualResetEvent(false);
...
}

...
private void ThreadBody()
{
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts");

// 如果有 Task 還沒有塞完就一直搶來處理
while (!this._jobQueue.IsCompleted)
{
int task = 0;
while(_jobQueue.TryTake(out task, 100))
{
Random rnd = new Random();
int excuteTime = 0;
excuteTime = rnd.Next(100, 500);
Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms");
Thread.Sleep(excuteTime);
}

_mre.WaitOne();
}
}

MSDN : ManualResetEvent Class

ManualResetEvent 就像是個手動的紅綠燈, 可以將 Thread Block 在 WaitOne 這行, 直到呼叫 ManualResetEvent.Set()將燈號切成綠燈, 全部的 Thread 才會往下繼續執行。

與之相對的是 AutoResetEvent Class , 差別只在於 ManualResetEvent 呼叫 Set (綠燈), Reset (紅燈)。而 AutoResetEvent 呼叫 Set 每次只會隨機放一條 Thread , 不像 ManualRestEvent 是全部放


Enqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
public void Enqueue(int i)
{
// 表示容量滿了
while (_jobQueue.TryAdd(i) == false)
{
// Queue Length 過長, 需加開 Thread
this.CreateThread();
}
// 切成綠燈, 放掉所有 Thread 開始搶工作做
_mre.Set();
// 切成紅燈, 如果有 Tread 搶不到事情來做又會被 Block 在 _mre.WaitOne(); 那行
_mre.Reset();
}

實作動態伸縮 Thread 數量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void ThreadBody()
{
...

while (!this._jobQueue.IsCompleted)
{
if (_mre.WaitOne(5000) == false)
{
// 進到這邊表示 Thread 並不是因為_mre.Set()被喚醒, 而是5秒 timeout
// 嘗試回收 Thread
if (Interlocked.Decrement(ref _currentThreadCount) < _minThread)
{
Interlocked.Increment(ref _currentThreadCount);
}
else
{
Console.WriteLine($"Thread count : {this._currentThreadCount}");
break;
}
}
}
Console.WriteLine(name + " are closed");
}

WaitFinished

這個方法如果被呼叫時, 應該將所有 Thread 都喚醒, 並且等待每個 Thread 都執行完畢

1
2
3
4
5
6
7
8
9
10
11
12
13
public void WaitFinished()
{
this._jobQueue.CompleteAdding();
_mre.Set();

foreach (var t in _threads)
{
if (t != null)
{
t.Join();
}
}
}

**完整版的 Code **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Program
{
static void Main(string[] args)
{
// 建立自己的 ThreadPool ,設定 Thread 至少保持 3 條,上限不超過 10 條
MyThreadPool myTreadPool = new MyThreadPool(3,10);

// 一值塞任務進去,不管任務執行完了沒
// 所以 ThreadPool 應該要能接住 Task 讓它們排隊消耗
for (int i = 0; i < 200; i++)
{
myTreadPool.Enqueue(i);
}

Console.ReadKey();
// 等待執行結束
myTreadPool.WaitFinished();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class MyThreadPool
{
private BlockingCollection<int> _jobQueue;

private int _minThread;

private int _maxThread;

private int _currentThreadCount;

private ManualResetEvent _mre;

private List<Thread> _threads;

public MyThreadPool(int minThread,int maxThread)
{
this._minThread = minThread;
this._maxThread = maxThread;
// 最多只能放 100 個 Task
this._jobQueue = new BlockingCollection<int>(100);
this._currentThreadCount = 0;
this._mre = new ManualResetEvent(false);
this._threads = new List<Thread>();

for (int i = 0; i < minThread; i++)
{
this.CreateThread();
}
}

private void CreateThread()
{
int id = Interlocked.Increment(ref _currentThreadCount);
if (id > _maxThread)
{
// 可開的 Thread 到達極限, 無法加開
Interlocked.Decrement(ref _currentThreadCount);
return;
}

Thread thread = new Thread(ThreadBody);
thread.Name = $"Thread-{id}";
thread.Start();

this._threads.Add(thread);

Console.WriteLine($"Thread count : {this._currentThreadCount}");
}

public void WaitFinished()
{
this._jobQueue.CompleteAdding();
_mre.Set();

foreach (var t in _threads)
{
if (t != null)
{
t.Join();
}
}
}

public void Enqueue(int i)
{
_mre.Set();

// 表示容量滿了
while (_jobQueue.TryAdd(i) == false)
{
// Queue Length 過長, 需加開 Thread
this.CreateThread();
}

_mre.Reset();
}

private void ThreadBody()
{
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts");

// 如果有 Task 還沒有塞完就一直搶來處理
while (!this._jobQueue.IsCompleted)
{
int task = 0;
while(_jobQueue.TryTake(out task, 100))
{
Random rnd = new Random();
int excuteTime = 0;
excuteTime = rnd.Next(100, 500);
Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms");
Thread.Sleep(excuteTime);
}


if (_mre.WaitOne(5000) == false)
{
// 此條 Thread 5秒都沒有工作, 嘗試收掉
if (Interlocked.Decrement(ref _currentThreadCount) < _minThread)
{
Interlocked.Increment(ref _currentThreadCount);
}
else
{
Console.WriteLine($"Thread count : {this._currentThreadCount}");
break;
}
}
}

Console.WriteLine(name + " are closed");
}
}

執行結果

執行中應該可以看到 Task 消化不夠快而加開 Thread

/images/20190801/1.png

最後因為程式在等待 ReadKey(), 如果超過 5 秒不按會看到 Thread 收掉的訊息

/images/20190801/2.png

按下任意鍵,就會全部收掉並關閉程式