一个资源池的Go实现

资源池模式是一种比较典型的应用模式,通过一个池子来管理资源,可以有效的降低不断创建新资源带来的资源开销,一般线程池都设置有固定大小的资源限制,同时池子也需要管理如何创建资源。下面是使用Go语言实现的一个资源池管理对象,所有资源满足io.Close方法也就是需要实现Close()方法即可,用于释放必要的资源对象。

type Pool struct {
    mu        sync.Mutex
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

// ErrPoolClosed is returned when an Acquire returns on a
// closed pool.
var ErrPoolClosed = errors.New("Pool has been closed")

// New creates a pool that manages resources. A pool requires a
// function that can allocate a new resource and the size of
// the pool.
func New(size uint, f func() (io.Closer, error)) (*Pool, error) {
    if size == 0 {
        return nil, errors.New("Size value too small")
    }

    return &Pool{
        factory:   f,
        resources: make(chan io.Closer, size),
    }, nil
}

上面的锁用于实现并发安全的访问,多个Goroutine同时访问资源的时候,消除数据访问的冲突。

Pool需要实现的三个基本方法包含如下:

  • Acquire 用于从池子中获取一个可复用的单元,当池子不为空,且当前尚未关闭的时候,可以拿到原来创建好的资源,否则利用工厂函数创建一个新的资源
  • Release 释放资源并归还到池子当中,如果池子已经满了,或者已经关闭,则自动丢弃该对象即可。
  • Close 关闭资源池,不再写入和读取资源,并释放资源(调用资源本身的Close方法)
// Acquire retrieves a resource    from the pool.
func (p *Pool) Acquire() (io.Closer, error) {
    select {

    // Check for a free resource.
    case r, wd := <-p.resources:
        log.Println("Acquire:", "Shared Resource")
        if !wd {
            return nil, ErrPoolClosed
        }
        return r, nil

    // Provide a new resource since there are none available.
    default:
        log.Println("Acquire:", "New Resource")
        return p.factory()
    }
}

// Release places a new resource onto the pool.
func (p *Pool) Release(r io.Closer) {

    // Secure this operation with the Close operation.
    p.mu.Lock()
    defer p.mu.Unlock()

    // If the pool is closed, discard the resource.
    if p.closed {
        r.Close()
        return
    }

    select {
    // Attempt to place the new resource on the queue.
    case p.resources <- r:
        log.Println("Release:", "In Queue")

    // If the queue is already at cap we close the resource.
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

// Close will shutdown the pool and close all existing resources.
func (p *Pool) Close() error {

    // Secure this operation with the Release operation.
    p.mu.Lock()
    defer p.mu.Unlock()

    // If the pool is already close, don't do anything.
    if p.closed {
        return ErrPoolClosed
    }

    // Set the pool as closed.
    p.closed = true

    // Close the channel before we drain the channel of its
    // resources. If we don't do this, we will have a deadlock.
    close(p.resources)

    // Close the resources
    for r := range p.resources {
        r.Close()
    }

    return nil
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注