自從加入精神時光屋團隊後,對於一些平行處理、多執行序等程式掌控力就越來越要求。為了不要當拖油瓶,這次練習的是 ThreadPoll,從這個練習可以更精準掌握 Thread 控制技巧。
目標
建立 Thread 其實是需要成本的,所以頻繁的建立 Thread 砍掉 Thread 相當耗效能,另一方面,如果等到有 Task 才開始建立 Thread 可能會導致第一個 Task 回應速度過慢。例如 : IIS 底層就有 ThreadPool ,為了因應 Request 進來時能更快速的回應,但又不能為了一昧追求速度不管資源的有效運用,所以ThreadPool 應該有增長跟最低維持幾條的上下限。
依據上面描述定義以下目標
- 自製 ThreadPool
- 依據 Task 量決定 Thread 的數量,Task 過多會加開 Thread 處理,反之會減少
- ThreadPool 可以設定 Thread 數量上下限
實作
希望呼叫的方式如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class Program { static void Main(string[] args) { MyThreadPool myTreadPool = new MyThreadPool(3,10);
for (int i = 0; i < 200; i++) { myTreadPool.Enqueue(i); }
Console.ReadKey(); myTreadPool.WaitFinished(); } }
|
MyThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class MyThreadPool { private BlockingCollection<int> _jobQueue;
private int _minThread;
private int _maxThread;
public MyThreadPool(int minThread,int maxThread) { this._minThread = minThread; this._maxThread = maxThread; this._jobQueue = new BlockingCollection<int>(100); }
public void WaitFinished() { throw new NotImplementedException(); }
public void Enqueue(int i) { _jobQueue.TryAdd(i); } }
|
實作 ThreadPool 保有最少的 Thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class MyThreadPool { private BlockingCollection<int> _jobQueue;
private int _minThread;
private int _maxThread;
private int _currentThreadCount; private List<Thread> _threads; public MyThreadPool(int minThread,int maxThread) { this._minThread = minThread; this._maxThread = maxThread; this._jobQueue = new BlockingCollection<int>(100); this._currentThreadCount = 0; this._threads = new List<Thread>();
for (int i = 0; i < minThread; i++) { this.CreateThread(); } }
private void CreateThread() { int id = Interlocked.Increment(ref _currentThreadCount); if (id > _maxThread) { Interlocked.Decrement(ref _currentThreadCount); return; }
Thread thread = new Thread(ThreadBody); thread.Name = $"Thread-{id}"; thread.Start();
this._threads.Add(thread);
Console.WriteLine($"Thread count : {this._currentThreadCount}"); }
private void ThreadBody() {
} }
|
實作 TheadBody
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private void ThreadBody() { string name = Thread.CurrentThread.Name; Console.WriteLine(name + " starts");
while (!this._jobQueue.IsCompleted) { int task = 0; while(_jobQueue.TryTake(out task, 100)) { Random rnd = new Random(); int excuteTime = 0; excuteTime = rnd.Next(100, 500); Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms"); Thread.Sleep(excuteTime); } }
Console.WriteLine(name + " are closed"); }
|
這邊會有一個問題, 如果塞 Task 的速度不定, 可能時快時慢, 如果中間間隔過長會導致 Thread 一直拿不到 Task 來工作卻又停不下來, 持續空轉 , 所以必須加一個機制讓它等待, 並在必要時刻喚醒
ManualResetEvent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| private ManualResetEvent _mre; public MyThreadPool(int minThread,int maxThread) { ... this._mre = new ManualResetEvent(false); ... }
... private void ThreadBody() { string name = Thread.CurrentThread.Name; Console.WriteLine(name + " starts");
while (!this._jobQueue.IsCompleted) { int task = 0; while(_jobQueue.TryTake(out task, 100)) { Random rnd = new Random(); int excuteTime = 0; excuteTime = rnd.Next(100, 500); Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms"); Thread.Sleep(excuteTime); }
_mre.WaitOne(); } }
|
MSDN : ManualResetEvent Class
ManualResetEvent 就像是個手動的紅綠燈, 可以將 Thread Block 在 WaitOne 這行, 直到呼叫 ManualResetEvent.Set()將燈號切成綠燈, 全部的 Thread 才會往下繼續執行。
與之相對的是 AutoResetEvent Class , 差別只在於 ManualResetEvent 呼叫 Set (綠燈), Reset (紅燈)。而 AutoResetEvent 呼叫 Set 每次只會隨機放一條 Thread , 不像 ManualRestEvent 是全部放
Enqueue
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void Enqueue(int i) { while (_jobQueue.TryAdd(i) == false) { this.CreateThread(); } _mre.Set(); _mre.Reset(); }
|
實作動態伸縮 Thread 數量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private void ThreadBody() { ... while (!this._jobQueue.IsCompleted) { if (_mre.WaitOne(5000) == false) { if (Interlocked.Decrement(ref _currentThreadCount) < _minThread) { Interlocked.Increment(ref _currentThreadCount); } else { Console.WriteLine($"Thread count : {this._currentThreadCount}"); break; } } } Console.WriteLine(name + " are closed"); }
|
WaitFinished
這個方法如果被呼叫時, 應該將所有 Thread 都喚醒, 並且等待每個 Thread 都執行完畢
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void WaitFinished() { this._jobQueue.CompleteAdding(); _mre.Set();
foreach (var t in _threads) { if (t != null) { t.Join(); } } }
|
**完整版的 Code **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class Program { static void Main(string[] args) { MyThreadPool myTreadPool = new MyThreadPool(3,10);
for (int i = 0; i < 200; i++) { myTreadPool.Enqueue(i); }
Console.ReadKey(); myTreadPool.WaitFinished(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| public class MyThreadPool { private BlockingCollection<int> _jobQueue;
private int _minThread;
private int _maxThread;
private int _currentThreadCount;
private ManualResetEvent _mre;
private List<Thread> _threads;
public MyThreadPool(int minThread,int maxThread) { this._minThread = minThread; this._maxThread = maxThread; this._jobQueue = new BlockingCollection<int>(100); this._currentThreadCount = 0; this._mre = new ManualResetEvent(false); this._threads = new List<Thread>();
for (int i = 0; i < minThread; i++) { this.CreateThread(); } }
private void CreateThread() { int id = Interlocked.Increment(ref _currentThreadCount); if (id > _maxThread) { Interlocked.Decrement(ref _currentThreadCount); return; }
Thread thread = new Thread(ThreadBody); thread.Name = $"Thread-{id}"; thread.Start();
this._threads.Add(thread);
Console.WriteLine($"Thread count : {this._currentThreadCount}"); }
public void WaitFinished() { this._jobQueue.CompleteAdding(); _mre.Set();
foreach (var t in _threads) { if (t != null) { t.Join(); } } }
public void Enqueue(int i) { _mre.Set();
while (_jobQueue.TryAdd(i) == false) { this.CreateThread(); }
_mre.Reset(); }
private void ThreadBody() { string name = Thread.CurrentThread.Name; Console.WriteLine(name + " starts");
while (!this._jobQueue.IsCompleted) { int task = 0; while(_jobQueue.TryTake(out task, 100)) { Random rnd = new Random(); int excuteTime = 0; excuteTime = rnd.Next(100, 500); Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms"); Thread.Sleep(excuteTime); }
if (_mre.WaitOne(5000) == false) { if (Interlocked.Decrement(ref _currentThreadCount) < _minThread) { Interlocked.Increment(ref _currentThreadCount); } else { Console.WriteLine($"Thread count : {this._currentThreadCount}"); break; } } }
Console.WriteLine(name + " are closed"); } }
|
執行結果
執行中應該可以看到 Task 消化不夠快而加開 Thread
最後因為程式在等待 ReadKey(), 如果超過 5 秒不按會看到 Thread 收掉的訊息
按下任意鍵,就會全部收掉並關閉程式