关于一个简单的子集判断的性能分析

最近遇到一个需求,需要做基于日志标签的实时分发,基本原理是一条日志传递到程序后,系统采集该日志的标签属性,跟后台的预定义的多个规则标签集合进行匹配,如果Ok则将规则与该日志绑定,等待后期的处理。

比如一条业务日志具有标签: dns, usa, 192.168.0.1, 规则库的当前规则示例:

  • 规则1 : dns
  • 规则2: dns, usa
  • 规则3: dns, usa, 192.168.0.2

则前两个规则分别捕获该日志,用于下一步处理。所以这就是一个简单的匹配问题。针对集合匹配的话,最简单直接的方式是两次循环,查询是否包含子集合。为简单起见这里使用标签ID,也就是一个int数组表示一组标签:

func isSubsetWithLoop(originalSet []int, newSet []int) bool {
    sizeOfOriginal := len(originalSet)
    sizeOfNewSet := len(newSet)
    found := 0
    if sizeOfNewSet == 0 || sizeOfOriginal < sizeOfNewSet {
        return false
    }
    for _, i := range newSet {
    inner:
        for _, j := range originalSet {
            // println(i, j)
            if i == j {
                found++
                break inner
            }
        }
    }
    return sizeOfNewSet == found
}

但是考虑到两次循环程序的执行复杂度基本上O(n*m)的时间复杂度。所以考虑有没有更快的方式解决问题。主要的思路有两个:

  • 排序集合一,对于集合二的数据进行二分查找,获取是否存在集合一中。时间复杂度 基本上O(mlogm + nlogm) +
  • 排序集合一和集合二, 通过位移的方式比较两个集合是否包含的关系,时间复杂度 基本上是O(mlogm + nlogn)
  • 使用map集合保存集合一的数据,查询集合二中数据是否存在map中,时间复杂度 基本上是O(m + n)

看上去使用map可以获得比较好的时间复杂度算法,具体实施上如下面的所示:

func isSubsetWithMap(originalSet []int, newSet []int) bool {
    sizeOfOriginal := len(originalSet)
    sizeOfNewSet := len(newSet)
    if sizeOfNewSet == 0 || sizeOfOriginal < sizeOfNewSet {
        return false
    }
    mapOfArray := make(map[int]bool, sizeOfOriginal)
    for _, v := range originalSet {
        mapOfArray[v] = true
    }
    for _, i := range newSet {
        if !mapOfArray[i] {
            return false
        }
    }
    return true
}
压力测试

根据上面的代码编写对应的压力测试,看一下性能是否和之前猜想的一致, 下面的代码生成对应的测试矩阵, 设定初始规则为3个标签,而日志标签的范围则从8-1024, 尽管可能实际并不会那么多个标签。

func BenchmarkIsSubset(b *testing.B) {
    subsetFuncs := []struct {
        name string
        fun  func(originalSet []int, newSet []int) bool
    }{

        {"map", isSubsetWithMap},
        {"loop", isSubsetWithLoop},
    }

    for _, subsetFuncStruct := range subsetFuncs {
        subset := getRandomArr(3, 100)
        for k := 3.; k <= 10; k++ {
            max := int(math.Pow(2, k))
            arr := shufferArray(getRandomArr(max, max*2))
            b.Run(fmt.Sprintf("%s/%d/%d", subsetFuncStruct.name, len(subset), len(arr)), func(b *testing.B) {
                for i := 0; i < b.N; i++ {
                    subsetFuncStruct.fun(arr, subset)
                }
            })
        }
    }
}

实际执行的时候测试结果如下, 可以发现对于map和loop两种方式基本上都符合线性的增长趋势,但是明显loop会更低一些。按道理来说map查询应该更快一些才对?

BenchmarkIsSubset/map/3/8-4             10000000           233 ns/op
BenchmarkIsSubset/map/3/16-4             2000000           737 ns/op
BenchmarkIsSubset/map/3/32-4             1000000          1467 ns/op
BenchmarkIsSubset/map/3/64-4              500000          2951 ns/op
BenchmarkIsSubset/map/3/128-4             200000          5636 ns/op
BenchmarkIsSubset/map/3/256-4             200000         11243 ns/op
BenchmarkIsSubset/map/3/512-4             100000         22432 ns/op
BenchmarkIsSubset/map/3/1024-4             30000         44178 ns/op

BenchmarkIsSubset/loop/3/8-4                              50000000            35.5 ns/op
BenchmarkIsSubset/loop/3/16-4                             30000000            61.3 ns/op
BenchmarkIsSubset/loop/3/32-4                             20000000           118 ns/op
BenchmarkIsSubset/loop/3/64-4                             10000000           195 ns/op
BenchmarkIsSubset/loop/3/128-4                             5000000           283 ns/op
BenchmarkIsSubset/loop/3/256-4                             2000000           616 ns/op
BenchmarkIsSubset/loop/3/512-4                             2000000           723 ns/op
BenchmarkIsSubset/loop/3/1024-4                            1000000          1615 ns/op
性能分析pporf

使用go tool工具中的pporf工具可以对于程序执行的时间和内存进行分析,因为内存并不是我们主要考虑的问题,我们仅对于CPU时间进行分析。

再次执行测试并生成对应的测试性能文件:

go test -cpuprofile cpu.prof -bench . -benchtime 2s
go tool pprof cpu.prof

