0%

windows server 2019 重開機後,自建 container nat networks 會消失。
來源 : Windows container network drivers

1
NAT networks created on Windows Server 2019 (or above) are no longer persisted after reboot.

所以我有寫一些重啟機器時檢查 network ,如果不存在就再建立一組並且啟動 docker,但時不時會碰到 Error response from daemon: hnsCall failed in Win32: The object already exists. (0x1392) 的錯誤。

但透過以下指令卻又都找不到已經存在的 network mapping

1
2
$ docker network ls
$ Get-NetNatStaticMapping

直接下指令砍掉要建立的 network 名稱,會回傳這物件不存在,但你要建立又會說這物件已經存在的鬼擋牆狀況… 這應該是 windows container bug ,之前只要碰到這個問題都直接重長機器,而剛剛終於找到解法了 (灑花)

解法

1
2
3
4
5
6
7
8
# 先停掉 docker service
$Stop-Service docker

# 停掉 hns
$Stop-Service hns

# 重啟 docker service, 這會連帶的 hns 也會被啟動
$Start-Service docker

這時候再重新跑 docker network create 就會過了,這問題卡了我很長一段時間,終於找到解法了

/images/20200226/0.jpg

(圖片出處: https://sploot.tw/2018/06/powershell-suite-windows-attack-tookit/)

powershell 苦手如我,最近有越來越多的需求要從頭到尾自建 CI/CD 與自動化佈署流程,每次寫都要查一次覺得很煩(越老越金魚腦),決定把常常用到的筆記方便查找

ENV

1
2
3
4
5
6
7
8
#列出所有的環境變數
$gci env:* | Sort-Object name

#result
Name Value
---- -----
ALLUSERSPROFILE C:\ProgramData
APPDATA C:\Users\Steven Tsai\AppData\Roaming
1
2
3
4
5
6
#設定環境變數
$env:test="123"
$env:test

#result
123

Variable

1
2
3
4
$test="123"
$test
#result
123

If … Else

1
2
3
4
$test="123"
$if($test -eq "123") {echo "yes"} else {echo "no"}
#result
yes
1
2
3
4
5
6
#把判斷的結果存到變數中
$test="123"
$result=if($test -eq "123") {echo "yes"} else {echo "no"}
$result
#result
yes

Split

1
2
3
4
5
6
$test="a,b,c"
$test.Split(",")
#result
a
b
c
1
2
3
4
$test="a,b,c"
$test.Split(",")[0]
#result
a

Invoke-WebRequest

1
2
3
4
5
#需要帳密上傳檔案
$PASSWORD = ConvertTo-SecureString -String "your_password" -AsPlainText -Force
$CREDENTIAL = New-Object -TypeName "System.Management.Automation.PSCredential" -ArgumentList "your_account", $PASSWORD

$Invoke-WebRequest -Method PUT -Uri https://server/file.zip -UseBasicParsing -Credential $CREDENTIAL -Infile ".\file.zip"

Remove-Item

1
2
#force強制刪除
$Remove-Item .\file.zip -Force

New-Item

1
2
3
4
5
6
7
8
#建立 logs 資料夾
$New-Item -ItemType directory -Path C:\logs

#result
目錄: C:\
Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 2020/2/26 上午 12:28 test

Copy-Item

1
$Copy-Item c:\source_folder C:\target_folder\ -Recurse

Out-File

1
2
3
#將文字寫成檔案
$test="123"
$test | Out-File "C:\log.txt" -Encoding utf8 -Force

Write file without BOM

1
2
$test="123"
$[System.IO.File]::WriteAllLines("C:\log.txt", $test)

Start-Transcript

通常腳本會在長機器的時候自動執行,這時候並不會有人工介入,如果發生錯誤就很需要事後追查 Log ,但一段腳本可能上百上千行,到底錯在哪很難找,這時候就可以透過以下方法把執行的過程跟 output 都儲存下來

1
2
3
$Start-Transcript -Path "c:\logs.txt" -Append
$New-Item -Type Directory C:\TestData2\ -Force
$Stop-Transcript

執行結束後打開 logs.txt 就可以看到以下輸出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
**********************
Windows PowerShell 轉譯開始
**********************
已啟動轉譯,輸出檔為 c:\logs.txt
PS C:\Users\Steven Tsai> New-Item -Type Directory C:\TestData2\ -Force


目錄: C:\


Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 2020/2/26 上午 12:40 TestData2


PS C:\Users\Steven Tsai> Stop-Transcript
**********************
Windows PowerShell 轉譯結束
結束時間: 20200226004058
**********************

Set Global Enviroment

這是用在自動化安裝一些套件,例如 : Git , consul 後需要設定全域的環境變數,才能讓之後的指令能夠認得 consul –version 這類指令。

如果手動安裝,就是去把環境變數中 Path 加上 consul.exe 的位置,以下範例是抓出 Path 本來的設定值,並在尾段補上新的路徑設定

1
2
3
4
5
$oldpath = (Get-ItemProperty -Path 'Registry::HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\Session Manager\Environment' -Name PATH).path

$newpath = "${oldpath};C:\consul\"

$Set-ItemProperty -Path 'Registry::HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\Session Manager\Environment' -Name PATH -Value $newPath

Start-Service

1
$Start-Service your_service_name

register task scheduler

1
2
3
4
5
6
7
8
9
10
11
#要執行的動作
$action= (New-ScheduledTaskAction -Execute "powershell.exe" -Argument "-ExecutionPolicy bypass C:\start.ps1")

#執行權限
$ProvisionPrincipal = New-ScheduledTaskPrincipal -UserId "NT AUTHORITY\SYSTEM" -RunLevel Highest -LogonType S4U

#觸發的時機
$ProvisionTrigger = New-ScheduledTaskTrigger -AtStartup

#註冊 task Schedule
$Register-ScheduledTask -TaskName "StartConsul" -TaskPath "\" -Action $action -Trigger $ProvisionTrigger -Principal $ProvisionPrincipal

Create User

1
2
3
4
5
6
7
8
9
10
11
12
13
#設定 User 密碼
$Secure_String_Pwd=ConvertTo-SecureString "user_password" -AsPlainText -Force

#建立User
$New-LocalUser "teamuser" -Password $Secure_String_Pwd -FullName "Arch team user" -Description "for arch team" -PasswordNeverExpires:$true

#將此 User 加入 Administrators
$Add-LocalGroupMember -Group "Administrators" -Member "teamuser"

#建立一個全新的 Group
$New-LocalGroup -Name "docker-users"
#將此 User 加進去
$Add-LocalGroupMember -Group "docker-users" -Member "archteamuser"

Expand-Archive

1
2
3
$curl https://source_file.zip -o c:\targe_file.zip

$Expand-Archive c:\targe_file.zip c:\file -force

希望學會這個技巧後,以後大家弄CI/CD環境都能海闊天空

(圖片出處 : http://ezvivi2.com/article/200483.asp)

有在處理 CI/CD 的人應該都碰到過維護建置環境的問題,舉例來說,當今天開發 C# 專案可能有 dotnet framework、dotnet core、有些人開發前端會需要 npm …等等,CI 機器就需要裝一堆為了建置佈署的軟體,如果開發人員變多,建置排隊久候,可能就會將環境升級為 Master、Slave 架構,但又面臨了多台 Slave 如何快速增長(通常只有上班時間才會同時這麼多人在建置,所以動態增長 CI 機器有其必要),如何維護多台 Slave 環境,有時候遇到需要的套件版本打架的時候,處理起來真的是會抓狂。

透過容器來建置專案

其實綜觀上述的問題可以發現,每個專案建置所需要的東西可能不盡相同,為了讓 CI 機器能夠滿足所有建置的條件往往會把環境搞得過於複雜,這時候容器就是一個非常好的選擇,它滿足了每次建置環境都是獨立隔離的條件且方便佈署。

一旦使用容器來做為建置的媒介,需要長一台新的 CI 機器時,只要將機器開起來並安裝完 docker 就搞定了(甚至 AWS 都有做好的現成 AMI 連自己安裝都省了),不再需要寫一狗票的腳本來安裝機器。

實例

以下是我一個專案建置時的 dockerfile,沒幾行的 script 就快速講一下,因為我這個專案裡面同時有 dotnet framework 與 dotnet core ,所以找個微軟官方提供的 mcr.microsoft.com/dotnet/framework/sdk:4.8, 裡面已經安裝了下列套件。 Docker Hub 連結

1
2
3
4
5
6
.NET Framework Runtime
Visual Studio Build Tools
Visual Studio Test Agent
NuGet CLI
.NET Framework Targeting Packs
ASP.NET Web Targets

我用這個 base image 開了 workspace 資料夾並把 source code 複製進去後,接著就是大家熟悉的 nuget restore 、 dotnet build、 dotnet publish 在容器的環境內建置專案。

第二段是起另一個 base image windows servercore 2019,將剛剛建置好的 artifact 放到指定的資料夾,最後這一包會建置成 image 並推到公司的 Artifact management 上,其它人只要拉下這個 image 就可以執行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
FROM mcr.microsoft.com/dotnet/framework/sdk:4.8 AS build-env

WORKDIR c:/workspace

COPY . ./

RUN nuget restore src\nmqv3.sln
RUN dotnet restore --configfile src\.nuget\NuGet.Config src\my_project.sln

run dotnet build -c release src\my_project.sln
run dotnet publish -c Release -r win-x64 --self-contained true src\Router\Router.csproj


FROM mcr.microsoft.com/windows/servercore:ltsc2019

WORKDIR c:/worker
COPY --from=build-env c:/workspace/src/Worker/bin/release/ .

WORKDIR c:/router
COPY --from=build-env c:/workspace/src/Router/bin/Release/netcoreapp3.0/win-x64/publish/ .

從建置到最後要交付的 container image 一氣呵成,全部都濃縮在一份 dockerfile 裡面,這份 dockerfile 會跟著專案內,只要任何一台機器可以執行 docker container 就可以建置佈署這個專案,是不是方便許多 XD

ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static ThreadLocal<string> LocalString = new ThreadLocal<string>();

static async Task Main(string[] args)
{
LocalString.Value = "Value 1";
Console.WriteLine($"【A】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

var t1 = AsyncMethod();

Console.WriteLine($"【D】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

await t1;
}

static async Task AsyncMethod()
{
Console.WriteLine($"【B】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

LocalString.Value = "Value 3";
Console.WriteLine($"【C】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

await Task.Delay(100);

Console.WriteLine($"【E】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");
}


//【A】 Thread: 14, ExcutionContext: 37151951 ,value: Value 1
//【B】 Thread: 14, ExcutionContext: 51676517 ,value: Value 1
//【C】 Thread: 14, ExcutionContext: 51676517 ,value: Value 3
//【D】 Thread: 14, ExcutionContext: 37151951 ,value: Value 3
//【E】 Thread: 16, ExcutionContext: 46128400 ,value:

ThreadLocal 非常容易理解, 每個 Thread 之間彼此是隔離的, 即便 ExcutionContext 不同, 但只要是同一個 Thread 都會是共用的。

所以可以看到 await 之後, 因為不同 Thread 執行剩下的 Code , ThreadLocal 的值就變成預設的空值


AsyncLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static AsyncLocal<string> LocalString = new AsyncLocal<string>();

static async Task Main(string[] args)
{
LocalString.Value = "Value 1";
Console.WriteLine($"【A】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

var t1 = AsyncMethod();

Console.WriteLine($"【D】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

await t1;
}

static async Task AsyncMethod()
{
Console.WriteLine($"【B】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

LocalString.Value = "Value 3";
Console.WriteLine($"【C】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");

await Task.Delay(100);

Console.WriteLine($"【E】 Thread: {Thread.CurrentThread.ManagedThreadId}, ExcutionContext: {Thread.CurrentThread.ExecutionContext.GetHashCode()} ,value: {LocalString.Value}");
}



//【A】 Thread: 14, ExcutionContext: 5705933 ,value: Value 1
//【B】 Thread: 14, ExcutionContext: 12517624 ,value: Value 1
//【C】 Thread: 14, ExcutionContext: 12517624 ,value: Value 3
//【D】 Thread: 14, ExcutionContext: 5705933 ,value: Value 1
//【E】 Thread: 8, ExcutionContext: 35765882 ,value: Value 3

AsyncLocal 會在每次需要切出 ExcutionContext 時複製一份給新的 ExcutionContext , 所以每個 ExcutionContext間的 AsyncLocal 都是獨立的,但同時可達跨 Thread , ExcutionContext 往下傳遞的特性。

/images/20191109/0.png

( https://www.codeprimers.com/service-discovery-in-microservice-architecture/ )


前言

傳統上一組服務通常都會搭配 DNS + Load balance + serivce cluster, 服務要呼叫時也都是直接打 Domain 讓 Load balance 去導流。

不過這也面臨了管理 Domain 與服務間的複雜度, 例如服務要加一台新的機器需要去設定 Load balance 才有辦法服務(當然也是可以搭配一些 cloud autoscaling 機制來簡化流程); Client 在呼叫 Service 時通常需要真的打打看才有辦法知道服務是否還正常, 無法事先知道服務狀態再決定是否要呼叫。

近年微服務與容器化運用的盛行, 上述方式就顯得有點卡卡的,如果你的服務在 k8s 內那可能還好, 大部分它都幫忙處理掉了, 但如果像我們公司一樣, 很多服務雖然容器化但還無法進到 k8s 內, container 可以在短時間內 scale 成數倍來因應大流量, 但卡在這些服務需要去 load balance 註冊才有辦法讓流量導進來, 那顯然就有點做半套的感覺, Service Discovery 的應用就成為了重要的課題


什麼是 Service Discovery

翻譯成中文就是服務發現(廢話…),顧名思義就是呼叫端在需要時先透過搜尋來找到對應的服務,而要達成這點需要一些配合才能達成。

首先第一步, 當某個服務起來時應該主動向提供 service discovery 註冊自己是什麼服務? 位置在哪? 是否已經可以開始服務了?

/images/20191109/1.gif

註冊時會搭配著 Health check 的機制, 告訴 service discovery 如何檢核這個節點是否健康的

/images/20191109/2.png

Client 在要叫用 service 時, 先透過 service name 詢問有哪些節點可用, 而 service discovery 也能準確地回應有哪些目前還是健康的節點可供呼叫

/images/20191109/3.png

當節點損壞時, 以 container 的案例, 會直接換掉不健康的 container 重長一組, 而這又會回到圖一註冊的流程。

更細緻一點的作法還有對 service discovery 回應的列表做快取,例如快取 1 秒,當服務自己要被收掉時, 做好 graceful shutdown 先把自己從服務列表中 deregister , 並且等待 3 秒後才關機,避免快取到的 Client 打來你剛好停止服務。

或是對服務加上 Tag ,當特定用戶或 VIP 連進來時可以透過搜尋特定機器去執行

/images/20191109/4.png


面臨的問題

但並不是所有事情都是如此美好, 公司目前導入這些機制雖然可以大幅簡化管理內部服務 domain 這類的問題, 但對於 load balance,即選取適合的節點這點上還有一些問題要克服,例如傳統 ELB 這類的服務都會依據每個節點的回應速度來判斷接下來導流的分配,但 service discovery 因為 request 並不會經過它,所以相關 latancy 也無從監控與觀察, 所以 load balance 這題必須額外拉出來做相對應的監控機制。

再來當服務橫跨 vm 跟 k8s 時情境就變得更複雜,不過這些題目等到之後有明確的方向再做整理好了,久違的文章雖然都沒寫到 code,但還是把一些觀念記錄下來。相關的實作等之後有空再補吧 (逃)

/images/20190819/0.jpg

最近因為任務關係對 NATS Streaming 做了一些研究跟 POC,這邊把一些研究到的NATS Streaming 特性記錄下來

什麼是 NATS

NATS 是一種 Queue , 它效能比一般的 Queue 好上許多,號稱 Sender / Reciever 每秒可高達 20 萬筆 Message,這是非常驚人的數字。

/images/20190819/1.png

(來源:https://bravenewgeek.com/dissecting-message-queues/)

可以看到 RabbitMQ 在它旁邊矮了一大截….


NATS 支援 Subscribe 模式,Subscriber 訂閱需要的 subject,當 NATS 收到 Message 時就會同時派發給所有訂閱者。

/images/20190819/2.png

(來源:https://nats-io.github.io/docs/developer/concepts/pubsub.html)


NATS 也支援 Queu Group,NATS 會對同一個 Queue Group 的所有 Subscriber 分配訂閱的 Message,而不會重複

/images/20190819/3.png

(來源:https://nats-io.github.io/docs/developer/concepts/queue.html)

不過這些都不算太特別,畢竟 RabbitMQ 一樣能做到相同的功能,最特別的是 NATS Streaming


什麼是 NATS Streaming

NATS Streaming 是附加在 NATS 之上的加值功能,他可以永久的保留所有的 Message,並且支援 Cursor(指標)功能,你可以任意的讓 Subscriber 回到特定的點 重播 所有的 Message。

這一點相當有用,試想一下,如果今天我們將所有交易的過程都透過 NATS Streaming 記錄下來,假設資料庫或記錄的地方損毀,我們只要重新 Replay 所有 Message,即能重建最後的結果。這也是 Event Sourcing 中最重要的的一環。

為了達成 Event Sourcing 這個設計模式的目標,演練了一些特定的情境

  1. 為了達成 HA 及加快處理速度的需求,需要多個 Subscriber 同時訂閱相同 Subject,並且彼此不會做到重複的 Message。
  2. 演練災難還原,Subscriber 必須能回朔到過去的的定時間點,重新 Replay 所有歷程

1. Queue Group

其實第一項相對於容易,只要用 Queue Group 即可輕鬆達成,只要每個 Subscriber 給定同一組 Queue Group Name 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var options = StanOptions.GetDefaultOptions();

var subOptions = StanSubscriptionOptions.GetDefaultOptions();
subOptions.MaxInflight = 1; // MaxInflight


var cf = new StanConnectionFactory();

var c = cf.CreateConnection("test-cluster", clientId, options);
var s = c.Subscribe("foo", queueGroup, subOptions, (obj, args) =>
{
......
});

//等待接收關機訊號

//關閉連線
s.Close();
c.Close();
c.Dispose();

這邊需要特別注意 MaxInflight 這個參數,為了效率 NATS 預設會一次派給 Subscriber 1000 則Message(預設值有點忘記),好讓 Subscriber 不需要每做完一筆才透過網路要下一筆 Message,但這也讓我測出一個問題,當預收的訊息太多並且 Subscriber 根本來不及消化掉,這時候 NATS 會判定該則訊息 timeout,轉而派給同群別的Subscriber 來處理,這時候可能會發生兩個 Subscriber 執行到同一筆 Message 的狀況,所以請估算好 MaxInflight 的值,避免抓一堆做不完重派的狀況發生。

當最後一個 Subscriber 離線後,Queue Group 即會被刪除


2. Durable Name

前面有提到 NATS Streaming 支援 Cursor 的功能,當 Subscriber 都離線時,如果有給它 Durable Name,他會記得上次你執行到哪一筆,當下次 Subscriber 重新連上時會從那個點繼續往下派發

1
2
var subOptions = StanSubscriptionOptions.GetDefaultOptions();
subOptions.DurableName = _appSetting.DURABLENAME;

3. StartAt

接著來實作災難還原的步驟,首先如何讓特定的 Queue Group 退到特定的點,呈上,同時多個 Subscriber 與 Durable Name 都還是要能符合。

SDK 其實有提供 StartAt() 的 API

1
2
3
// Receive messages starting at a specific sequence number
var subOptions = StanSubscriptionOptions.GetDefaultOptions();
subOptions.StartAt(22);

不過這邊需特別注意,同一個 Queue Group 只有在第一個 Subscriber 進來時(也就建立這個 Queue Group 的時候)StartAt 才會生效,之後進來的 Subscriber 帶這個值都會被直接忽略掉。

這邊問題就來了,當我們帶了 Queue Group + Durable Name,即便所有的 Subscriber 都離線了,NATS 還是會貼心的幫你記錄最後執行到的地方,所以之後進來的 Subscriber 也都不可能是第一個了,換言之這樣的狀況是無法重新設定指標的。

所以如果要刪除 Queue Group + Durable Name 的 Queue Group 唯一的方法只有讓所有的 Subscriber 退訂閱

1
2
// 取消訂閱 刪除該Group
s.Unsubscribe();

這邊跟上面的範例程式不一樣,上面的是用 **Close()**,這樣並不會讓 Subscriber 退訂閱,只會是離線而已,唯有用 Unsubscribe() 將 Subscriber 全部退訂閱後,有 Durable Name 的 Queue Group 才會被刪除掉。

另外 NATS SDK 並沒有提供查詢 Queue Group 所有 Subscriber 的 Client ID,依據官方的說法,這份清單應該是使用者需要自己維護的,在上面這個案例上,就會需要將所有 Subscriber 用相同 Client ID 連上線後退訂閱。


4. ManualAcks

用上述的方法成功刪除 Queue Group 後,下一步就是重新讓第一個 Subscriber 去建立 Queue Group 並指定 StartAt ,不過這邊有點麻煩的地方是,Subscriber 一連線後馬上就會開始拋 Message 過來,這時後如果你是希望災難還原與重新執行 Replay 分開做時就會很困擾。

預設 StanSubscriptionOptions 是收到 Message 後自動就幫你 Ack 回報 NATS 你收到了在處理了,NATS 也就會繼續往下把下一筆分配給別人,但我只是想指定位子不想要處理 Message 呢?

這時候就要透過設定將 Ack 改成手動回報

1
2
3
4
5
6
7
8
9
var subOptions = StanSubscriptionOptions.GetDefaultOptions();
subOptions.ManualAcks = true;

.......

var s = c.Subscribe("foo", queueGroup, subOptions, (obj, args) =>
{
args.Message.Ack();
});

這樣只要不呼叫 **Ack()**,就等於不處理這筆 Message 了,當 Subscriber 離線或是 Message timeout,NATS 就會重新分派給其他 Subscriber 處理了,這樣也間接達到重建 Queue Group 與指定 Cursor 的目標。

GitHub SourceCode : 連結

自從加入精神時光屋團隊後,對於一些平行處理、多執行序等程式掌控力就越來越要求。為了不要當拖油瓶,這次練習的是 ThreadPoll,從這個練習可以更精準掌握 Thread 控制技巧。


目標

建立 Thread 其實是需要成本的,所以頻繁的建立 Thread 砍掉 Thread 相當耗效能,另一方面,如果等到有 Task 才開始建立 Thread 可能會導致第一個 Task 回應速度過慢。例如 : IIS 底層就有 ThreadPool ,為了因應 Request 進來時能更快速的回應,但又不能為了一昧追求速度不管資源的有效運用,所以ThreadPool 應該有增長跟最低維持幾條的上下限。

依據上面描述定義以下目標

  1. 自製 ThreadPool
  2. 依據 Task 量決定 Thread 的數量,Task 過多會加開 Thread 處理,反之會減少
  3. ThreadPool 可以設定 Thread 數量上下限

實作

希望呼叫的方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Program
{
static void Main(string[] args)
{
// 建立自己的 ThreadPool ,設定 Thread 至少保持 3 條,上限不超過 10 條
MyThreadPool myTreadPool = new MyThreadPool(3,10);

// 一值塞任務進去,不管任務執行完了沒
// 所以 ThreadPool 應該要能接住 Task 讓它們排隊消耗
for (int i = 0; i < 200; i++)
{
myTreadPool.Enqueue(i);
}

Console.ReadKey();
// 等待執行結束
myTreadPool.WaitFinished();
}
}

MyThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyThreadPool
{
private BlockingCollection<int> _jobQueue;

private int _minThread;

private int _maxThread;

public MyThreadPool(int minThread,int maxThread)
{
this._minThread = minThread;
this._maxThread = maxThread;
// 最多只能放 100 個 Task
this._jobQueue = new BlockingCollection<int>(100);
}

public void WaitFinished()
{
throw new NotImplementedException();
}

public void Enqueue(int i)
{
_jobQueue.TryAdd(i);
}
}

實作 ThreadPool 保有最少的 Thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyThreadPool
{
private BlockingCollection<int> _jobQueue;

private int _minThread;

private int _maxThread;

private int _currentThreadCount;

private List<Thread> _threads;

public MyThreadPool(int minThread,int maxThread)
{
this._minThread = minThread;
this._maxThread = maxThread;
this._jobQueue = new BlockingCollection<int>(100);
this._currentThreadCount = 0;
this._threads = new List<Thread>();

for (int i = 0; i < minThread; i++)
{
this.CreateThread();
}
}

private void CreateThread()
{
int id = Interlocked.Increment(ref _currentThreadCount);
if (id > _maxThread)
{
// 可開的 Thread 到達極限, 無法加開
Interlocked.Decrement(ref _currentThreadCount);
return;
}

Thread thread = new Thread(ThreadBody);
thread.Name = $"Thread-{id}";
thread.Start();

this._threads.Add(thread);

Console.WriteLine($"Thread count : {this._currentThreadCount}");
}

private void ThreadBody()
{

}
}

實作 TheadBody

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void ThreadBody()
{
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts");

// 如果有 Task 還沒有塞完就一直搶來處理
while (!this._jobQueue.IsCompleted)
{
int task = 0;
while(_jobQueue.TryTake(out task, 100))
{
// 模擬 Task 要執行的時間, 0.1 ~ 0.5秒不等
Random rnd = new Random();
int excuteTime = 0;
excuteTime = rnd.Next(100, 500);
Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms");

Thread.Sleep(excuteTime);
}
}

Console.WriteLine(name + " are closed");
}

這邊會有一個問題, 如果塞 Task 的速度不定, 可能時快時慢, 如果中間間隔過長會導致 Thread 一直拿不到 Task 來工作卻又停不下來, 持續空轉 , 所以必須加一個機制讓它等待, 並在必要時刻喚醒


ManualResetEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private ManualResetEvent _mre;
public MyThreadPool(int minThread,int maxThread)
{
...
this._mre = new ManualResetEvent(false);
...
}

...
private void ThreadBody()
{
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts");

// 如果有 Task 還沒有塞完就一直搶來處理
while (!this._jobQueue.IsCompleted)
{
int task = 0;
while(_jobQueue.TryTake(out task, 100))
{
Random rnd = new Random();
int excuteTime = 0;
excuteTime = rnd.Next(100, 500);
Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms");
Thread.Sleep(excuteTime);
}

_mre.WaitOne();
}
}

MSDN : ManualResetEvent Class

ManualResetEvent 就像是個手動的紅綠燈, 可以將 Thread Block 在 WaitOne 這行, 直到呼叫 ManualResetEvent.Set()將燈號切成綠燈, 全部的 Thread 才會往下繼續執行。

與之相對的是 AutoResetEvent Class , 差別只在於 ManualResetEvent 呼叫 Set (綠燈), Reset (紅燈)。而 AutoResetEvent 呼叫 Set 每次只會隨機放一條 Thread , 不像 ManualRestEvent 是全部放


Enqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
public void Enqueue(int i)
{
// 表示容量滿了
while (_jobQueue.TryAdd(i) == false)
{
// Queue Length 過長, 需加開 Thread
this.CreateThread();
}
// 切成綠燈, 放掉所有 Thread 開始搶工作做
_mre.Set();
// 切成紅燈, 如果有 Tread 搶不到事情來做又會被 Block 在 _mre.WaitOne(); 那行
_mre.Reset();
}

實作動態伸縮 Thread 數量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void ThreadBody()
{
...

while (!this._jobQueue.IsCompleted)
{
if (_mre.WaitOne(5000) == false)
{
// 進到這邊表示 Thread 並不是因為_mre.Set()被喚醒, 而是5秒 timeout
// 嘗試回收 Thread
if (Interlocked.Decrement(ref _currentThreadCount) < _minThread)
{
Interlocked.Increment(ref _currentThreadCount);
}
else
{
Console.WriteLine($"Thread count : {this._currentThreadCount}");
break;
}
}
}
Console.WriteLine(name + " are closed");
}

WaitFinished

這個方法如果被呼叫時, 應該將所有 Thread 都喚醒, 並且等待每個 Thread 都執行完畢

1
2
3
4
5
6
7
8
9
10
11
12
13
public void WaitFinished()
{
this._jobQueue.CompleteAdding();
_mre.Set();

foreach (var t in _threads)
{
if (t != null)
{
t.Join();
}
}
}

**完整版的 Code **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Program
{
static void Main(string[] args)
{
// 建立自己的 ThreadPool ,設定 Thread 至少保持 3 條,上限不超過 10 條
MyThreadPool myTreadPool = new MyThreadPool(3,10);

// 一值塞任務進去,不管任務執行完了沒
// 所以 ThreadPool 應該要能接住 Task 讓它們排隊消耗
for (int i = 0; i < 200; i++)
{
myTreadPool.Enqueue(i);
}

Console.ReadKey();
// 等待執行結束
myTreadPool.WaitFinished();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class MyThreadPool
{
private BlockingCollection<int> _jobQueue;

private int _minThread;

private int _maxThread;

private int _currentThreadCount;

private ManualResetEvent _mre;

private List<Thread> _threads;

public MyThreadPool(int minThread,int maxThread)
{
this._minThread = minThread;
this._maxThread = maxThread;
// 最多只能放 100 個 Task
this._jobQueue = new BlockingCollection<int>(100);
this._currentThreadCount = 0;
this._mre = new ManualResetEvent(false);
this._threads = new List<Thread>();

for (int i = 0; i < minThread; i++)
{
this.CreateThread();
}
}

private void CreateThread()
{
int id = Interlocked.Increment(ref _currentThreadCount);
if (id > _maxThread)
{
// 可開的 Thread 到達極限, 無法加開
Interlocked.Decrement(ref _currentThreadCount);
return;
}

Thread thread = new Thread(ThreadBody);
thread.Name = $"Thread-{id}";
thread.Start();

this._threads.Add(thread);

Console.WriteLine($"Thread count : {this._currentThreadCount}");
}

public void WaitFinished()
{
this._jobQueue.CompleteAdding();
_mre.Set();

foreach (var t in _threads)
{
if (t != null)
{
t.Join();
}
}
}

public void Enqueue(int i)
{
_mre.Set();

// 表示容量滿了
while (_jobQueue.TryAdd(i) == false)
{
// Queue Length 過長, 需加開 Thread
this.CreateThread();
}

_mre.Reset();
}

private void ThreadBody()
{
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts");

// 如果有 Task 還沒有塞完就一直搶來處理
while (!this._jobQueue.IsCompleted)
{
int task = 0;
while(_jobQueue.TryTake(out task, 100))
{
Random rnd = new Random();
int excuteTime = 0;
excuteTime = rnd.Next(100, 500);
Console.WriteLine($"{name} do task_{task} spend {excuteTime} ms");
Thread.Sleep(excuteTime);
}


if (_mre.WaitOne(5000) == false)
{
// 此條 Thread 5秒都沒有工作, 嘗試收掉
if (Interlocked.Decrement(ref _currentThreadCount) < _minThread)
{
Interlocked.Increment(ref _currentThreadCount);
}
else
{
Console.WriteLine($"Thread count : {this._currentThreadCount}");
break;
}
}
}

Console.WriteLine(name + " are closed");
}
}

執行結果

執行中應該可以看到 Task 消化不夠快而加開 Thread

/images/20190801/1.png

最後因為程式在等待 ReadKey(), 如果超過 5 秒不按會看到 Thread 收掉的訊息

/images/20190801/2.png

按下任意鍵,就會全部收掉並關閉程式

情境

/images/20190530/0.png

最近在公司開發一支稱為 Linter 的小程式,負責檢查 Bitbucket PR 的異動內容,看看是否有符合公司規範 ,例如 : 加了 Config 後是否有確實在對應環境補上相關設定,盡量避免這種編譯檢查不出來但上線才壞掉的情況。

原本預想的架構圖如上圖,使用者發了 PR ,透過 Webhook 觸發 Jenkins Master ,由 Master 安排一台機器去執行檢查。

但面臨一個問題是,當 Linter 這支程式要更版時變得很麻煩,因為每台 Jenkins 都必須要更新這支程式,原本也有想說不如把 Linter 獨立一台機器寫成類似像 API 的服務好了,但未來勢必面臨太多 PR 導致瓶頸,之後在導入 ELB 加多台機器 …. 想著想著就覺得太麻煩了。

之後同事建議可以包成 dotnet tool 的工具,Linter 要更新時只是發佈新版 Nuget ,而不是去佈署每台 Jenkins Slave,而 Jenkins Slave 每次要執行檢查 PR 時,先檢查自己套件是否為最新版的再往下執行。

/images/20190530/1.png

整個流程瞬間變得簡單乾淨許多,所以就動手開始將這套程式包成 dotnet tool


製作 dotnet tool

首先將要包成 dotnet tool 的專案檔 csproj 加上 PackAsTool 設定

1
2
3
4
5
6
7
8
9
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<AssemblyName>Linter</AssemblyName>
<RootNamespace>Linter</RootNamespace>
<PackAsTool>true</PackAsTool>
<Version>1.0.1</Version>
</PropertyGroup>

將執行檔打包成 nupkg

1
$ dotnet pack -c release -o nupkg -p:PackageVersion=1.0.0 src\Linter.csproj

推上 Nuget Server

1
$ nuget.exe push src\nupkg\Linter.1.0.0.nupkg <Nuget Key> -source <Nuget Server>

執行 dotnet tool

Install

1
$ dotnet tool install --add-source <Nuget Server> -g Linter

Update

1
$ dotnet tool update -g --no-cache Linter

接著就看你原本怎麼封裝 Cli ,直接執行即可

1
$ Linter check -n .......

結語

第一次使用 dotnet tool 有驚豔到,以前都只會傻傻的用 console application ,每次都覺得部屬超麻煩,以後透過 dotnet tool 真的是方便多了

/images/20190507/0.jpg

情境

DBA 反應偵測到有個 Update Query 很頻繁,且通常緊接著 Update 後都會再進 Select 把剛剛 Update 的資料拉走,資料量太大時頻率太高導致 SQL 效能瓶頸,建議調整成 Update 後直接把剛剛異動的資料拉走,而不是拆兩段查詢。

解決方案

當 SQL Server 執行 Update 時會 Lock 相關要異動的資料,並把異動前後的資料放入 Log 中,如果希望能在一次 Query 中就把資料 Update 並把影響到的資料回傳,可以透過 Ouput 這個子句來達成

Ouput 可用於以下情境

Delete

Insert

Update

Merge

我們有一張表,紀載著每個人的名稱與學期分數

/images/20190507/1.png

老師大發慈悲想把低於 60 分的人都改成 60 分及格,這時候語法可以這樣下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DECLARE @UpdateLog table(
Id int not null,
Name nvarchar(20) not null,
OriScore int not null,
NewScore int not null
)

UPDATE dbo.[User]
SET Score = 60
OUTPUT INSERTED.Id,
INSERTED.Name,
DELETED.Score,
INSERTED.Score
INTO @UpdateLog
WHERE Score < 60

SELECT * FROM @UpdateLog

/images/20190507/2.png

情境

EC2 上的 windows VM 突然無法存取相對應的 AWS 服務,而這些權限原本都是透過 IAM Role 賦予的,之前文章有提過 AWS 機器是如何透過 IAM Role 得知自己有哪些權限

在該機器呼叫 metadata service 會回應錯誤

1
$ curl 169.254.169.254

詢問對 AWS 比較熟的一些同事,聽說在 windows server 2012 版本之後,更換 Instance Type 重新開機時會發生


解決方法

執行 C:\ProgramData\Amazon\EC2-Windows\Launch\Scripts\InitializeInstance.ps1 ,它會重新 Binding 一些相關設定,執行完後就好了