Swift GCD并发控制

本文章基于Swift5版本编写,介绍GCD(Grand Central Dispatch)的使用方式和相关的特性。GCD可以用于一些并发任务的执行和调度,可将一些计算密集型的处理任务转移至后台执行,从而防止任务导致主线程的阻塞,保证用户UI的流畅性。

GCD基础概念

不管是Linux系统,还是IOS系统,一个App(应用)都包含有一个或者多个线程,这些线程可以并发执行,比如同时执行多个数据下载任务,从而减少执行实现,提高处理效率。操作系统层面负责调度管理所有的线程执行,决定什么时候去执行这些并发的线程。

单核设备通过分时复用的方式,将时间切分成片,分给多个线程使用,执行的时候不断的进行切换,从而实现并发执行。多核设备则可以实现并行执行,彼此不需要共享同一个时间轴。关于并发和并行的介绍,可以参考https://blog.golang.org/concurrency-is-not-parallelism。 需要理解的是,并行一定是并发的,但是并发不一定是并行。

GCD构建于线程之上,管理一组线程池。在GCD中我们将需要并发执行的代码封装在执行单元中发送给调度队列,GCD决定什么时候选择什么线程去执行这些单元。同时GCD依赖于可用的操作系统资源和处理能力来决定实际的并发效果。

GCD通过DispatchQueue来管理调度队列,提交的任务在队列中按照FIFO的方式调度执行,也就是先提交的任务先执行, 这种方式的好处是可以保证线程执行顺序(尽管不一定哪个先结束)。调度队列本身是线程安全的,在任何线程中获取都可以。对于使用GCD来说,需要学习的是分发队列的几种类型和以及如何提交执行单元。

串行和并行

调度队列在IOS中分为串行和并行两种,串行保证了任何时候只有一个任务被执行,GCD控制调度的时间。

并行队列则可以允许多个任务在同时执行,但是任务的执行顺序仍旧是依照提交的顺序来执行。只是有些任务因为时间执行较长,可能比后面提交的还要更晚结束。

关于什么时候执行任务单元,则完全基于GCD本身的调度策略,如果执行时间重叠,则调度策略决定是运行在另外的核心上还是在单核心上执行上下文切换实现。

GCD队列类型

GCD的队列类型包含三种:Main Queue, Global Queue以及Custom Queue。

  • Main Queue:运行在主线程,主要用于UI的管理是一个串行的队列

  • Global Queue: 并行队列,整个操作系统共享的队列。Global Queue的优先级分为高中低以及后台执行,但是在创建的时候我们通过以下的几个QOS类别进行分类,GCD来管理具体的映射

    • User-interactive, 高优先级,用于UI更新,事件处理以及低延迟的任务,一般在主线程中执行。

    • User-initiated: 一般是用户从UI设置的一些异步任务,等待返回从而继续后续的UI操作执行,较高的优先级别。

    • Utility : 一些长时间运行的任务,比如计算或者网络处理,该类型一般映射到低优先级的队列中。

    • Background 用于一些后台执行的任务,比如抓取网络数据,进行备份等操作,用户无感知的任务执行。

  • Custom队列:可以自定义类型的队列, 可以用来执行一些后台任务,串行或者并行执行,如果串行执行的话可以保证数据访问的一致性。

同步和异步

GCD在创建任务的时候可以设置为同步或者异步。同步的方式及执行该任务,直到结束任务后返回。异步则立即返回,不会阻塞当前线程的执行,内部的调度器来决定什么时候去执行任务。

执行的任务一般通过Clousre的方式来管理和创建,这个任务类型为DispatchWorkItem,

比如下面的代码:

假如我们需要处理一个图片,作为一个CPU密集的处理程序,处理时间可能需要很久,处理完成后我们需要展示在View层上,代码如下。直接在UI层比如点击一个按钮执行的话,将导致当前的线程也就是UI层的Main Thread阻塞,无法再进行任何的用户交互操作。

let greatImage = processImage(image) showImage(greatImage)

我们可以将代码改写为下面的方式执行,创建一个独立的线程,线程为异步执行,也就是会直接返回,在闭包中我们处理图片,对于处理完成的图片我们获取全局的mainThread并在此线程中执行任务(也是异步执行)

