并发概要
随着多核CPU的普及, 为了更快的处理任务, 出现了各种并发编程的模型, 主要有以下几种:
模型名称 | 优点 | 缺点 |
---|---|---|
多进程 | 简单, 隔离性好, 进程间几乎无影响 | 开销最大 |
多线程 | 目前使用最多的方式, 开销比多进程小 | 高并发模式下, 效率会有影响 |
异步 | 相比多线程而言, 可以减少线程的数量 | 编码要求高, 需要对流程分割合理 |
协程 | 用户态线程, 不需要操作系统来调度, 所以轻量, 开销极小 | 需要语言支持 |
协程介绍
协程是个抽象的概念, 可以映射到到操作系统层面的进程, 线程等概念.
由于协程是用户态的线程, 不用操作系统来调度, 所以不受操作系统的限制, 可以轻松的创建百万个, 因此也被称为 "轻量级线程".在 golang 中, 协程不是由库实现的, 而是受语言级别支持的, 因此, 在 golang 中, 使用协程非常方便.
下面通过例子演示在 golang 中, 如何使用协程来完成并发操作.golang 并发
实现方式
golang 中, 通过 go 关键字可以非常简单的启动一个协程, 几乎没有什么学习成本.
当然并发编程中固有的业务上的困难依然存在(比如并发时的同步, 超时等), 但是 golang 在语言级别给我们提供了优雅简洁的解决这些问题的途径.理解了 golang 中协程的使用, 会给我们写并发程序时带来极大的便利.
首先以一个简单的例子开始 golang 的并发编程.package mainimport ( "fmt" "time")func main() { for i := 0; i < 10; i++ { go sum(i, i+10) } time.Sleep(time.Second * 5)}func sum(start, end int) int { var sum int = 0 for i := start; i < end; i++ { sum += i } fmt.Printf("Sum from %d to %d is %d\n", start, end, sum) return sum}
执行结果如下: (同时启动10个协程做累加运算, 10个协程的执行顺序可能会不一样)
$ go run main.goSum from 0 to 10 is 45Sum from 6 to 16 is 105Sum from 7 to 17 is 115Sum from 2 to 12 is 65Sum from 8 to 18 is 125Sum from 1 to 11 is 55Sum from 9 to 19 is 135Sum from 3 to 13 is 75Sum from 4 to 14 is 85Sum from 5 to 15 is 95
通过 go 关键字启动协程之后, 主进程并不会等待协程的执行, 而是继续执行直至结束.
本例中, 如果没有 time.Sleep(time.Second * 5) 等待5秒的话, 那么主进程不会等待那10个协程的运行结果, 直接就结束了. 主进程结束也会导致那10个协程的执行中断, 所以, 如果去掉 time.Sleep 这行代码, 可能屏幕上什么显示也没有.简单示例
实际使用协程时, 我们一般会等待所有协程执行完成(或者超时)后, 才会结束主进程, 但是不会用 time.Sleep 这种方式,
因为主进程并不知道协程什么时候会结束, 没法设置等待时间.这时, 就看出 golang 中的 channel 机制所带来的好处了. 下面用 channel 来改造上面的 time.Sleep
package mainimport "fmt"func main() { var ch = make(chan string) for i := 0; i < 10; i++ { go sum(i, i+10, ch) } for i := 0; i < 10; i++ { fmt.Print(<-ch) }}func sum(start, end int, ch chan string) { var sum int = 0 for i := start; i < end; i++ { sum += i } ch <- fmt.Sprintf("Sum from %d to %d is %d\n", start, end, sum)}
程序执行结果和上面一样, 因为是并发的缘故, 可能输出的 sum 顺序可能会不一样.
$ go run main.goSum from 9 to 19 is 135Sum from 0 to 10 is 45Sum from 5 to 15 is 95Sum from 6 to 16 is 105Sum from 7 to 17 is 115Sum from 2 to 12 is 65Sum from 8 to 18 is 125Sum from 3 to 13 is 75Sum from 1 to 11 is 55Sum from 4 to 14 is 85
golang 的 chan 可以是任意类型的, 上面的例子中定义的是 string 型.
从上面的程序可以看出, 往 chan 中写入数据之后, 协程会阻塞在那里, 直到在某个地方将 chan 中的值读取出来, 协程才会继续运行下去.上面的例子中, 我们启动了10个协程, 每个协程都往 chan 中写入了一个字符串, 然后在 main 函数中, 依次读取 chan 中的字符串, 并在屏幕上打印出来.
通过 golang 中的 chan, 不仅实现了主进程 和 协程之间的通信, 而且不用像 time.Sleep 那样不可控(因为你不知道要 Sleep 多长时间).并发时的缓冲
上面的例子中, 所有协程使用的是同一个 chan, chan 的容量默认只有 1, 当某个协程向 chan 中写入数据时, 其他协程再次向 chan 中写入数据时, 其实是阻塞的.
等到 chan 中的数据被读出之后, 才会再次让某个其他协程写入, 因为每个协程都执行的非常快, 所以看不出来.改造下上面的例子, 加入些 Sleep 代码, 延长每个协程的执行时间, 我们就可以看出问题, 代码如下:
package mainimport ( "fmt" "time")func main() { var ch = make(chan string) for i := 0; i < 5; i++ { go sum(i, i+10, ch) } for i := 0; i < 10; i++ { time.Sleep(time.Second * 1) fmt.Print(<-ch) }}func sum(start, end int, ch chan string) int { ch <- fmt.Sprintf("Sum from %d to %d is starting at %s\n", start, end, time.Now().String()) var sum int = 0 for i := start; i < end; i++ { sum += i } time.Sleep(time.Second * 10) ch <- fmt.Sprintf("Sum from %d to %d is %d at %s\n", start, end, sum, time.Now().String()) return sum}
执行结果如下:
$ go run main.goSum from 4 to 14 is starting at 2015-10-13 13:59:56.025633342 +0800 CSTSum from 3 to 13 is starting at 2015-10-13 13:59:56.025608644 +0800 CSTSum from 0 to 10 is starting at 2015-10-13 13:59:56.025508327 +0800 CSTSum from 2 to 12 is starting at 2015-10-13 13:59:56.025574486 +0800 CSTSum from 1 to 11 is starting at 2015-10-13 13:59:56.025593711 +0800 CSTSum from 4 to 14 is 85 at 2015-10-13 14:00:07.030611465 +0800 CSTSum from 3 to 13 is 75 at 2015-10-13 14:00:08.031926629 +0800 CSTSum from 0 to 10 is 45 at 2015-10-13 14:00:09.036724803 +0800 CSTSum from 2 to 12 is 65 at 2015-10-13 14:00:10.038125044 +0800 CSTSum from 1 to 11 is 55 at 2015-10-13 14:00:11.040366206 +0800 CST
为了演示 chan 的阻塞情况, 上面的代码中特意加了一些 time.Sleep 函数.
- 每个执行 Sum 函数的协程都会运行 10 秒
- main函数中每隔 1 秒读一次 chan 中的数据
从打印结果我们可以看出, 所有协程几乎是同一时间开始的, 说明了协程确实是并发的.
其中, 最快的协程(Sum from 4 to 14…)执行了 11 秒左右, 为什么是 11 秒左右呢? 说明它阻塞在了 Sum 函数中的第一行上, 等了 1 秒之后, main 函数开始读出 chan 中数据后才继续运行. 它自身运行需要 10 秒, 加上等待的 1 秒, 正好 11 秒左右.最慢的协程执行了 15 秒左右, 这个也很好理解, 总共启动了 5 个协程, main 函数每隔 1 秒 读出一次 chan, 最慢的协程等待了 5 秒,
再加上自身执行了 10 秒, 所以一共 15 秒左右.到这里, 我们很自然会想到能否增加 chan 的容量, 从而使得每个协程尽快执行, 完成自己的操作, 而不用等待, 消除由于 main 函数的处理所带来的瓶颈呢?
答案是当然可以, 而且在 golang 中实现还很简单, 只要在创建 chan 时, 指定 chan 的容量就行.package mainimport ( "fmt" "time")func main() { var ch = make(chan string, 10) for i := 0; i < 5; i++ { go sum(i, i+10, ch) } for i := 0; i < 10; i++ { time.Sleep(time.Second * 1) fmt.Print(<-ch) }}func sum(start, end int, ch chan string) int { ch <- fmt.Sprintf("Sum from %d to %d is starting at %s\n", start, end, time.Now().String()) var sum int = 0 for i := start; i < end; i++ { sum += i } time.Sleep(time.Second * 10) ch <- fmt.Sprintf("Sum from %d to %d is %d at %s\n", start, end, sum, time.Now().String()) return sum}
执行结果如下:
$ go run main.goSum from 0 to 10 is starting at 2015-10-13 14:22:14.64534265 +0800 CSTSum from 2 to 12 is starting at 2015-10-13 14:22:14.645382961 +0800 CSTSum from 3 to 13 is starting at 2015-10-13 14:22:14.645408947 +0800 CSTSum from 4 to 14 is starting at 2015-10-13 14:22:14.645417257 +0800 CSTSum from 1 to 11 is starting at 2015-10-13 14:22:14.645427028 +0800 CSTSum from 1 to 11 is 55 at 2015-10-13 14:22:24.6461138 +0800 CSTSum from 3 to 13 is 75 at 2015-10-13 14:22:24.646330223 +0800 CSTSum from 2 to 12 is 65 at 2015-10-13 14:22:24.646325521 +0800 CSTSum from 4 to 14 is 85 at 2015-10-13 14:22:24.646343061 +0800 CSTSum from 0 to 10 is 45 at 2015-10-13 14:22:24.64634674 +0800 CST
从执行结果可以看出, 所有协程几乎都是 10秒完成的. 所以在使用协程时, 记住可以通过使用缓存来进一步提高并发性.
并发时的超时
并发编程, 由于不能确保每个协程都能及时响应, 有时候协程长时间没有响应, 主进程不可能一直等待, 这时候就需要超时机制.
在 golang 中, 实现超时机制也很简单.package mainimport ( "fmt" "time")func main() { var ch = make(chan string, 1) var timeout = make(chan bool, 1) go sum(1, 10, ch) go func() { time.Sleep(time.Second * 5) // 5 秒超时 timeout <- true }() select { case sum := <-ch: fmt.Print(sum) case <-timeout: fmt.Println("Sorry, TIMEOUT!") }}func sum(start, end int, ch chan string) int { var sum int = 0 for i := start; i < end; i++ { sum += i } time.Sleep(time.Second * 10) ch <- fmt.Sprintf("Sum from %d to %d is %d\n", start, end, sum) return sum}
通过一个匿名函数来控制超时, 然后同时启动 计算 sum 的协程和timeout协程, 在 select 中看谁先结束,
如果 timeout 结束后, 计算 sum 的协程还没有结束的话, 就会进入超时处理.上例中, timeout 只有5秒, sum协程会执行10秒, 所以执行结果如下:
$ go run main.goSorry, TIMEOUT!
修改 time.Sleep(time.Second * 5) 为 time.Sleep(time.Second * 15) 的话, 就会看到 sum 协程的执行结果