Go 部落格
Go 併發模式:管道和取消
引言
Go 的併發原語使得構建能夠有效利用 I/O 和多個 CPU 的流式資料管道變得容易。本文將介紹此類管道的示例,重點介紹操作失敗時出現的細微之處,並介紹乾淨地處理故障的技巧。
什麼是管道?
Go 中沒有管道的正式定義;它只是眾多併發程式型別中的一種。非正式地講,管道是由通道連線的一系列階段,其中每個階段都是執行相同函式的 goroutine 組。在每個階段,goroutine
- 透過入站通道從上游接收值
- 對該資料執行某些函式,通常會生成新值
- 透過出站通道將值傳送到*下游*
每個階段都有任意數量的入站和出站通道,除了第一個和最後一個階段,它們分別只有出站或入站通道。第一個階段有時被稱為源或生產者;最後一個階段,匯或消費者。
我們將從一個簡單的示例管道開始來解釋這些概念和技術。稍後,我們將介紹一個更真實的示例。
平方數
考慮一個具有三個階段的管道。
第一個階段 `gen` 是一個將整數列表轉換為通道的函式,該通道發出列表中的整數。`gen` 函式啟動一個 goroutine,該 goroutine 將整數傳送到通道,並在傳送完所有值後關閉通道
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二個階段 `sq` 從通道接收整數,並返回一個通道,該通道發出每個接收到的整數的平方。在入站通道關閉並且此階段已將所有值傳送到下游後,它會關閉出站通道
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
`main` 函式設定管道並執行最後一個階段:它從第二個階段接收值並列印每個值,直到通道關閉
func main() { // Set up the pipeline. c := gen(2, 3) out := sq(c) // Consume the output. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
由於 `sq` 的入站和出站通道型別相同,因此我們可以多次組合它。我們還可以像其他階段一樣,將 `main` 重寫為 range 迴圈
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
扇出,扇入
多個函式可以從同一個通道讀取,直到該通道關閉;這稱為扇出。這提供了一種在一組工作程序之間分發工作的方法,以並行化 CPU 使用和 I/O。
一個函式可以從多個輸入讀取,並繼續處理直到所有輸入都關閉,方法是將輸入通道多路複用到單個通道上,當所有輸入都關閉時,該通道就會關閉。這稱為扇入。
我們可以更改管道以執行兩個 `sq` 例項,每個例項從同一個輸入通道讀取。我們引入了一個新函式 _merge_ 來將結果扇入
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the merged output from c1 and c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } }
`merge` 函式透過為每個入站通道啟動一個 goroutine,將通道列表轉換為單個通道,該 goroutine 將值複製到唯一的出站通道。啟動所有 `output` goroutine 後,`merge` 會啟動一個額外的 goroutine,在完成對該通道的所有傳送後關閉出站通道。
在已關閉的通道上傳送會引起恐慌,因此在呼叫 close 之前確保所有傳送都已完成很重要。`sync.WaitGroup
` 型別提供了一種簡單的方法來安排此同步
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }
提前終止
我們的管道函式遵循一種模式
- 階段在完成所有傳送操作後關閉其出站通道。
- 階段一直從入站通道接收值,直到這些通道關閉。*
這種模式允許將每個接收階段編寫為 `range` 迴圈,並確保所有 goroutine 在所有值成功傳送到下游後退出。
但在實際管道中,階段並不總是接收所有入站值。有時是故意的:接收方可能只需要一部分值才能繼續。更常見的是,由於入站值代表早期階段的錯誤,因此階段會提前退出。在這兩種情況下,接收方都不應該等待剩餘的值到達,並且我們希望早期階段停止生成下游不需要的值。
在我們的示例管道中,如果一個階段未能消耗所有入站值,則嘗試傳送這些值的 goroutine 將無限期阻塞
// Consume the first value from the output. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. }
這是一個資源洩漏:goroutine 會消耗記憶體和執行時資源,goroutine 堆疊中的堆引用會阻止資料被垃圾回收。goroutine 不會被垃圾回收;它們必須自行退出。
我們需要安排我們的管道的上游階段退出,即使下游階段未能接收所有入站值。一種方法是更改出站通道以具有緩衝區。緩衝區可以容納固定數量的值;如果緩衝區中有空間,傳送操作會立即完成
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
當要在通道建立時知道要傳送的值的數量時,緩衝區可以簡化程式碼。例如,我們可以重寫 `gen` 來將整數列表複製到緩衝通道中,並避免建立新的 goroutine
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
回到我們管道中被阻塞的 goroutine,我們可以考慮為 `merge` 返回的出站通道新增緩衝區
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // enough space for the unread inputs // ... the rest is unchanged ...
雖然這解決了此程式中被阻塞的 goroutine,但這是糟糕的程式碼。此處緩衝區大小 1 的選擇取決於知道 `merge` 將接收到的值的數量以及下游階段將消耗的值的數量。這很脆弱:如果我們向 `gen` 傳遞一個額外的值,或者下游階段讀取的值少於該值,我們將再次遇到被阻塞的 goroutine。
相反,我們需要提供一種方法,讓下游階段向傳送者指示它們將停止接受輸入。
顯式取消
當 `main` 決定在未接收完 `out` 的所有值的情況下退出時,它必須告知上游階段的 goroutine 放棄它們正在嘗試傳送的值。它透過在名為 `done` 的通道上傳送值來做到這一點。它傳送兩個值,因為可能存在兩個被阻塞的傳送者
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{} }
傳送 goroutine 將它們的傳送操作替換為 `select` 語句,該語句會在 `out` 上的傳送發生時或在它們從 `done` 接收到值時繼續。`done` 的值型別是空結構,因為值無關緊要:接收事件表明 `out` 上的傳送應被放棄。`output` goroutine 繼續在它們的入站通道 `c` 上迴圈,因此上游階段不會被阻塞。(我們稍後將討論如何允許此迴圈提前返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
這種方法有一個問題:*每個*下游接收者都需要知道可能被阻塞的上游傳送者的數量,並安排在提前返回時向這些傳送者發出訊號。跟蹤這些計數既繁瑣又容易出錯。
我們需要一種方法來告訴未知且無界數量的 goroutine 停止將其值傳送到下游。在 Go 中,我們可以透過關閉通道來做到這一點,因為在已關閉的通道上執行接收操作始終可以立即進行,併產生元素型別的零值。
這意味著 `main` 僅透過關閉 `done` 通道就可以解除所有傳送者的阻塞。此關閉有效地向傳送者廣播訊號。我們將*每個*管道函式擴充套件為接受 `done` 作為引數,並透過 `defer` 語句安排關閉,以便從 `main` 返回的所有路徑都會向管道階段發出退出訊號。
func main() { // Set up a done channel that's shared by the whole pipeline, // and close that channel when this pipeline exits, as a signal // for all the goroutines we started to exit. done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in) // Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // done will be closed by the deferred call. }
我們的每個管道階段現在都可以儘快返回,只要 `done` 被關閉。`merge` 中的 `output` 例程可以在不耗盡其入站通道的情況下返回,因為它知道上游傳送者 `sq` 將在 `done` 關閉時停止嘗試傳送。`output` 透過 `defer` 語句確保在所有返回路徑上都呼叫 `wg.Done`
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
類似地,`sq` 可以在 `done` 關閉後儘快返回。`sq` 透過 `defer` 語句確保其 `out` 通道在所有返回路徑上都已關閉
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
以下是構建管道的指南
- 階段在完成所有傳送操作後關閉其出站通道。
- 階段一直從入站通道接收值,直到這些通道關閉或傳送者被解除阻塞。
管道透過確保有足夠的緩衝區容納所有傳送的值,或者透過在接收方可以放棄通道時顯式地向傳送者發出訊號來解除傳送者阻塞。
消化樹
讓我們考慮一個更真實的管道。
MD5 是一種非常有用的訊息摘要演算法,可用作檔案校驗和。`md5sum` 命令列實用程式會列印檔案的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的示例程式類似於 `md5sum`,但它接受單個目錄作為引數,並列印該目錄下的每個常規檔案的摘要值,按路徑名排序。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的程式的 `main` 函式呼叫一個名為 `MD5All` 的輔助函式,該函式返回一個從路徑名到摘要值的對映,然後對結果進行排序和列印
func main() { // Calculate the MD5 sum of all files under the specified directory, // then print the results sorted by path name. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } }
我們的討論重點是 `MD5All` 函式。在serial.go中,該實現不使用併發,而是在遍歷樹時簡單地讀取和計算每個檔案的摘要。
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
並行消化
在parallel.go中,我們將 `MD5All` 分成一個兩階段管道。第一個階段 `sumFiles` 遍歷樹,在新 goroutine 中計算每個檔案的摘要,並將結果傳送到型別為 `result` 的通道
type result struct { path string sum [md5.Size]byte err error }
`sumFiles` 返回兩個通道:一個用於 `results`,另一個用於 `filepath.Walk` 返回的錯誤。walk 函式啟動一個新的 goroutine 來處理每個常規檔案,然後檢查 `done`。如果 `done` 已關閉,則 walk 會立即停止
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // For each regular file, start a goroutine that sums the file and sends // the result on c. Send the result of the walk on errc. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk has returned, so all calls to wg.Add are done. Start a // goroutine to close c once all the sends are done. go func() { wg.Wait() close(c) }() // No select needed here, since errc is buffered. errc <- err }() return c, errc }
`MD5All` 從 `c` 接收摘要值。`MD5All` 在發生錯誤時提前返回,透過 `defer` 關閉 `done`
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All closes the done channel when it returns; it may do so before // receiving all the values from c and errc. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil }
有界並行
中的 `MD5All` 實現parallel.go為每個檔案啟動一個新的 goroutine。在包含許多大檔案的目錄中,這可能會分配比機器上可用記憶體更多的記憶體。
我們可以透過限制並行讀取檔案的數量來限制這些分配。在bounded.go中,我們透過建立固定數量的 goroutine 來讀取檔案來實現這一點。我們的管道現在有三個階段:遍歷樹,讀取和計算檔案摘要,以及收集摘要。
第一個階段 `walkFiles` 發出樹中常規檔案的路徑
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Close the paths channel after Walk returns. defer close(paths) // No select needed for this send, since errc is buffered. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc }
中間階段啟動固定數量的 `digester` goroutine,這些 goroutine 從 `paths` 接收檔名,並將 `results` 傳送到通道 `c`
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
與我們之前的示例不同,`digester` 不會關閉其輸出通道,因為有多個 goroutine 在共享通道上傳送。相反,`MD5All` 中的程式碼會在所有 `digester` 完成後安排通道關閉
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
我們也可以讓每個 digester 建立並返回自己的輸出通道,但那樣我們就需要額外的 goroutine 來扇入結果。
最後一個階段從 `c` 接收所有 `results`,然後檢查來自 `errc` 的錯誤。此檢查不能提前進行,因為在此之前,`walkFiles` 可能會在傳送值到下游時被阻塞
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Check whether the Walk failed. if err := <-errc; err != nil { return nil, err } return m, nil }
結論
本文介紹了在 Go 中構建流式資料管道的技術。處理此類管道中的故障很棘手,因為管道中的每個階段都可能因嘗試將值傳送到下游而被阻塞,而下游階段可能不再關心傳入的資料。我們展示瞭如何關閉通道可以向管道啟動的所有 goroutine 廣播“完成”訊號,並定義了正確構建管道的準則。
延伸閱讀
- Go 併發模式(影片)介紹了 Go 併發原語的基礎知識和幾種應用方法。
- Go 高階併發模式(影片)涵蓋了 Go 原語更復雜的用法,特別是 `select`。
- Douglas McIlroy 的論文《Squinting at Power Series》展示了 Go 式併發如何為複雜計算提供優雅的支援。
下一篇文章:Go Gopher
上一篇文章:FOSDEM 2014 Go 會議
部落格索引