DispatchQueue.global(qos: .userInitiated).async { [weak self] in
guard let self = self else {
  return
}
let overlayImage = self.processImage(self.image)
DispatchQueue.main.async { [weak self] in
   self?.showImage(overlayImage)
 }
}
  • 于涉及UI更新操作的,一定要在main thread中执行,也就是DispatchQueue.main。

  • 对于计算密集或者网络处理的任务,可以放入global queue中执行

延迟调度

延迟调度可以用于指定任务在固定的时间内执行,相对于timer更容易调度和执行,只需要将调度任务像之前那样放在闭包中即可。

let delayInSeconds = 2.0
 
  DispatchQueue.main.asyncAfter(deadline: .now() + delayInSeconds) { [weak self] in
    guard let self = self else{
      return
    }
    if PhotoManager.shared.photos.count > 0 {
      self.navigationItem.prompt = nil
    }else{
      self.navigationItem.prompt = "Add photos here"
    }
    self.navigationController?.viewIfLoaded?.setNeedsLayout()
  }

不要使用延迟调度机制去解决数据竞争的问题!

单例模式和调度器

单例模式往往用于多个视图中的调用,比如下面的代码中定义的管理器, 其中private指定了代码中不能使用直接的初始化只能采用shared来管理创建。

final class PhotoManager {
private init() {}
static let shared = PhotoManager()

private var unsafePhotos: [Photo] = []

var photos: [Photo] {
  return unsafePhotos
}

}

对于该单例模式对象,当在多线程访问的时候,需要读写操作的时候,如何保证线程的安全性,比如我们现在需要向管理器添加一些图片内容。假如这些操作分布在多个线程中,我们采用如下面的方式:

 private var unsafePhotos: [Photo] = []

var photos: [Photo] {
  return unsafePhotos
}

func addPhoto(_ photo: Photo) {
  unsafePhotos.append(photo)
  DispatchQueue.main.async { [weak self] in
    self?.postContentAddedNotification()
  }
}

上面的方式中,使用photos读取数据的时候每次返回一组全新的数据(sturct类型pass by value),可以保证与原始数据的独立性, 但是多线程访问下无法保证安全的执行,当一个线程写数据,另一个读取数据的时候就会出现数据竞态的发生。

Swift内建的类型Array和Dictionay都是使用struct实现,因此按值复制传递(但是也是仅仅在修改后才会执行复制),class是按引用传递。

这里为了保证读写的安全,我们引入读写锁的概念来管理数据的访问。当创建任务单元的时候,设置flag告诉GCD,该任务的执行方式是否允许与其他任务并行操作。

  • 在Global Queue的并发队列中执行Barrier Task将导致所有共享的队列任务的执行暂停。

  • 在Custom Serial 的串行队列中执行Barrier Task不会有任何的变化(本身串行)

  • 尽量在Custom 并发模式下使用。

  •  

修改上面的代码:

 private let concurrentPhotoQueue = DispatchQueue(label: "cn.zhangmingkai.photoApp.photoQueue", attributes: .concurrent)


private var unsafePhotos: [Photo] = []

var photos: [Photo] {
  return unsafePhotos
}

func addPhoto(_ photo: Photo) {
  //unsafePhotos.append(photo)
  concurrentPhotoQueue.async(flags: .barrier){ [weak self] in
    guard let self = self else{
      return
    }
   
    self.unsafePhotos.append(photo)
   
    DispatchQueue.main.async { [weak self] in
      self?.postContentAddedNotification()
    }
  }
 
}
  • 创建一个异步的自定义队列

  • 在队列中异步的执行任务,并设置任务为 Barrier Task

  • 修改数据内容

  • 在主队列中发送通知,告知UI组件是否完成状态操作。

原有的读取数据的操作也需要进行处理,来保证数据的读取操作的安全, 这里使用sync操作,用于追踪barriers任务的执行情况,并且等待操作完成后再去处理。保证任务的执行顺序不会被额外的写入操作所切分,从而实现数据的一致性。

 var photos: [Photo] {
  var photosCopy: [Photo]!
  concurrentPhotoQueue.sync {
    photosCopy = self.unsafePhotos
  }
  return photosCopy
}

DispatchGroup组管理

