原文 Streaming IO in Go
在 Go 语言中,实现输入输出操作有专门的原语,这些原语将数据构建为可读写的字节流。Go 语言标准库中 io 包提供了 io.Reader
和 io.Write
r 接口,分别用于数据输入和输出操作,如下图所示:
Go 提供了许多 api,支持来自于内存、文件、网络连接等资源的流式 IO。本文主要讲解如何使用 io.Reader
和 io. Writer
接口创建能够处理流式数据的 Go 程序,分别提供了基于标准库和自定义类型的实现。
io.Reader 接口 reader 由 io.Reader 接口定义,可以从数据源读取数据到传输缓冲区,缓冲区中的数据可以继续被流式处理或者直接消费,如下图所示:
如果要把一个 Go 语言类型作为 reader 来使用,这个类型必须实现 io.Reader
所定义的 Read(p []byte)
方法,如下所示:
1 2 3 type Reader interface { Read(p []byte ) (n int , err error) }
Read()
方法需要返回读取的字节数,如果发生错误则同时返回 error ,如果数据源的所有内容已被读取完毕,则 Read()
方法应当返回 io.EOF
错误。
读取规则(新增) 根据 Reddit 上的反馈,我决定新增这部分内容帮助理解读取操作。一个 reader 的行为取决于它的具体实现,但是 io.Reader
的文档中定义了一些基本的规则,在使用 reader 的时候你应当首先阅读这些说明文档:
Read()
方法会尽可能但最多读取 len(p) 个字节到 p 中。
调用 Read()
方法读取的字节数 n 可能小于 len(p)。
在发生读取错误的时候,Read()
仍然有可能已经成功读取了 n 个字节的数据到缓冲区 p 中。例如,正在从一个 TCP 套接字中读取数据时套接字突然关闭。根据使用场景的不同,你可以保留这些成功读取的数据或者丢掉它们重新尝试读取。
当数据源中的可用数据被读完时,Read()
方法有可能返回一个非零的 n 和 err = io.EOF
。但是,根据具体的实现,reader 也可以选择在读取结束时返回一个非零的 n 和 err = nil
。在这种情况下,所有后续的读取操作必须返回 n = 0
和 err = io.EOF
。
最后,Read()
调用返回 n = 0
和 err = nil
并不意味着 EOF 因为下一次调用 Read()
方法可能会返回数据。
正如你所看到的,有时候直接从 reader 中读取数据有些棘手。但幸运的是,标准库中实现的 reader 都遵循符合常理的方法使得可以比较容易地实现流式操作。尽管如此,在使用一个 reader 之前仍然应当仔细查阅它的文档。
流式读取数据 从一个 reader 中流式读取数据很容易。Read()
方法被设计为在循环中调用,在每一次迭代中,Read()
方法会从数据源读取一个数据 chunk 并将其放入到缓冲区 p 中。循环过程一直进行直到 Read()
方法返回 io.EOF
错误。 下面是一个简单的例子:使用 strings.NewReader(string) 创建了一个字符串 reader,对一个源字符串中的字节进行流式处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func main () { reader := strings.NewReader("Clear is better than clever" ) p := make ([]byte , 4 ) for { n, err := reader.Read(p) if err != nil { if err == io.EOF { fmt.Println(string (p[:n])) break } fmt.Println(err) os.Exit(1 ) } fmt.Println(string (p[:n])) } }
上述代码使用 make([]byte, 4)
创建了一个容量和长度均为 4 个字节的字节切片作为缓冲区 p。这个缓冲区长度被故意设置为小于源字符串的长度,用来演示如何流式处理长度大于缓冲区长度的数据源。
实现自定义 io.Reader 上面的例子使用了标准库的 reader ,现在我们尝试自定义实现一个 reader。下面的代码定义了一个 rader 类型实现了 io.Reader 接口,作用是过滤掉数据源中所有的非字母字符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 type alphaReader struct { src string cur int } func newAlphaReader (src string ) *alphaReader { return &alphaReader{src: src} } func alpha (r byte ) byte { if (r >= 'A' && r <= 'Z' ) || (r >= 'a' && r <= 'z' ) { return r } return 0 } func (a *alphaReader) Read (p []byte ) (int , error) { if a.cur >= len (a.src) { return 0 , io.EOF } x := len (a.src) - a.cur n, bound := 0 , 0 if x >= len (p) { bound = len (p) } else if x <= len (p) { bound = x } buf := make ([]byte , bound) for n < bound { if char := alpha(a.src[a.cur]); char != 0 { buf[n] = char } n++ a.cur++ } copy (p, buf) return n, nil } func main () { reader := newAlphaReader("Hello! It's 9am, where is the sun?" ) p := make ([]byte , 4 ) for { n, err := reader.Read(p) if err == io.EOF { break } fmt.Print(string (p[:n])) } fmt.Println() }
当程序执行时会输出:
1 2 $> go run alpha_reader.go HelloItsamwhereisthesun
链式读取 标准库中有很多 reader 已经实现了链式读取。使用一个 reader 作为另一个 reader 的数据源是一种常见的做法。reader 的链式组合使得一个 reader 可以复用另一个 reader 已经实现的逻辑,就像下面代码的做法:更新了 alphaReader
的实现使其可以接受一个 io.Reader
作为数据源。这样的做法通过复用已有 reader 的工作来降低代码的复杂性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 type alphaReader struct { reader io.Reader } func newAlphaReader (reader io.Reader) *alphaReader { return &alphaReader{reader: reader} } func alpha (r byte ) byte { if (r >= 'A' && r <= 'Z' ) || (r >= 'a' && r <= 'z' ) { return r } return 0 } func (a *alphaReader) Read (p []byte ) (int , error) { n, err := a.reader.Read(p) if err != nil { return n, err } buf := make ([]byte , n) for i := 0 ; i < n; i++ { if char := alpha(p[i]); char != 0 { buf[i] = char } } copy (p, buf) return n, nil } func main () { reader := newAlphaReader(strings.NewReader("Hello! It's 9am, where is the sun?" )) p := make ([]byte , 4 ) for { n, err := reader.Read(p) if err == io.EOF { break } fmt.Print(string (p[:n])) } fmt.Println() }
io.Writer 接口 writer 由 io.Writer
接口定义,实现流式地将数据从缓冲区写入到目标资源中,如下图所示:
writer 必须实现 io.Write
r 接口的 Write(p []byte)
方法。Wirte()
)方法被设计为从缓冲区 p 中读取数据并将其写入到指定的目标资源中。
1 2 3 type Writer interface { Write(p []byte ) (n int , err error) }
Write()
方法的实现应该返回写入的字节数,发生错误时返回 error
。
使用 writers 标准库中有很多预先实现了 io.Writer
的类型。使用 writers 也是比较简单的,下面的代码演示了使用 bytes.Buffer
作为一个 writer 向内存中写入数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func main () { proverbs := []string { "Channels orchestrate mutexes serialize" , "Cgo is not Go" , "Errors are values" , "Don't panic" , } var writer bytes.Buffer for _, p := range proverbs { n, err := writer.Write([]byte (p)) if err != nil { fmt.Println(err) os.Exit(1 ) } if n != len (p) { fmt.Println("failed to write data" ) os.Exit(1 ) } } fmt.Println(writer.String()) }
实现自定义 io.Writer 下面的代码展示了如何实现一个名为 chanWriter 的自定义io.Writer
用来将缓冲区中的内容作为字节序列写入到一个 Go channel 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 type chanWriter struct { ch chan byte } func newChanWriter () *chanWriter { return &chanWriter{make (chan byte , 1024 )} } func (w *chanWriter) Chan () <-chan byte { return w.ch } func (w *chanWriter) Write (p []byte ) (int , error) { n := 0 for _, b := range p { w.ch <- b n++ } return n, nil } func (w *chanWriter) Close () error { close (w.ch) return nil } func main () { writer := newChanWriter() go func () { defer writer.Close() writer.Write([]byte ("Stream " )) writer.Write([]byte ("me!" )) }() for c := range writer.Chan() { fmt.Printf("%c" , c) } fmt.Println() }
使用 writer 也非常简单,在 main() 函数中调用(在一个单独的 goroutine 中) writer.Write()
方法即可。 chanWriter 也实现了 io.Closer
接口,写入完成后调用 writer.Close()
方法关闭 channel 防止访问 channel 陷入死锁。
IO 操作常用的类型和包 之前提到,Go 语言标准库中提供了很多有用的函数和类型使得操作流式 IO 变得非常简单。
os.File os.File 类型表示本地系统的一个文件。它同时实现了 io.Reader
和 io.Writer
接口,所以可以被用在任何流式 IO 操作的场景。例如,下面的代码展示了如何将字符串切片直接写入到文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func main () { proverbs := []string { "Channels orchestrate mutexes serialize\n" , "Cgo is not Go\n" , "Errors are values\n" , "Don't panic\n" , } file, err := os.Create("./proverbs.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } defer file.Close() for _, p := range proverbs { n, err := file.Write([]byte (p)) if err != nil { fmt.Println(err) os.Exit(1 ) } if n != len (p) { fmt.Println("failed to write data" ) os.Exit(1 ) } } fmt.Println("file write done" ) }
另外, io.File
也可以作为 reader 使用实现流式读取本地文件中的内容。例如,下面的代码演示了读取一个文件并将其内容打印出来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func main () { file, err := os.Open("./proverbs.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } defer file.Close() p := make ([]byte , 4 ) for { n, err := file.Read(p) if err == io.EOF { break } fmt.Print(string (p[:n])) } }
标准输出、输入和错误 os 包提供了三个变量:os.Stdout
、os.Stdin
和 os.Stderr
。它们都是 *os.File
类型,分别表示操作系统的标准输出、输入和错误的文件句柄。例如,下面的代码展示直接向标准输出写入内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func main () { proverbs := []string { "Channels orchestrate mutexes serialize\n" , "Cgo is not Go\n" , "Errors are values\n" , "Don't panic\n" , } for _, p := range proverbs { n, err := os.Stdout.Write([]byte (p)) if err != nil { fmt.Println(err) os.Exit(1 ) } if n != len (p) { fmt.Println("failed to write data" ) os.Exit(1 ) } } }
io.Copy() 使用 io.Copy()
函数可以很容易地流式地从源 reader 向目标 writer 拷贝数据。它抽象了 for-loop
模式和正确处理 io.EOF
及字节数的过程。 下面的代码展示了一个之前代码的简化版本,拷贝 proverbs 中的内容到 file 文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func main () { proverbs := new (bytes.Buffer) proverbs.WriteString("Channels orchestrate mutexes serialize\n" ) proverbs.WriteString("Cgo is not Go\n" ) proverbs.WriteString("Errors are values\n" ) proverbs.WriteString("Don't panic\n" ) file, err := os.Create("./proverbs.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } defer file.Close() if _, err := io.Copy(file, proverbs); err != nil { fmt.Println(err) os.Exit(1 ) } fmt.Println("file created" ) }
类似地,我们可以重写之前的程序使用 io.Copy()
将本地文件中的内容流式地写入到标准输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 func main () { file, err := os.Open("./proverbs.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } defer file.Close() if _, err := io.Copy(os.Stdout, file); err != nil { fmt.Println(err) os.Exit(1 ) } }
io.WriteString() io.WriteString()
为写入字符串到指定 writer 提供了便利:
1 2 3 4 5 6 7 8 9 10 11 12 func main () { file, err := os.Create("./magic_msg.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } defer file.Close() if _, err := io.WriteString(file, "Go is fun!" ); err != nil { fmt.Println(err) os.Exit(1 ) } }
管道 writers and readers io.PipeWrite
r 和 io.PipeReade
r 将 IO 操作建模为基于内存的管道。数据被写入到管道的写端并在另外一个 goroutine 中从读端读出。下面的代码使用 io.Pipe()
创建了一个管道 reader/writer 对用来从 proverbs 缓冲区中拷贝数据到 io.Stdout
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func main () { proverbs := new (bytes.Buffer) proverbs.WriteString("Channels orchestrate mutexes serialize\n" ) proverbs.WriteString("Cgo is not Go\n" ) proverbs.WriteString("Errors are values\n" ) proverbs.WriteString("Don't panic\n" ) piper, pipew := io.Pipe() go func () { defer pipew.Close() io.Copy(pipew, proverbs) }() io.Copy(os.Stdout, piper) piper.Close() }
Buffered IO Go 通过 bufio
包支持 Buffered IO 使得操作文本内容更加方便。例如,下面的代码使用 '\n'
作为分隔符一行一行地读取文件的内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func main () { file, err := os.Open("./planets.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } defer file.Close() reader := bufio.NewReader(file) for { line, err := reader.ReadString('\n' ) if err != nil { if err == io.EOF { break } else { fmt.Println(err) os.Exit(1 ) } } fmt.Print(line) } }
Util 包 ioutil 包是 io 包的一个子包,它提供了几个方便的函数用于执行 IO 操作。例如,下面的代码使用 ReadFile
函数读取文件的所有内容到一个 []byte
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package mainimport ( "io/ioutil" ... ) func main () { bytes, err := ioutil.ReadFile("./planets.txt" ) if err != nil { fmt.Println(err) os.Exit(1 ) } fmt.Printf("%s" , bytes) }
总结 这篇文章展示了如何使用 io.Reader
和 io.Writer
接口实现流式 IO 操作。读过这篇文章后你应该理解了如何使用 io 包写程序处理流式 IO。 这里只对支持流式 IO 的包进行了简单的讨论,并没有深入探讨文件 IO 、缓冲 IO、网络 IO 和格式化 IO 等。我希望这篇文章能给你一些关于 Go 语言流式 IO 编程范式的启发。