Skip to main content

grpc p2c balancer

项目地址:

https://github.com/sado0823/go-kitx/blob/master/grpc/balancer/p2c/p2c.go

what?

P2C算法全称pick of 2 choices,相比WRR(加权轮询算法),P2C随机选择两个节点后在这俩节点里选择优胜者,并通过指数加权移动平均算法统计服务端的实时状态,从而做出最优选择。

  • p2c (Pick Of 2 Choices) 二选一: 在多个节点中随机选择两个节点。

选择两个符合健康度的节点进行计算负载, 优先选择出符合标准的节点, 这里计算负载有运用cpu的方式和计算延迟(lag)+节点拥塞度(inflight)的方式

  • EWMA (Exponentially Weighted Moving-Average) 指数加权平均算法公式: vt = vt-1 * β + vt * (1 - β)

这个算法之前我们在自适应限流bbr算法中也有用到, 主要用于计算cpu负载, 优势为能够减小尖刺(网络波动)的影响

vt:当前值

vt-1: 上一周期值

β:常量

  • 牛顿冷却定律衰减模型: β = 1/e^(k*△t)

运用这个模型就要用于计算EWMA中的β

why?

一般的使用场景是:

  • 负载均衡算法尽量选择负载最低的节点
  • 负载均衡算法尽量选择延迟最低的节点
  • 隔离故障节点, 自动摘流
  • 故障节点恢复后自动, 自动接流
  • 判断是否存在某行数据,用以减少对磁盘访问,提高服务的访问性能

how?

基本思想

EWMA

EWMA主要用于与一般的算数平均数作对比, 计算公式为vt = vt-1 * β + vt * (1 - β)

我们用一周的温度计算来做比较, ewma中β值一般在于(0-1)之间, 这里选取10% = 0.1

image-20220412003955372

一周的趋势图如下

image-20220412004052835

从上图可以看出, 使用ewma能够使得曲线非常的平滑, 从而减缓一些尖刺行为, β越大越接近平均值, β越小越接近原值.

有一个一般用的计算β公式为: 2/(n+1)

牛顿冷却定律衰减模型

公式: β = 1/e^(k*△t), e是数学常数, △t为第t次的请求延迟, k表示衰减系数 (代码中实现为10s)

image-20220412010038364

x = k * △t, 则公式为f(x) = 1/e^x, x越大, y越小, 也就是说

  • △t延迟越高, β越小, ewma中β越小, 则越接近原值, 延迟高的情况下凸显出原始延迟, 能够检测到网络毛刺
  • △t延迟越低, β越大, ewma中β越大, 则越接近平均值

基本流程

image-20220412101142089

源码分析

roundrobin

先看google grpc roundrobin内部实现 https://github.com/grpc/grpc-go/blob/master/balancer/roundrobin/roundrobin.go

package roundrobin

import (
"sync"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

var logger = grpclog.Component("roundrobin")

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}

func init() {
balancer.Register(newBuilder())
}

type rrPickerBuilder struct{}

func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
...
}

type rrPicker struct {
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
...
}

以上代码省去了关键实现, 可以看出, 要实现一个负载均衡算法, 只要需要实现内部的两个interface

// 1. 有节点更新会调用build方法
type PickerBuilder interface {
Build(info PickerBuildInfo) balancer.Picker
}
// 2. 挑选节点
type Picker interface {
Pick(info PickInfo) (PickResult, error)
}
// 3. 注册负载均衡器
func init() {
balancer.Register(newBuilder())
}

p2c

https://github.com/sado0823/go-kitx/blob/master/grpc/balancer/p2c/p2c.go

接口
// 实现 PickerBuilder 接口
type p2cPickBuilder struct {
}
// 实现 Picker 接口
type p2cPicker struct {
conns []*subConn
r *rand.Rand
stamp *atomicx.AtomicDuration
lock sync.Mutex
}
核心数据结构
type p2cSubConn struct {
addr resolver.Address
conn balancer.SubConn

lagEWMA uint64 // 请求耗时, 计算后的ewma

inFlight int64 // 节点拥塞度, 正在处理的请求
successEWMA uint64 // 请求成功数量
requests int64 // 请求量

lastLag int64 // 上一次请求耗时, 用于计算ewma
pickTime int64 // 上一次选择的时间时间戳
}

在此基础上, 还可以再加上weight字段, 设置权重

load负载计算

有两种方式:

  • load = lagEWMA * inFlight: lagEWMA 相当于平均请求耗时,inFlight 是当前节点正在处理请求的数量,相乘大致计算出了当前节点的网络负载, 我们采用的就是这种方式
  • oad = cpuEWMA * lag * inFlight: cpuEWMA 利用服务端的cpu值计算负载, 可以从balancer.DoneInfo. Trailer中从服务端传递, 实际为metadata.MD类型, map[string][]string的别名
// load = lagEWMA * inFlight
func (s *p2cSubConn) load() int64 {
lag := int64(math.Sqrt(float64(atomic.LoadUint64(&s.lagEWMA) + 1)))
load := lag * (atomic.LoadInt64(&s.inFlight) + 1)
if load == 0 {
// penalty是初始化没有数据时的惩罚值, 在没有被选过的情况下, 会强制选择一次
return penalty
}

return load
}
Pick过程
    var chosen *p2cSubConn
switch len(p.conns) {
case 0: // 没有节点, 直接报错
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case 1: // 一个节点, 直接返回
chosen = p.choose(p.conns[0], nil)
case 2: // 两个节点, 返回负载最低的节点
chosen = p.choose(p.conns[0], p.conns[1])
default: // 多个节点, 最多经常三次计算, 选择合适的节点
var node1, node2 *p2cSubConn
// 三次随机选择节点
for i := 0; i < pickTimes; i++ {
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
if b >= a {
// 防止出现相同节点
b++
}
node1 = p.conns[a]
node2 = p.conns[b]

// 选出一次符合要求的节点则停止
if node1.healthy() && node2.healthy() {
break
}
}

chosen = p.choose(node1, node2)
}
请求结束, 更新节点信息
        // 正在处理的请求数-1
atomic.AddInt64(&c.inFlight, -1)

// 计算相对时间
now := time.Since(initTime)
last := atomic.SwapInt64(&c.lastLag, int64(now))
td := int64(now) - last
if td < 0 {
td = 0
}

// 牛顿冷却定律的衰减模型, 确定ewma中的β值, β = 1/e^(k*△t)
beta := math.Exp(float64(-td) / float64(decayTime))
lag := int64(now) - start
if lag < 0 {
lag = 0
}
olag := atomic.LoadUint64(&c.lagEWMA)
if olag == 0 {
beta = 0
}

// 指数加权平均算法 vt = vt-1 * β + vt * (1 - β)
// 存储当前lagEWMA
atomic.StoreUint64(&c.lagEWMA, uint64(float64(olag)*beta+float64(lag)*(1-beta)))

success := initSuccess
if info.Err != nil {
switch status.Code(info.Err) {
case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss:
success = 0
}
}

oldSuccess := atomic.LoadUint64(&c.successEWMA)
// 指数加权平均算法 vt = vt-1 * β + vt * (1 - β)
// 存储当前successEWMA
atomic.StoreUint64(&c.successEWMA, uint64(float64(oldSuccess)*beta+float64(success)*(1-beta)))

example

func test() {
cc, err := grpc.Dial(r.Scheme()+":///test.server",
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, p2c.Name)))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
}

references