下面的代码执行用于下载多个图片并修改view视图,但是存在严重的Bug,不管我们是否正确的执行结束,在点击开始后都会直接提示完成下载。问题在于内部的回调函数立即返回,导致直接调用completion函数。

 func downloadPhotos(withCompletion completion: BatchPhotoDownloadingCompletionClosure?) {
  var storedError: NSError?
  for address in [PhotoURLString.overlyAttachedGirlfriend,
                  PhotoURLString.successKid,
                  PhotoURLString.lotsOfFaces] {
                    let url = URL(string: address)
                    let photo = DownloadPhoto(url: url!) { _, error in
                      if error != nil {
                        storedError = error
                      }
                    }
                    PhotoManager.shared.addPhoto(photo)
  }
 
  completion?(storedError)
}

为了实现预期的目的,解决Bug 我们通过DispatchGroup组来管理异步任务的处理,这里的代码需要使用enter和leave来平衡整个的执行流程。wait将阻塞在程序中直到完成或者指定的超时的发生。

 func downloadPhotos(withCompletion completion: BatchPhotoDownloadingCompletionClosure?) {
   DispatchQueue.global(qos: .userInitiated).async {
     var storedError: NSError?
     let downloadGroup = DispatchGroup()
       
        for address in [PhotoURLString.overlyAttachedGirlfriend,
                        PhotoURLString.successKid,
                        PhotoURLString.lotsOfFaces] {
                          let url = URL(string: address)
                         
                          downloadGroup.enter()
                         
                         
                          let photo = DownloadPhoto(url: url!) { _, error in
                            if error != nil {
                              storedError = error
                            }
                            downloadGroup.leave()
                          }
                          PhotoManager.shared.addPhoto(photo)
                         
        }
        downloadGroup.wait()
        // 主线程上执行
        DispatchQueue.main.async {
           completion?(storedError)
        }
   }
 }
     downloadGroup.notify(queue: DispatchQueue.main){
       completion?(storedError)
     }

另外上面的代码中,我们知道需要处理三个任务的并发处理,可以考虑直接使用concurrentPerform来执行多任务的下载. 实例代码中依旧需要使用一个DispatchGroup来管理异步任务和等待完成,同时这里的userInitiated用于设置其级别,

除了等待和执行模式外,还可以使用notify的方式来管理数据的处理, 另外notify也不需要将原来的代码包裹在独立的Main Thread下面执行。

调度任务的取消

DispatchWorkItem可以用来执行一些任务的取消,级别任务已经被发送到队列中等待执行。比如下面的代码随机的取消几个任务的执行,这里需要注意的是,一旦任务被成功的取消,则需要同时处理leave相关的操作,否则任务将无法被终止退出。

   var storedError: NSError?
  let downloadGroup = DispatchGroup()
  var addresses = [PhotoURLString.overlyAttachedGirlfriend,
                   PhotoURLString.successKid,
                   PhotoURLString.lotsOfFaces]
  addresses += addresses + addresses
 
  var blocks: [DispatchWorkItem] = []
 
 
  for index in 0 ..< addresses.count{
    downloadGroup.enter()
   
    let block  = DispatchWorkItem{
      let address = addresses[index]
      let url = URL(string: address)
     
      let photo = DownloadPhoto(url: url!) { _, error in
        if error != nil{
          storedError = error
        }
        downloadGroup.leave()
      }
      PhotoManager.shared.addPhoto(photo)
     
    }
    blocks.append(block)
    DispatchQueue.main.async(execute: block)
  }
 
 
  for block in blocks[3 ..< blocks.count]{
    let cancel = Bool.random()
   
    if cancel {
      block.cancel()
      downloadGroup.leave()
    }
  }
 
  downloadGroup.notify(queue: DispatchQueue.main) {
    completion?(storedError)
  }

Semaphores

信号量作为线程中重要的组成部分,可以被用来编写并发及异步的程序。信号量在多线程中访问时安全的,因此可以被多个线程同时访问和处理。

一个信号量由一个线程队列和计数器构成。线程队列FIFO接收线程并访问资源。计数器用于追踪是否一个线程应当访问共享资源。

信号量可以用于控制最大的并发数量比如下面的实例代码中,使用信号量,来允许最大三个任务同时被执行。

let queue = DispatchQueue(label: "com.gcd.myQueue", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 3)
for i in 0 ..> 15 {
 queue.async {
    let songNumber = i + 1
    semaphore.wait()
    print("Downloading song", songNumber)
    sleep(2) // Download take ~2 sec each
    print("Downloaded song", songNumber)
    semaphore.signal()
 }
}

同时信号量允许设置超时时间,一旦超时直接返回,而不再进行后续的处理。

发表评论

邮箱地址不会被公开。 必填项已用*标注