Go 部落格
Go 併發模式:管道與取消
引言
Go 的併發原語使得構建流式資料管道變得容易,這些管道可以有效地利用 I/O 和多個 CPU。本文提供了此類管道的示例,強調了操作失敗時出現的細微之處,並介紹了優雅處理故障的技術。
什麼是管道?
在 Go 中沒有管道的正式定義;它只是眾多併發程式型別中的一種。非正式地講,管道是由通道連線的一系列階段,每個階段是一組執行相同函式的 goroutine。在每個階段中,goroutine 會
- 透過入站通道從上游接收值
- 對資料執行某些函式,通常會產生新值
- 透過出站通道將值傳送到下游
每個階段都有任意數量的入站和出站通道,但第一階段和最後階段除外,它們分別只有出站或入站通道。第一階段有時稱為源或生產者;最後階段稱為匯或消費者。
我們將從一個簡單的管道示例開始,以解釋這些思想和技術。稍後,我們將介紹一個更實際的示例。
數字平方
考慮一個有三個階段的管道。
第一階段,gen
,是一個將整數列表轉換為發射列表中整數的通道的函式。gen
函式啟動一個 goroutine,將整數傳送到通道,並在所有值傳送完畢後關閉通道
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二階段,sq
,從通道接收整數並返回一個通道,該通道發射接收到的每個整數的平方。在入站通道關閉且此階段將所有值傳送到下游後,它會關閉出站通道
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
main
函式設定管道並執行最後階段:它從第二階段接收值並列印每一個值,直到通道關閉
func main() { // Set up the pipeline. c := gen(2, 3) out := sq(c) // Consume the output. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
由於 sq
的入站和出站通道型別相同,我們可以多次組合它。我們也可以將 main
重寫為範圍迴圈,就像其他階段一樣
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
扇出、扇入
多個函式可以從同一個通道讀取,直到該通道關閉;這稱為扇出。這提供了一種將工作分配給一組工作者以並行化 CPU 使用和 I/O 的方法。
一個函式可以透過將輸入通道複用到一個單一通道(該通道在所有輸入通道都關閉時關閉),從多個輸入讀取並繼續執行直到所有輸入都關閉。這稱為扇入。
我們可以修改我們的管道以執行兩個 sq
例項,每個例項從同一個輸入通道讀取。我們引入一個新函式 merge 來扇入結果
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the merged output from c1 and c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } }
merge
函式透過為每個入站通道啟動一個 goroutine 將值複製到唯一的出站通道,從而將通道列表轉換為單個通道。一旦所有 output
goroutine 都已啟動,merge
會再啟動一個 goroutine,以便在該通道上的所有傳送操作完成後關閉出站通道。
在已關閉的通道上傳送會引發 panic,因此在呼叫 close 之前確保所有傳送操作完成非常重要。sync.WaitGroup
型別提供了一種安排此同步的簡單方法
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }
中途停止
我們的管道函式有一個模式
- 階段在所有傳送操作完成後關閉其出站通道。
- 階段不斷從入站通道接收值,直到這些通道關閉。
這種模式允許每個接收階段寫成一個 range
迴圈,並確保一旦所有值都已成功傳送到下游,所有 goroutine 都會退出。
但在實際的管道中,階段並不總是接收所有入站值。有時這是故意設計的:接收方可能只需要值的子集就能繼續進行。更常見的情況是,階段提前退出,因為入站值表示先前階段發生了錯誤。無論哪種情況,接收方都不必等待剩餘的值到達,並且我們希望較早的階段停止產生後續階段不需要的值。
在我們的示例管道中,如果某個階段未能消費所有入站值,則嘗試傳送這些值的 goroutine 將無限期阻塞
// Consume the first value from the output. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. }
這會導致資源洩露:goroutine 消耗記憶體和執行時資源,goroutine 堆疊中的堆引用會阻止資料被垃圾回收。goroutine 本身不會被垃圾回收;它們必須自行退出。
我們需要安排管道的上游階段即使在下游階段未能接收所有入站值時也能退出。一種方法是將出站通道更改為帶緩衝區。緩衝區可以容納固定數量的值;如果緩衝區中有空間,傳送操作會立即完成
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
在建立通道時已知要傳送的值的數量時,緩衝區可以簡化程式碼。例如,我們可以重寫 gen
將整數列表複製到帶緩衝的通道中,並避免建立新的 goroutine
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
回到我們管道中被阻塞的 goroutine,我們可能會考慮為 merge
返回的出站通道新增一個緩衝區
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // enough space for the unread inputs // ... the rest is unchanged ...
雖然這解決了此程式中被阻塞的 goroutine,但這程式碼不好。這裡選擇緩衝區大小為 1 取決於知道 merge
將接收多少值以及下游階段將消費多少值。這是脆弱的:如果我們向 gen
傳遞額外的值,或者如果下游階段讀取的值減少了,我們又會遇到阻塞的 goroutine。
相反,我們需要提供一種方法,讓下游階段能夠向上遊傳送方指示它們將停止接受輸入。
顯式取消
當 main
決定在未接收到 out
中的所有值時退出,它必須告訴上游階段的 goroutine 放棄它們嘗試傳送的值。它透過向一個名為 done
的通道傳送值來實現。它傳送兩個值,因為潛在有兩個被阻塞的傳送方
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{} }
傳送 goroutine 將其傳送操作替換為 select
語句,該語句在向 out
傳送發生時或當它們從 done
接收到值時繼續執行。done
的值型別為空結構體,因為值不重要:是接收事件指示應放棄向 out
的傳送。output
goroutine 繼續在其入站通道 c
上迴圈,因此上游階段不會被阻塞。(我們稍後會討論如何允許此迴圈提前返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
這種方法存在一個問題:每個下游接收方需要知道潛在阻塞的上游傳送方的數量,並安排在提前返回時向這些傳送方發出訊號。跟蹤這些計數是乏味且容易出錯的。
我們需要一種方法來告訴未知且數量不限的 goroutine 停止向下遊傳送它們的值。在 Go 中,我們可以透過關閉通道來做到這一點,因為 在已關閉通道上的接收操作總是能立即進行,產生元素型別的零值。
這意味著 main
只需關閉 done
通道即可解除所有傳送方的阻塞。這種關閉實際上是對傳送方的廣播訊號。我們將管道函式每個都擴充套件為接受 done
作為引數,並透過 defer
語句安排關閉操作,以便 main
的所有返回路徑都將訊號通知管道階段退出。
func main() { // Set up a done channel that's shared by the whole pipeline, // and close that channel when this pipeline exits, as a signal // for all the goroutines we started to exit. done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in) // Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // done will be closed by the deferred call. }
現在,我們管道的每個階段都可以立即返回,只要 done
關閉。merge
中的 output
例程可以返回而無需耗盡其入站通道,因為它知道上游傳送方 sq
在 done
關閉時將停止嘗試傳送。output
透過 defer
語句確保在所有返回路徑上呼叫 wg.Done
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
類似地,sq
可以在 done
關閉後立即返回。sq
透過 defer
語句確保在其所有返回路徑上關閉其 out
通道
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
以下是管道構建的指導方針
- 階段在所有傳送操作完成後關閉其出站通道。
- 階段持續從入站通道接收值,直到這些通道關閉或傳送方解除阻塞。
管道透過確保有足夠的緩衝區容納所有傳送的值,或者在接收方可能放棄通道時明確通知傳送方來解除傳送方的阻塞。
遍歷並摘要檔案樹
讓我們考慮一個更實際的管道。
MD5 是一種訊息摘要演算法,常用作檔案校驗和。命令列工具 md5sum
可以列印一系列檔案的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的示例程式類似於 md5sum
,但它接受單個目錄作為引數,並按路徑名排序後列印該目錄下每個普通檔案的摘要值。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的程式的主函式呼叫一個輔助函式 MD5All
,該函式返回一個從路徑名到摘要值的對映,然後對結果進行排序並列印
func main() { // Calculate the MD5 sum of all files under the specified directory, // then print the results sorted by path name. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } }
MD5All
函式是我們討論的重點。在 serial.go 中,實現沒有使用併發,它只是在遍歷檔案樹時簡單地讀取和計算每個檔案的摘要。
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
並行摘要計算
在 parallel.go 中,我們將 MD5All
分成一個兩階段的管道。第一階段,sumFiles
,遍歷檔案樹,在新的 goroutine 中對每個檔案計算摘要,並將結果傳送到值型別為 result
的通道上
type result struct { path string sum [md5.Size]byte err error }
sumFiles
返回兩個通道:一個用於 results
,另一個用於 filepath.Walk
返回的錯誤。遍歷函式為處理每個普通檔案啟動一個新的 goroutine,然後檢查 done
。如果 done
關閉,遍歷會立即停止
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // For each regular file, start a goroutine that sums the file and sends // the result on c. Send the result of the walk on errc. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk has returned, so all calls to wg.Add are done. Start a // goroutine to close c once all the sends are done. go func() { wg.Wait() close(c) }() // No select needed here, since errc is buffered. errc <- err }() return c, errc }
MD5All
從 c
接收摘要值。MD5All
在發生錯誤時提前返回,透過 defer
關閉 done
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All closes the done channel when it returns; it may do so before // receiving all the values from c and errc. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil }
受限的並行性
在 parallel.go 中的 MD5All
實現為每個檔案啟動一個新的 goroutine。在一個包含許多大檔案的目錄中,這可能會分配比機器可用記憶體更多的記憶體。
我們可以透過限制並行讀取的檔案數量來限制這些分配。在 bounded.go 中,我們透過建立固定數量的 goroutine 來讀取檔案。我們的管道現在有三個階段:遍歷檔案樹、讀取並計算檔案摘要、收集摘要。
第一階段,walkFiles
,發射檔案樹中普通檔案的路徑
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Close the paths channel after Walk returns. defer close(paths) // No select needed for this send, since errc is buffered. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc }
中間階段啟動固定數量的 digester
goroutine,它們從 paths
接收檔名,並透過通道 c
傳送 results
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
與我們之前的示例不同,digester
不會關閉其輸出通道,因為有多個 goroutine 在共享通道上傳送。相反,MD5All
中的程式碼會在所有 digesters
完成後安排關閉通道
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
我們也可以讓每個 digester 建立並返回其自己的輸出通道,但那樣我們就需要額外的 goroutine 來扇入結果。
最後階段從 c
接收所有 results
,然後檢查來自 errc
的錯誤。此檢查不能提前進行,因為在此之前,walkFiles
可能會因向下遊傳送值而阻塞
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Check whether the Walk failed. if err := <-errc; err != nil { return nil, err } return m, nil }
結論
本文介紹了在 Go 中構建流式資料管道的技術。處理此類管道中的故障很棘手,因為管道中的每個階段都可能因嘗試向下遊傳送值而阻塞,並且下游階段可能不再關心傳入的資料。我們展示瞭如何透過關閉通道向管道啟動的所有 goroutine 廣播“完成”訊號,並定義了正確構建管道的指導方針。
延伸閱讀
下一篇文章:Go 地鼠
上一篇文章:FOSDEM 2014 上的 Go 演講
部落格索引