grpc-go source code trace - How a gRPC client establishes a connection to a server

5 minute read

Introduction

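My original goal was to study gRPC's retry mechanism, but before getting to retry it is necessary to explain how a gRPC client establishes a connection to a server. This post therefore traces the source code to outline the connection flow that runs after grpc.Dial is called, including how gRPC implements load balancing.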

packages

grpc-go v1.19.1

Behind grpc.Dial

The following is the basic way for a client to connect to a server.

conn, err := grpc.Dial(serverIpAddress, grpc.WithInsecure())

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  return DialContext(context.Background(), target, opts...)
}

First, the client uses a Name Resolver to parse the target string passed to Dial and obtain the server's actual IP addresses and port, which are needed to establish the connection. With the DNS Name Resolver, for example, you can send RPCs to a domain name via conn, err := grpc.Dial("dns:///your.target.name:8888").

For more on Name Resolution, see the official documentation.
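To make the target format concrete, here is a minimal, standard-library-only sketch of splitting a target of the form scheme://authority/endpoint. The `target` type and this `parseTarget` are illustrative stand-ins written for this post, not grpc-go's own code. If the string does not have that shape, the whole input is kept as the endpoint and the scheme stays empty.

```go
package main

import (
	"fmt"
	"strings"
)

// target is a hypothetical stand-in for grpc-go's resolver.Target.
type target struct {
	Scheme, Authority, Endpoint string
}

// split2 splits s around the first occurrence of sep.
func split2(s, sep string) (string, string, bool) {
	parts := strings.SplitN(s, sep, 2)
	if len(parts) < 2 {
		return "", "", false
	}
	return parts[0], parts[1], true
}

// parseTarget splits "scheme://authority/endpoint". When the input does not
// match that shape, it is returned unchanged as the endpoint with an empty
// scheme, so the caller can fall back to a default resolver.
func parseTarget(s string) target {
	scheme, rest, ok := split2(s, "://")
	if !ok {
		return target{Endpoint: s}
	}
	authority, endpoint, ok := split2(rest, "/")
	if !ok {
		return target{Endpoint: s}
	}
	return target{Scheme: scheme, Authority: authority, Endpoint: endpoint}
}

func main() {
	fmt.Printf("%+v\n", parseTarget("dns:///your.target.name:8888"))
	fmt.Printf("%+v\n", parseTarget("localhost:8888"))
}
```

A bare host:port such as localhost:8888 ends up with an empty scheme, which is the case where the default resolver takes over.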

// Note: parts of the source code have been omitted.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	if cc.dopts.resolverBuilder == nil {
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			// The default scheme uses the passthrough resolver builder (passthrough.go).
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	}

	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
}

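Resolver is an interface, so users can register different resolver implementations as needed, such as the one in etcd-io/etcd. A specific resolver is selected through the URI scheme (e.g. etcd://). When no resolver is specified, grpc-go resolves the name with its default passthrough resolver. (passthrough simply hands the target back unchanged, so it is only suitable for simple applications or tests. Note also that the gRPC documentation says DNS is the default, whereas grpc-go uses passthrough.)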

For a third-party resolver implementation, see the etcd clientv3 resolver.
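The lookup-with-fallback behaviour described above can be sketched without grpc-go at all. In the following minimal example the `builder` interface, the `static` resolver, and the `register` and `lookup` helpers are all hypothetical names invented for illustration. Only the idea is taken from the text: builders are registered by scheme, and an unregistered scheme falls back to passthrough.

```go
package main

import "fmt"

// builder is a hypothetical, simplified resolver builder: given an endpoint
// it returns the addresses to connect to.
type builder interface {
	Scheme() string
	Resolve(endpoint string) []string
}

// passthrough returns the endpoint unchanged as the only address.
type passthrough struct{}

func (passthrough) Scheme() string                   { return "passthrough" }
func (passthrough) Resolve(endpoint string) []string { return []string{endpoint} }

// static is an illustrative custom resolver backed by a fixed table.
type static struct{ table map[string][]string }

func (static) Scheme() string                     { return "static" }
func (s static) Resolve(endpoint string) []string { return s.table[endpoint] }

const defaultScheme = "passthrough"

// registry maps a scheme to its builder; the default is always present.
var registry = map[string]builder{defaultScheme: passthrough{}}

func register(b builder) { registry[b.Scheme()] = b }

// lookup returns the builder for scheme, falling back to the default one
// when the scheme is not registered.
func lookup(scheme string) builder {
	if b, ok := registry[scheme]; ok {
		return b
	}
	return registry[defaultScheme]
}

func main() {
	register(static{table: map[string][]string{
		"orders": {"10.0.0.1:8888", "10.0.0.2:8888"},
	}})

	fmt.Println(lookup("static").Resolve("orders"))
	fmt.Println(lookup("").Resolve("localhost:8888"))
}
```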

func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
	rb := cc.dopts.resolverBuilder
	ccr := &ccResolverWrapper{
		cc:     cc,
		addrCh: make(chan []resolver.Address, 1),
		scCh:   make(chan string, 1),
	}
	// Create a resolver from the resolver builder.
	var err error
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
	if err != nil {
		return nil, err
	}
	return ccr, nil
}

The rest of the flow is explained using the default passthrough resolver.

// The passthrough Builder is used by default.
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	r := &passthroughResolver{
		target: target,
		cc:     cc,
	}
	r.start()
	return r, nil
}

func (r *passthroughResolver) start() {
	// Hand the user-supplied target straight back as the only address.
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
	// Notify the ClientConn, through the wrapper, to update its state.
	ccr.cc.updateResolverState(s)
}

Once the Name Resolver has resolved the target, the latest addresses are passed to the Balancer through the watcher created by the balancer wrapper.

gRPC-Dial-1.png

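The Balancer is the key component of gRPC load balancing. Its main job is to handle state changes of connections and addresses (for example, updating connections when the addresses change) and, in the later transport phase, to decide which server connection to pick (also known as the balancer policy). gRPC load balancing adopts the External Load Balancing Service approach, which lets users choose other Balancer implementations.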

Balancer takes input from gRPC, manages SubConns, and collects and aggregates the connectivity states. It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
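As a rough illustration of what a Picker does, the sketch below contrasts two policies: one that always returns the same sub connection, and one that rotates through several. The `subConn`, `picker`, `pickFirst`, and `roundRobin` types are simplified, hypothetical stand-ins and not the grpc-go interfaces. The round-robin variant is deliberately not safe for concurrent use, to keep the example short.

```go
package main

import "fmt"

// subConn is a hypothetical stand-in for a balancer.SubConn.
type subConn struct{ addr string }

// picker chooses the sub connection for the next RPC.
type picker interface {
	pick() *subConn
}

// pickFirst always returns the single connection it was built with.
type pickFirst struct{ sc *subConn }

func (p *pickFirst) pick() *subConn { return p.sc }

// roundRobin cycles through its connections in order.
// It is not safe for concurrent use; a real picker would need locking.
type roundRobin struct {
	scs  []*subConn
	next int
}

func (p *roundRobin) pick() *subConn {
	sc := p.scs[p.next]
	p.next = (p.next + 1) % len(p.scs)
	return sc
}

func main() {
	a, b := &subConn{addr: "10.0.0.1:8888"}, &subConn{addr: "10.0.0.2:8888"}

	var p picker = &pickFirst{sc: a}
	fmt.Println(p.pick().addr, p.pick().addr)

	p = &roundRobin{scs: []*subConn{a, b}}
	fmt.Println(p.pick().addr, p.pick().addr, p.pick().addr)
}
```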

SubConn represents a gRPC sub connection. Each sub connection contains a list of addresses. gRPC will try to connect to them (in sequence), and stop trying the remainder once one connection is successful.

func (cc *ClientConn) updateResolverState(s resolver.State) error {
	if cc.dopts.balancerBuilder == nil {
		// Omitted: isGRPCLB is computed from the resolved addresses.
		var newBalancerName string
		if isGRPCLB {
			newBalancerName = grpclbName
		} else if cc.sc != nil && cc.sc.LB != nil {
			newBalancerName = *cc.sc.LB
		} else {
			// Default to the pick first balancer (pick_first.go).
			newBalancerName = PickFirstBalancerName
		}
		cc.switchBalancer(newBalancerName)
	}
	cc.balancerWrapper.updateResolverState(s)
	return nil
}

// Use resolverUpdateCh to pass the resolved addresses to the Balancer.
func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
	ccb.resolverUpdateCh <- &s
}

func (cc *ClientConn) switchBalancer(name string) {
	// Look up the balancer builder registered under this name.
	builder := balancer.Get(name)
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
	ccb := &ccBalancerWrapper{
		cc:               cc,
		stateChangeQueue: newSCStateUpdateBuffer(),
		resolverUpdateCh: make(chan *resolver.State, 1),
		done:             make(chan struct{}),
		subConns:         make(map[*acBalancerWrapper]struct{}),
	}
	// Create a watcher for name resolved events.
	go ccb.watcher()
	ccb.balancer = b.Build(ccb, bopts)
	return ccb
}

The excerpt below shows how the watcher handles an update received from the resolver:

// watcher balancer functions sequentially, so the balancer can be implemented
// lock-free.
func (ccb *ccBalancerWrapper) watcher() {
  for {
		select {
		case s := <-ccb.resolverUpdateCh:
			select {
			case <-ccb.done:
				ccb.balancer.Close()
				return
			default:
			}
			if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
				ub.UpdateResolverState(*s)
			} else {
				ccb.balancer.HandleResolvedAddrs(s.Addresses, nil)
			}
		}
	}
}
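
The hand-off pattern in the excerpts above, a buffered channel drained by a single watcher goroutine, can be reproduced in isolation. This is a minimal sketch assuming a single producer. The `state` and `wrapper` types and the `update` and `stop` methods are hypothetical names chosen for this example. Because only one goroutine ever calls the handler, the handler itself needs no locking.

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical stand-in for resolver.State.
type state struct{ addresses []string }

// wrapper forwards resolver updates to a single watcher goroutine, so the
// handler is never called concurrently.
type wrapper struct {
	updates chan state
	done    chan struct{}
	wg      sync.WaitGroup
}

// newWrapper starts the watcher goroutine that feeds updates to handle.
func newWrapper(handle func(state)) *wrapper {
	w := &wrapper{updates: make(chan state, 1), done: make(chan struct{})}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case s := <-w.updates:
				handle(s)
			case <-w.done:
				return
			}
		}
	}()
	return w
}

// update queues a new state for the watcher; it blocks while the one-slot
// buffer is still full.
func (w *wrapper) update(s state) { w.updates <- s }

// stop terminates the watcher goroutine and waits for it to exit.
func (w *wrapper) stop() {
	close(w.done)
	w.wg.Wait()
}

func main() {
	handled := make(chan struct{})
	w := newWrapper(func(s state) {
		fmt.Println("balancer received", s.addresses)
		handled <- struct{}{}
	})
	w.update(state{addresses: []string{"localhost:8888"}})
	<-handled
	w.stop()
}
```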

Taking the simple pick first balancer as an example, here is what the Balancer actually does when it handles resolved addresses.

type pickfirstBalancer struct {
	cc balancer.ClientConn
	sc balancer.SubConn
}

func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
	if err != nil {
		grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
		return
	}
	if b.sc == nil {
		b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
		if err != nil {
			grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
			return
		}
		b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc})
		b.sc.Connect() // Connect to server (b.sc is balancer_conn_wrapper)
	} else {
		b.sc.UpdateAddresses(addrs)
		b.sc.Connect()
	}
}

As shown above, when there is no SubConn yet, the balancer asks the ClientConn, through balancer_conn_wrapper, to create a new SubConn and then triggers Connect.

func (ac *addrConn) connect() error {
	ac.mu.Lock()
	ac.updateConnectivityState(connectivity.Connecting)
	ac.mu.Unlock()

	// Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
	return nil
}

At this point a separate goroutine is started to actually establish the connection to the server.

gRPC-Dial-2.png

resetTransport is an infinite loop: if the connection is lost because of a problem on the server side, the client keeps trying to reconnect until it succeeds or the state becomes connectivity.Shutdown.

func (ac *addrConn) resetTransport() {
	// Reconnect forever.
	for i := 0; ; i++ {
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

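		// Omitted: addrs, backoffFor and connectDeadline are computed here while
		// the lock is held, then the lock is released before dialing.
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)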
		if err != nil {
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

			// Backoff.
			b := ac.resetBackoff
			ac.mu.Unlock()

			// Wait, then reconnect.
			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			newTr.Close()
			ac.mu.Unlock()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// A connection in the Ready state can be picked.
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready)
		}
		ac.mu.Unlock()
		
		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
	}
}

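If grpc.WithBlock is additionally passed to the initial grpc.Dial, the call returns only after connectivity.Ready has been confirmed. Otherwise Dial is non-blocking by default, so the client can do other work while waiting for the connection to succeed.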

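Finally, you can get a better feel for the flow by experimenting and watching the gRPC logs. Set the environment variables GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info

and you will see that, when the server is unreachable, the client keeps retrying the connection.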

gRPC-Dial-3.png

References

  1. https://github.com/grpc/grpc/blob/master/doc/load-balancing.md