在线程池中存在几个概念:
核心线程数
、最大线程数
、任务队列
。
与其他池化技术不同的是,线程池是基于生产者-消费者
模式来实现的,任务的提交方是生产者,线程池是消费者 。当我们需要执行某个任务时,只需要把任务扔到线程池中即可。
池化技术:这里的池化和卷积的池化不一样,这里的池化技术简单点来说,就是提前保存大量的资源,以备不时之需
线程池中执行任务的流程如下图如下。
那么使用线程池可以带来一系列好处:
立即执行
。首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
常见的拒绝策略有以下几种
- AbortPolicy 中止策略:丢弃任务并
抛出异常
。- DiscardPolicy 丢弃策略:丢弃任务,但是
不抛出异常
。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。- DiscardOldestPolicy 弃老策略:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
定义任务Task 并 定义NewTask来新建Task对象
type Task struct {
f func() error
}
func NewTask(f func() error) *Task {
return &Task{f: f}
}
定义 WorkPool 线程池
type WorkPool struct {
TaskQueue chan *Task // Task队列
workNum int // 协程池中最大的worker数量
shop chan struct{} // 停止工作标识
}
创建 WorkPool 的函数
func NewWorkPool(cap int) *WorkPool {
if cap <= 0 {
cap = 10
}
return &WorkPool{
TaskQueue: make(chan *Task),
workNum: cap,
shop: make(chan struct{}),
}
}
具体的协程池中的工作节点
func (p *WorkPool) worker(workId int) {
for task := range p.TaskQueue {
err := task.Execute()
if err != nil {
fmt.Println(err)
continue
}
fmt.Printf(" work id %d finished \n", workId) // 打印出具体是哪个节点进行工作
}
}
协程池启动函数
func (p *WorkPool) run() {
// 根据work num 去创建worker工作
for i := 0; i < p.workNum; i++ {
go p.worker(i)
}
<-p.shop
}
协程池关闭函数
func (p *WorkPool) close() {
p.shop <- struct{}{}
}
测试一下,使用定时器,每2秒进行一次投放,并且投放超过5个之后开始停止。
func TestWorkPool(t *testing.T) {
task := NewTask(func() error {
fmt.Print(time.Now())
return nil
})
taskCount := 0
ticker := time.NewTicker(2 * time.Second)
p := NewWorkPool(3)
go func(c *time.Ticker) {
for {
p.TaskQueue <- task
<-c.C
taskCount++
if taskCount == 5 {
p.close()
break
}
}
return
}(ticker)
p.run()
}
结果:
可以看到结果是每两秒进行一次打印,并且worker对象都不一样。
package gorountine_pool
import (
"fmt"
"testing"
"time"
)
func TestWorkPool(t *testing.T) {
task := NewTask(func() error {
fmt.Print(time.Now())
return nil
})
taskCount := 0
ticker := time.NewTicker(2 * time.Second)
p := NewWorkPool(3)
go func(c *time.Ticker) {
for {
p.TaskQueue <- task
<-c.C
taskCount++
if taskCount == 5 {
p.close()
break
}
}
return
}(ticker)
p.run()
}
type Task struct {
f func() error
}
func NewTask(f func() error) *Task {
return &Task{f: f}
}
// Execute 执行业务方法
func (t *Task) Execute() error {
return t.f()
}
type WorkPool struct {
TaskQueue chan *Task // task队列
workNum int // 携程池中最大的worker数量
shop chan struct{} // 停止标识
}
// 创建Pool的函数
func NewWorkPool(cap int) *WorkPool {
if cap <= 0 {
cap = 10
}
return &WorkPool{
TaskQueue: make(chan *Task),
workNum: cap,
shop: make(chan struct{}),
}
}
func (p *WorkPool) worker(workId int) {
// 具体的工作
for task := range p.TaskQueue {
err := task.Execute()
if err != nil {
fmt.Println(err)
continue
}
fmt.Printf(" work id %d finished \n", workId)
}
}
// 携程池开始工作
func (p *WorkPool) run() {
// 根据work num 去创建worker工作
for i := 0; i < p.workNum; i++ {
go p.worker(i)
}
<-p.shop
}
func (p *WorkPool) close() {
p.shop <- struct{}{}
}
[1] https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
[2] https://blog.csdn.net/weixin_44688301/article/details/123292211
[3] https://www.bilibili.com/video/BV1Nf4y137na
更多【golang-线程池详解并使用Go语言实现 Pool】相关视频教程:www.yxfzedu.com