上面的操作会进入命令行模式,执行top10, top20分别返回花费最大的命令执行。尽管isSubsetWithLoop排名第一,但是由于实际上执行的次数也要高于isSubsetWithMap, 所以按照之前的测试看不出单一的函数执行的性能比较。

    flat  flat%   sum%        cum   cum%
    23.57s 42.07% 42.07%     23.57s 42.07%  github.com/zhangmingkai4315/workspace/01-subset.isSubsetWithLoop
    13.63s 24.33% 66.40%     13.63s 24.33%  runtime.pthread_cond_signal
     7.94s 14.17% 80.58%     11.85s 21.15%  runtime.mapassign_fast64
     2.60s  4.64% 85.22%      2.60s  4.64%  runtime.aeshash64
     1.46s  2.61% 87.83%     15.27s 27.26%  github.com/zhangmingkai4315/workspace/01-subset.isSubsetWithMap
     1.38s  2.46% 90.29%     40.22s 71.80%  github.com/zhangmingkai4315/workspace/01-subset.BenchmarkIsSubset.func1
     1.08s  1.93% 92.22%      1.08s  1.93%  runtime.pthread_cond_wait
     0.56s     1% 93.22%      0.56s     1%  runtime.add

修改测试文件:

func BenchmarkIsSubset(b *testing.B) {
    arr := shufferArray(getRandomArr(20, 100))
    subset := getRandomArr(5, 100)
    for i := 0; i < b.N; i++ {
        isSubsetWithMap(arr, subset)
        isSubsetWithLoop(arr, subset)
    }
}

重新执行上面的流程:

      flat  flat%   sum%        cum   cum%
     1.08s 29.59% 29.59%      1.08s 29.59%  runtime.pthread_cond_signal
     0.94s 25.75% 55.34%      1.54s 42.19%  runtime.mapassign_fast64
     0.33s  9.04% 64.38%      0.33s  9.04%  runtime.aeshash64
     0.25s  6.85% 71.23%      2.25s 61.64%  github.com/zhangmingkai4315/workspace/01-subset.isSubsetWithMap
     0.19s  5.21% 76.44%      0.19s  5.21%  github.com/zhangmingkai4315/workspace/01-subset.isSubsetWithLoop
     0.12s  3.29% 79.73%      0.12s  3.29%  runtime.memclrNoHeapPointers
     0.07s  1.92% 81.64%      0.07s  1.92%  runtime.add
     0.07s  1.92% 83.56%      0.08s  2.19%  runtime.overLoadFactor
     0.06s  1.64% 85.21%      0.06s  1.64%  runtime.pthread_cond_wait
     0.05s  1.37% 86.58%      0.05s  1.37%  runtime.bucketShift
     0.04s  1.10% 87.67%      0.06s  1.64%  runtime.mapaccess1_fast64

这样通过相同次数的执行我们可以比较出不同函数的性能, 可以看到对于isSubsetWithMap函数执行高于Loop的执行(至少在测试子集为5个,源集合为20个的情况下)。上面的表格显示可能生成map结构和分配数据的效率导致整体map的性能低于loop。我们可以在交互窗口中输入list命令查看具体的每一个函数的执行情况:

(pprof) list github.com/zhangmingkai4315/workspace/01-subset.isSubsetWithMap
Total: 3.65s subset.isSubsetWithMap in /main.go
     250ms      2.25s (flat, cum) 61.64% of Total
               .          .     31:       }
         .          .     32:    }
         .          .     33:    return sizeOfNewSet == found
         .          .     34:}
         .          .     35:
      10ms       10ms     36:func isSubsetWithMap(originalSet []int, newSet []int) bool {
         .          .     37:    sizeOfOriginal := len(originalSet)
         .          .     38:    sizeOfNewSet := len(newSet)
         .          .     39:    if sizeOfNewSet == 0 || sizeOfOriginal < sizeOfNewSet {
         .          .     40:        return false
         .          .     41:    }
      10ms      410ms     42:    mapOfArray := make(map[int]bool, sizeOfOriginal)
     110ms      110ms     43:    for _, v := range originalSet {
     120ms      1.66s     44:        mapOfArray[v] = true
         .          .     45:    }
         .          .     46:    for _, i := range newSet {
         .       60ms     47:        if !mapOfArray[i] {
         .          .     48:            return false
         .          .     49:        }
         .          .     50:    }
         .          .     51:    return true
         .          .     52:}

对比 loop版本的代码执行情况, 基本上没有太复杂的逻辑和执行花费比较高的代码。看来不能完全的依赖于O(n)来判断一个函数是否高效率的依据。

     190ms      190ms (flat, cum)  5.21% of Total
         .          .     15:
         .          .     16:func isSubsetWithLoop(originalSet []int, newSet []int) bool {
         .          .     17:    sizeOfOriginal := len(originalSet)
         .          .     18:    sizeOfNewSet := len(newSet)
         .          .     19:    found := 0
      10ms       10ms     20:    if sizeOfNewSet == 0 || sizeOfOriginal < sizeOfNewSet {
         .          .     21:        return false
         .          .     22:    }
      10ms       10ms     23:    for _, i := range newSet {
         .          .     24:    inner:
     160ms      160ms     25:        for _, j := range originalSet {
         .          .     26:            // println(i, j)
      10ms       10ms     27:            if i == j {
         .          .     28:                found++
         .          .     29:                break inner
         .          .     30:            }
         .          .     31:        }
         .          .     32:    }

同时在交互终端中执行web可以在浏览器打开自动生成的的svg文件,查看流程图。

假如不需要临时生成map,而是提前生成完成,每次都直接进行比对的话,其实map的版本还是相对较快的。对于代码不确定性能的时候,还是要多谢一些测试来检查性能。

另外原始代码还实现了sort版本的两个函数,放在github上,大家感兴趣的话可以查看, 对于上述的代码或者文章描述有任何疑问,欢迎大家交流讨论。

具体的代码可以参考查看Github Gist

发表评论

电子邮件地址不会被公开。 必填项已用*标注