Golang Channel

Channel 原理和最佳实践

实现原理

TODO

常用操作

超时控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func do(f func()) <-chan struct{} {
    // 传递完成信号
    ch := make(chan struct{}, 1)
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println(err)
            }
        }()
        f()
        ch <- struct{}{}
    }()
    return ch
}

func main() {
    select {
    case <-do(func() {
        fmt.Println("do")
        time.Sleep(time.Second)
    }):
        fmt.Println("done")
    case <-time.After(time.Second * 2):
        fmt.Println("timeout")
    }
}

Goroutine 通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 向 channel 里投递数据
func Provider(ch chan<- int, nums ...int) {
    for _, num := range nums {
        ch <- num
    }
    close(ch)
}

// 从 channel 里消费数据
func Consumer(ch <-chan int, done chan<- struct{}, f func(int)) {
    for num := range ch {
        f(num)
    }
    done <- struct{}{}
    close(done)
}

func main() {
    // 数据交换 channel
    ch := make(chan int, 5)
    // 控制 channel 退出
    done := make(chan struct{})

    go Provider(ch, 1, 2, 3, 4, 5)
    go Consumer(ch, done, func(num int) {
        fmt.Println(num)
        time.Sleep(time.Second)
    })

    // 接收退出并进行超时处理
    select {
    case <-done:
        fmt.Println("done")
    case <-time.After(time.Second * 5):
        fmt.Println("timeout")
    }
}

模拟互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
    // 模拟互斥锁
    mutex := make(chan struct{}, 1)

    counter := 0
    atomicIncrease := func() {
        mutex <- struct{}{} // 加锁
        counter++
        <-mutex // 解锁
    }

    increase := func(size int, done chan<- struct{}) {
        for i := 0; i < size; i++ {
            atomicIncrease()
        }
        done <- struct{}{}
    }

    done := make(chan struct{})
    // 开两个 goroutine 并行进行自增运算
    go increase(100000, done)
    go increase(100000, done)
    <-done
    <-done
    fmt.Println(counter) // 200000
}

模拟定时器效果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 每隔指定时间向通道中发送一次信号
func Tick(duration time.Duration) <-chan struct{} {
    ch := make(chan struct{}, 1)
    go func() {
        for {
            time.Sleep(duration)
            select {
            case ch <- struct{}{}:
            }
        }
    }()
    return c
}

func main() {
    for range Tick(time.Second) {
        fmt.Println("hello world")
    }
}

模拟信号量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Semaphore struct {
    sync.Locker // 显示声明实现了 sync.Locker interface
    ch chan struct{}
}

// 创建一个新的信号量
func NewSemaphore(capacity int) sync.Locker {
    if capacity <= 0 {
        capacity = 1 // 容量为1就变成了一个互斥锁
    }
    return &Semaphore{ch: make(chan struct{}, capacity)}
}

// 请求一个资源
func (s *Semaphore) Lock() {
    s.ch <- struct{}{}
}

// 释放资源
func (s *Semaphore) Unlock() {
    <-s.ch
}

References

Last updated on 2023-03-19 21:23:50